diff --git a/include/MixerWorkerThread.h b/include/MixerWorkerThread.h index 03e195e60..09a54ba92 100644 --- a/include/MixerWorkerThread.h +++ b/include/MixerWorkerThread.h @@ -27,7 +27,6 @@ #include #include -#include #include "mixer.h" @@ -35,61 +34,82 @@ class MixerWorkerThread : public QThread { public: - struct JobQueue + // internal representation of the job queue - all functions are thread-safe + class JobQueue { -#define JOB_QUEUE_SIZE 1024 + public: + enum OperationMode + { + Static, // no jobs added while processing queue + Dynamic // jobs can be added while processing queue + } ; + JobQueue() : - items(), - queueSize( 0 ), - itemsDone( 0 ) + m_items(), + m_queueSize( 0 ), + m_itemsDone( 0 ), + m_opMode( Static ) { } - QAtomicPointer items[JOB_QUEUE_SIZE]; - QAtomicInt queueSize; - QAtomicInt itemsDone; + void reset( OperationMode _opMode ); + + void addJob( ThreadableJob * _job ); + + void run( sampleFrame * _buffer ); + void wait(); + + private: +#define JOB_QUEUE_SIZE 1024 + QAtomicPointer m_items[JOB_QUEUE_SIZE]; + QAtomicInt m_queueSize; + QAtomicInt m_itemsDone; + OperationMode m_opMode; + } ; - static JobQueue s_jobQueue; - MixerWorkerThread( int _worker_num, mixer * _mixer ); + MixerWorkerThread( mixer * _mixer ); virtual ~MixerWorkerThread(); virtual void quit(); - void processJobQueue(); - static void resetJobQueue(); - - template - static void fillJobQueue( const T & _vec ) + static void resetJobQueue( JobQueue::OperationMode _opMode = + JobQueue::Static ) { - resetJobQueue(); + globalJobQueue.reset( _opMode ); + } + + static void addJob( ThreadableJob * _job ) + { + globalJobQueue.addJob( _job ); + } + + // a convenient helper function allowing to pass a container with pointers + // to ThreadableJob objects + template + static void fillJobQueue( const T & _vec, + JobQueue::OperationMode _opMode = JobQueue::Static ) + { + resetJobQueue( _opMode ); for( typename T::ConstIterator it = _vec.begin(); it != _vec.end(); ++it ) { addJob( *it ); } } - static void addJob( ThreadableJob * _job ); - - static void startJobs(); - static void waitForJobs(); - - static void startAndWaitForJobs() - { - startJobs(); - waitForJobs(); - } + static void startAndWaitForJobs(); private: virtual void run(); + static JobQueue globalJobQueue; + static QWaitCondition * queueReadyWaitCond; + static QList workerThreads; + sampleFrame * m_workingBuf; - int m_workerNum; volatile bool m_quit; - mixer * m_mixer; - QWaitCondition * m_queueReadyWaitCond; } ; diff --git a/include/mixer.h b/include/mixer.h index 8c539a0c8..a92b16924 100644 --- a/include/mixer.h +++ b/include/mixer.h @@ -433,7 +433,6 @@ private: int m_cpuLoad; QVector m_workers; int m_numWorkers; - QWaitCondition m_queueReadyWaitCond; PlayHandleList m_playHandles; @@ -461,7 +460,6 @@ private: friend class engine; - friend class MixerWorkerThread; } ; diff --git a/src/core/FxMixer.cpp b/src/core/FxMixer.cpp index e936d33ba..13c7d626a 100644 --- a/src/core/FxMixer.cpp +++ b/src/core/FxMixer.cpp @@ -481,7 +481,7 @@ void FxMixer::masterMix( sampleFrame * _buf ) // and add all channels to job list that have no dependencies // when the channel completes it will check its parent to see if it needs // to be processed. - MixerWorkerThread::resetJobQueue(); + MixerWorkerThread::resetJobQueue( MixerWorkerThread::JobQueue::Dynamic ); addChannelLeaf( 0, _buf ); while( m_fxChannels[0]->state() != ThreadableJob::Done ) { diff --git a/src/core/MixerWorkerThread.cpp b/src/core/MixerWorkerThread.cpp index 8941b1a56..f9d2e3e95 100644 --- a/src/core/MixerWorkerThread.cpp +++ b/src/core/MixerWorkerThread.cpp @@ -28,17 +28,92 @@ #include "mixer.h" -MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue; +MixerWorkerThread::JobQueue MixerWorkerThread::globalJobQueue; +QWaitCondition * MixerWorkerThread::queueReadyWaitCond = NULL; +QList MixerWorkerThread::workerThreads; -MixerWorkerThread::MixerWorkerThread( int _worker_num, mixer * _mixer ) : + +// implementation of internal JobQueue +void MixerWorkerThread::JobQueue::reset( OperationMode _opMode ) +{ + m_queueSize = 0; + m_itemsDone = 0; + m_opMode = _opMode; +} + + + + +void MixerWorkerThread::JobQueue::addJob( ThreadableJob * _job ) +{ + if( _job->requiresProcessing() ) + { + // update job state + _job->queue(); + // actually queue the job via atomic operations + m_items[m_queueSize.fetchAndAddOrdered(1)] = _job; + } +} + + + +void MixerWorkerThread::JobQueue::run( sampleFrame * _buffer ) +{ + bool processedJob = true; + while( processedJob && (int) m_itemsDone < (int) m_queueSize ) + { + processedJob = false; + for( int i = 0; i < m_queueSize; ++i ) + { + ThreadableJob * job = m_items[i].fetchAndStoreOrdered( NULL ); + if( job ) + { + job->process( _buffer ); + processedJob = true; + m_itemsDone.fetchAndAddOrdered( 1 ); + } + } + // always exit loop if we're not in dynamic mode + processedJob = processedJob && ( m_opMode == Dynamic ); + } +} + + + + +void MixerWorkerThread::JobQueue::wait() +{ + while( (int) m_itemsDone < (int) m_queueSize ) + { +#if defined(LMMS_HOST_X86) || defined(LMMS_HOST_X86_64) + asm( "pause" ); +#endif + } +} + + + + + +// implementation of worker threads + +MixerWorkerThread::MixerWorkerThread( mixer * _mixer ) : QThread( _mixer ), m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ), - m_workerNum( _worker_num ), - m_quit( false ), - m_mixer( _mixer ), - m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond ) + m_quit( false ) { + // initialize global static data + if( queueReadyWaitCond == NULL ) + { + queueReadyWaitCond = new QWaitCondition; + } + + // keep track of all instantiated worker threads - this is used for + // processing the last worker thread "inline", see comments in + // MixerWorkerThread::startAndWaitForJobs() for details + workerThreads << this; + resetJobQueue(); } @@ -48,6 +123,8 @@ MixerWorkerThread::MixerWorkerThread( int _worker_num, mixer * _mixer ) : MixerWorkerThread::~MixerWorkerThread() { CPU::freeFrames( m_workingBuf ); + + workerThreads.removeAll( this ); } @@ -56,78 +133,20 @@ MixerWorkerThread::~MixerWorkerThread() void MixerWorkerThread::quit() { m_quit = true; + resetJobQueue(); } -void MixerWorkerThread::processJobQueue() +void MixerWorkerThread::startAndWaitForJobs() { - bool processedJob = true; - while( processedJob && (int) s_jobQueue.itemsDone < (int) s_jobQueue.queueSize ) - { - processedJob = false; - for( int i = 0; i < s_jobQueue.queueSize; ++i ) - { - ThreadableJob * job = - s_jobQueue.items[i].fetchAndStoreOrdered( NULL ); - if( job ) - { - job->process( m_workingBuf ); - processedJob = true; - s_jobQueue.itemsDone.fetchAndAddOrdered( 1 ); - } - } - } -} - - - - -void MixerWorkerThread::resetJobQueue() -{ - s_jobQueue.queueSize = 0; - s_jobQueue.itemsDone = 0; -} - - - - -void MixerWorkerThread::addJob( ThreadableJob * _job ) -{ - if( _job->requiresProcessing() ) - { - // update job state - _job->queue(); - // actually queue the job via atomic operations - s_jobQueue.items[s_jobQueue.queueSize.fetchAndAddOrdered(1)] = _job; - } -} - - - - -void MixerWorkerThread::startJobs() -{ - // TODO: this is dirty! - engine::getMixer()->m_queueReadyWaitCond.wakeAll(); -} - - - - -void MixerWorkerThread::waitForJobs() -{ - // TODO: this is dirty! - mixer * m = engine::getMixer(); - m->m_workers[m->m_numWorkers]->processJobQueue(); - - while( (int) s_jobQueue.itemsDone < (int) s_jobQueue.queueSize ) - { -#if defined(LMMS_HOST_X86) || defined(LMMS_HOST_X86_64) - asm( "pause" ); -#endif - } + queueReadyWaitCond->wakeAll(); + // The last worker-thread is never started. Instead it's processed "inline" + // i.e. within the global Mixer thread. This way we can reduce latencies + // that otherwise would be caused by synchronizing with another thread. + globalJobQueue.run( workerThreads.last()->m_workingBuf ); + globalJobQueue.wait(); } @@ -139,8 +158,8 @@ void MixerWorkerThread::run() while( m_quit == false ) { m.lock(); - m_queueReadyWaitCond->wait( &m ); - processJobQueue(); + queueReadyWaitCond->wait( &m ); + globalJobQueue.run( m_workingBuf ); m.unlock(); } } diff --git a/src/core/mixer.cpp b/src/core/mixer.cpp index dab8ee496..ac6663930 100644 --- a/src/core/mixer.cpp +++ b/src/core/mixer.cpp @@ -72,7 +72,6 @@ mixer::mixer() : m_cpuLoad( 0 ), m_workers(), m_numWorkers( QThread::idealThreadCount()-1 ), - m_queueReadyWaitCond(), m_qualitySettings( qualitySettings::Mode_Draft ), m_masterGain( 1.0f ), m_audioDev( NULL ), @@ -130,7 +129,7 @@ mixer::mixer() : for( int i = 0; i < m_numWorkers+1; ++i ) { - MixerWorkerThread * wt = new MixerWorkerThread( i, this ); + MixerWorkerThread * wt = new MixerWorkerThread( this ); if( i < m_numWorkers ) { wt->start( QThread::TimeCriticalPriority ); @@ -148,14 +147,11 @@ mixer::mixer() : mixer::~mixer() { - // distribute an empty job-queue so that worker-threads - // get out of their processing-loop - MixerWorkerThread::s_jobQueue.queueSize = 0; for( int w = 0; w < m_numWorkers; ++w ) { m_workers[w]->quit(); } - MixerWorkerThread::startJobs(); + MixerWorkerThread::startAndWaitForJobs(); for( int w = 0; w < m_numWorkers; ++w ) { m_workers[w]->wait( 500 );