From a9d24d34f26ffe63306650979bbd571d1052c7a0 Mon Sep 17 00:00:00 2001 From: Tobias Doerffel Date: Wed, 14 Oct 2009 01:22:31 +0200 Subject: [PATCH] Mixer/FxMixer: separated MixerWorkerThread and ThreadableJob into files Declarations and implementations of MixerWorkerThread and ThreadableJob have been moved into separate source files. Furthermore there were some improvements to MixerWorkerThreads. MixerWorkerThread::processJobQueue() does not return until the job queue completely has been processed. This way each thread can "help" to finish processing the queue and does not get back to sleep until all of the work is done. Management of the queue is now done via an array of QAtomicPointers. Items that are non-NULL still need to be processed while NULL-items were taken from the queue (i.e. in progress or done). Thus we do not need to deal with ThreadableJob-states within MixerWorkerThread anymore. --- include/AudioPort.h | 1 - include/FxMixer.h | 1 - include/MixerWorkerThread.h | 97 +++++++++++++++++ include/ThreadableJob.h | 84 +++++++++++++++ include/mixer.h | 190 +-------------------------------- include/play_handle.h | 1 + src/core/FxMixer.cpp | 23 ++-- src/core/MixerWorkerThread.cpp | 138 ++++++++++++++++++++++++ src/core/mixer.cpp | 8 +- 9 files changed, 329 insertions(+), 214 deletions(-) create mode 100644 include/MixerWorkerThread.h create mode 100644 include/ThreadableJob.h create mode 100644 src/core/MixerWorkerThread.cpp diff --git a/include/AudioPort.h b/include/AudioPort.h index d84dd179f..6ac7e8294 100644 --- a/include/AudioPort.h +++ b/include/AudioPort.h @@ -27,7 +27,6 @@ #include #include -#include #include "mixer.h" diff --git a/include/FxMixer.h b/include/FxMixer.h index 3d76903d5..21fe100f3 100644 --- a/include/FxMixer.h +++ b/include/FxMixer.h @@ -76,7 +76,6 @@ public: virtual ~FxMixer(); void mixToChannel( const sampleFrame * _buf, fx_ch_t _ch ); - void processChannel( fx_ch_t _ch, sampleFrame * _buf = NULL ); void prepareMasterMix(); void masterMix( sampleFrame * _buf ); diff --git a/include/MixerWorkerThread.h b/include/MixerWorkerThread.h new file mode 100644 index 000000000..03e195e60 --- /dev/null +++ b/include/MixerWorkerThread.h @@ -0,0 +1,97 @@ +/* + * MixerWorkerThread.h - declaration of class MixerWorkerThread + * + * Copyright (c) 2009 Tobias Doerffel + * + * This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program (see COPYING); if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301 USA. + * + */ + +#ifndef _MIXER_WORKER_THREAD_H +#define _MIXER_WORKER_THREAD_H + +#include +#include +#include + +#include "mixer.h" + + +class MixerWorkerThread : public QThread +{ +public: + struct JobQueue + { +#define JOB_QUEUE_SIZE 1024 + JobQueue() : + items(), + queueSize( 0 ), + itemsDone( 0 ) + { + } + + QAtomicPointer items[JOB_QUEUE_SIZE]; + QAtomicInt queueSize; + QAtomicInt itemsDone; + } ; + + static JobQueue s_jobQueue; + + MixerWorkerThread( int _worker_num, mixer * _mixer ); + virtual ~MixerWorkerThread(); + + virtual void quit(); + + void processJobQueue(); + static void resetJobQueue(); + + template + static void fillJobQueue( const T & _vec ) + { + resetJobQueue(); + for( typename T::ConstIterator it = _vec.begin(); it != _vec.end(); ++it ) + { + addJob( *it ); + } + } + + static void addJob( ThreadableJob * _job ); + + static void startJobs(); + static void waitForJobs(); + + static void startAndWaitForJobs() + { + startJobs(); + waitForJobs(); + } + + +private: + virtual void run(); + + sampleFrame * m_workingBuf; + int m_workerNum; + volatile bool m_quit; + mixer * m_mixer; + QWaitCondition * m_queueReadyWaitCond; + +} ; + + +#endif diff --git a/include/ThreadableJob.h b/include/ThreadableJob.h new file mode 100644 index 000000000..bade2baa5 --- /dev/null +++ b/include/ThreadableJob.h @@ -0,0 +1,84 @@ +/* + * ThreadableJob.h - declaration of class ThreadableJob + * + * Copyright (c) 2009 Tobias Doerffel + * + * This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program (see COPYING); if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301 USA. + * + */ + +#ifndef _THREADABLE_JOB_H +#define _THREADABLE_JOB_H + +#include + +#include "mixer.h" + + +class ThreadableJob +{ +public: + + enum ProcessingState + { + Unstarted, + Queued, + InProgress, + Done + }; + + ThreadableJob() : + m_state( ThreadableJob::Unstarted ) + { + } + + inline ProcessingState state() const + { + return static_cast( (int) m_state ); + } + + inline void reset() + { + m_state = Unstarted; + } + + inline void queue() + { + m_state = Queued; + } + + void process( sampleFrame * _working_buffer ) + { + if( m_state.testAndSetOrdered( Queued, InProgress ) ) + { + doProcessing( _working_buffer ); + m_state = Done; + } + } + + virtual bool requiresProcessing() const = 0; + + +protected: + virtual void doProcessing( sampleFrame * _working_buffer ) = 0; + + QAtomicInt m_state; + +} ; + +#endif diff --git a/include/mixer.h b/include/mixer.h index d6db2cb12..8c539a0c8 100644 --- a/include/mixer.h +++ b/include/mixer.h @@ -62,53 +62,9 @@ const Keys BaseKey = Key_A; const Octaves BaseOctave = DefaultOctave; - class MixerWorkerThread; -// TODO: move to ThreadableJob.h -class ThreadableJob -{ -public: - - enum ProcessingState - { - Unstarted, - Queued, - InProgress, - Done - }; - - ThreadableJob() : - m_state( ThreadableJob::Unstarted ) - { - } - - void reset() - { - m_state = ThreadableJob::Unstarted; - } - - bool process( sampleFrame * _working_buffer ) - { - if( m_state.testAndSetOrdered( Queued, InProgress ) ) - { - doProcessing( _working_buffer ); - m_state = Done; - return true; - } - return false; - } - - virtual bool requiresProcessing() const = 0; - - QAtomicInt m_state; - -private: - virtual void doProcessing( sampleFrame * _working_buffer ) = 0; - -} ; - - +#include "ThreadableJob.h" #include "play_handle.h" @@ -509,148 +465,4 @@ private: } ; - -// TODO: move to MixerWorkerThread.h / MixerWorkerThread.cpp -#include "Cpu.h" -#include "engine.h" - -class MixerWorkerThread : public QThread -{ -public: - struct JobQueue - { -#define JOB_QUEUE_SIZE 1024 - JobQueue() : - queueSize( 0 ), - itemsDone( 0 ) - { - for( int i = 0; i < JOB_QUEUE_SIZE; ++i ) - { - items[i] = NULL; - } - } - - ThreadableJob * items[JOB_QUEUE_SIZE]; - QAtomicInt queueSize; - QAtomicInt itemsDone; - } ; - - static JobQueue s_jobQueue; - - MixerWorkerThread( int _worker_num, mixer * _mixer ) : - QThread( _mixer ), - m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ), - m_workerNum( _worker_num ), - m_quit( false ), - m_mixer( _mixer ), - m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond ) - { - } - - virtual ~MixerWorkerThread() - { - CPU::freeFrames( m_workingBuf ); - } - - virtual void quit() - { - m_quit = true; - } - - void processJobQueue() - { - for( int i = 0; i < s_jobQueue.queueSize; ++i ) - { - // returns true if ThreadableJob was not processed before - if( s_jobQueue.items[i]->process( m_workingBuf ) ) - { - s_jobQueue.itemsDone.fetchAndAddOrdered( 1 ); - } - } - } - - static void resetJobQueue() - { - s_jobQueue.queueSize = 0; - s_jobQueue.itemsDone = 0; - } - - template - static void fillJobQueue( const T & _vec ) - { - resetJobQueue(); - for( typename T::ConstIterator it = _vec.begin(); it != _vec.end(); ++it ) - { - addJob( *it ); - } - } - - static void addJob( ThreadableJob * _job ) - { - if( _job->requiresProcessing() ) - { - _job->m_state = ThreadableJob::Queued; - s_jobQueue.items[s_jobQueue.queueSize.fetchAndAddOrdered(1)] = _job; - } - } - - -// define a pause instruction for spinlock-loop - merely useful on -// HyperThreading systems with just one physical core (e.g. Intel Atom) -#ifdef LMMS_HOST_X86 -#define SPINLOCK_PAUSE() asm( "pause" ) -#else -#ifdef LMMS_HOST_X86_64 -#define SPINLOCK_PAUSE() asm( "pause" ) -#else -#define SPINLOCK_PAUSE() -#endif -#endif - - static void startJobs() - { - // TODO: this is dirty! - engine::getMixer()->m_queueReadyWaitCond.wakeAll(); - } - - static void waitForJobs() - { - // TODO: this is dirty! - mixer * m = engine::getMixer(); - m->m_workers[m->m_numWorkers]->processJobQueue(); - while( s_jobQueue.itemsDone < s_jobQueue.queueSize ) - { - startJobs(); - } - } - - static void startAndWaitForJobs() - { - startJobs(); - waitForJobs(); - } - - -private: - virtual void run() - { - QMutex m; - while( m_quit == false ) - { - m.lock(); - m_queueReadyWaitCond->wait( &m ); - processJobQueue(); - m.unlock(); - } - } - - sampleFrame * m_workingBuf; - int m_workerNum; - volatile bool m_quit; - mixer * m_mixer; - QWaitCondition * m_queueReadyWaitCond; - -} ; - - #endif diff --git a/include/play_handle.h b/include/play_handle.h index 1a49fb540..26a8f1058 100644 --- a/include/play_handle.h +++ b/include/play_handle.h @@ -25,6 +25,7 @@ #ifndef _PLAY_HANDLE_H #define _PLAY_HANDLE_H +#include "ThreadableJob.h" #include "mixer.h" class track; diff --git a/src/core/FxMixer.cpp b/src/core/FxMixer.cpp index 1e8a255e6..e936d33ba 100644 --- a/src/core/FxMixer.cpp +++ b/src/core/FxMixer.cpp @@ -22,10 +22,10 @@ * */ - #include #include "FxMixer.h" +#include "MixerWorkerThread.h" #include "Cpu.h" #include "Effect.h" #include "song.h" @@ -60,7 +60,7 @@ FxChannel::~FxChannel() -void FxChannel::doProcessing(sampleFrame * _buf) +void FxChannel::doProcessing( sampleFrame * _buf ) { FxMixer * fxm = engine::fxMixer(); const fpp_t fpp = engine::getMixer()->framesPerPeriod(); @@ -97,7 +97,7 @@ void FxChannel::doProcessing(sampleFrame * _buf) const float v = m_volumeModel.value(); m_fxChain.startRunning(); - m_stillRunning = m_fxChain.processAudioBuffer( _buf, fpp); + m_stillRunning = m_fxChain.processAudioBuffer( _buf, fpp ); m_peakLeft = engine::getMixer()->peakValueLeft( _buf, fpp ) * v; m_peakRight = engine::getMixer()->peakValueRight( _buf, fpp ) * v; } @@ -113,13 +113,13 @@ void FxChannel::doProcessing(sampleFrame * _buf) { // if parent.unstarted and every parent.leaf.done: FxChannel * parent = fxm->effectChannel(m_sends[i]); - if( parent->m_state == ThreadableJob::Unstarted ) + if( parent->state() == ThreadableJob::Unstarted ) { bool everyLeafDone = true; for( int j=0; jm_receives.size(); ++j ) { - if( fxm->effectChannel(parent->m_receives[j])->m_state != - ThreadableJob::Done ) + if( fxm->effectChannel( parent->m_receives[j] )->state() != + ThreadableJob::Done ) { everyLeafDone = false; break; @@ -435,15 +435,6 @@ void FxMixer::mixToChannel( const sampleFrame * _buf, fx_ch_t _ch ) -void FxMixer::processChannel( fx_ch_t _ch, sampleFrame * _buf ) -{ - m_fxChannels[_ch]->process(_buf); - -} - - - - void FxMixer::prepareMasterMix() { engine::getMixer()->clearAudioBuffer( m_fxChannels[0]->m_buffer, @@ -492,7 +483,7 @@ void FxMixer::masterMix( sampleFrame * _buf ) // to be processed. MixerWorkerThread::resetJobQueue(); addChannelLeaf( 0, _buf ); - while( m_fxChannels[0]->m_state != ThreadableJob::Done ) + while( m_fxChannels[0]->state() != ThreadableJob::Done ) { MixerWorkerThread::startAndWaitForJobs(); } diff --git a/src/core/MixerWorkerThread.cpp b/src/core/MixerWorkerThread.cpp new file mode 100644 index 000000000..539b32245 --- /dev/null +++ b/src/core/MixerWorkerThread.cpp @@ -0,0 +1,138 @@ +/* + * MixerWorkerThread.cpp - implementation of MixerWorkerThread + * + * Copyright (c) 2009 Tobias Doerffel + * + * This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program (see COPYING); if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301 USA. + * + */ + +#include "MixerWorkerThread.h" +#include "Cpu.h" +#include "engine.h" +#include "mixer.h" + + +MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue; + + +MixerWorkerThread::MixerWorkerThread( int _worker_num, mixer * _mixer ) : + QThread( _mixer ), + m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ), + m_workerNum( _worker_num ), + m_quit( false ), + m_mixer( _mixer ), + m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond ) +{ + resetJobQueue(); +} + + + + +MixerWorkerThread::~MixerWorkerThread() +{ + CPU::freeFrames( m_workingBuf ); +} + + + + +void MixerWorkerThread::quit() +{ + m_quit = true; +} + + + + +void MixerWorkerThread::processJobQueue() +{ + while( s_jobQueue.itemsDone != s_jobQueue.queueSize ) + { + for( int i = 0; i < s_jobQueue.queueSize; ++i ) + { + ThreadableJob * job = + s_jobQueue.items[i].fetchAndStoreOrdered( NULL ); + if( job ) + { + job->process( m_workingBuf ); + s_jobQueue.itemsDone.fetchAndAddOrdered( 1 ); + } + } + } +} + + + + +void MixerWorkerThread::resetJobQueue() +{ + s_jobQueue.queueSize = 0; + s_jobQueue.itemsDone = 0; +} + + + + +void MixerWorkerThread::addJob( ThreadableJob * _job ) +{ + if( _job->requiresProcessing() ) + { + // update job state + _job->queue(); + // actually queue the job via atomic operations + s_jobQueue.items[s_jobQueue.queueSize.fetchAndAddOrdered(1)] = _job; + } +} + + + + +void MixerWorkerThread::startJobs() +{ + // TODO: this is dirty! + engine::getMixer()->m_queueReadyWaitCond.wakeAll(); +} + + + + +void MixerWorkerThread::waitForJobs() +{ + // TODO: this is dirty! + mixer * m = engine::getMixer(); + m->m_workers[m->m_numWorkers]->processJobQueue(); +} + + + + +void MixerWorkerThread::run() +{ + QMutex m; + while( m_quit == false ) + { + m.lock(); + m_queueReadyWaitCond->wait( &m ); + processJobQueue(); + m.unlock(); + } +} + + diff --git a/src/core/mixer.cpp b/src/core/mixer.cpp index f8e0c38b0..dab8ee496 100644 --- a/src/core/mixer.cpp +++ b/src/core/mixer.cpp @@ -26,6 +26,7 @@ #include "mixer.h" #include "FxMixer.h" +#include "MixerWorkerThread.h" #include "play_handle.h" #include "song.h" #include "templates.h" @@ -61,13 +62,6 @@ #endif - -MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue; - - - - - mixer::mixer() : m_framesPerPeriod( DEFAULT_BUFFER_SIZE ), m_workingBuf( NULL ),