MixerWorkerThread: added job queue modes and improved class structure

JobQueues can now operate in JobQueue::Static and JobQueue::Dynamic mode.
In static mode it operates the way it always used to while in dynamic
mode a changing job queue is supported. This is particularly important
for FX mixer sends.

There were also heavy improvements regarding the overall structure
and functionality of MixerWorkerThread and MixerWorkerThread::JobQueue.
There's now a clean distinction between multi-threaded processing
and actual (thread-safe) job queue processing. MixerWorkerThread does
not need to be a friend class of Mixer anymore.
This commit is contained in:
Tobias Doerffel
2009-10-24 18:59:00 +02:00
parent 4f5d31f862
commit ff03ddb8e4
5 changed files with 146 additions and 113 deletions

View File

@@ -481,7 +481,7 @@ void FxMixer::masterMix( sampleFrame * _buf )
// and add all channels to job list that have no dependencies
// when the channel completes it will check its parent to see if it needs
// to be processed.
MixerWorkerThread::resetJobQueue();
MixerWorkerThread::resetJobQueue( MixerWorkerThread::JobQueue::Dynamic );
addChannelLeaf( 0, _buf );
while( m_fxChannels[0]->state() != ThreadableJob::Done )
{

View File

@@ -28,17 +28,92 @@
#include "mixer.h"
MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue;
MixerWorkerThread::JobQueue MixerWorkerThread::globalJobQueue;
QWaitCondition * MixerWorkerThread::queueReadyWaitCond = NULL;
QList<MixerWorkerThread *> MixerWorkerThread::workerThreads;
MixerWorkerThread::MixerWorkerThread( int _worker_num, mixer * _mixer ) :
// implementation of internal JobQueue
void MixerWorkerThread::JobQueue::reset( OperationMode _opMode )
{
m_queueSize = 0;
m_itemsDone = 0;
m_opMode = _opMode;
}
void MixerWorkerThread::JobQueue::addJob( ThreadableJob * _job )
{
if( _job->requiresProcessing() )
{
// update job state
_job->queue();
// actually queue the job via atomic operations
m_items[m_queueSize.fetchAndAddOrdered(1)] = _job;
}
}
void MixerWorkerThread::JobQueue::run( sampleFrame * _buffer )
{
bool processedJob = true;
while( processedJob && (int) m_itemsDone < (int) m_queueSize )
{
processedJob = false;
for( int i = 0; i < m_queueSize; ++i )
{
ThreadableJob * job = m_items[i].fetchAndStoreOrdered( NULL );
if( job )
{
job->process( _buffer );
processedJob = true;
m_itemsDone.fetchAndAddOrdered( 1 );
}
}
// always exit loop if we're not in dynamic mode
processedJob = processedJob && ( m_opMode == Dynamic );
}
}
void MixerWorkerThread::JobQueue::wait()
{
while( (int) m_itemsDone < (int) m_queueSize )
{
#if defined(LMMS_HOST_X86) || defined(LMMS_HOST_X86_64)
asm( "pause" );
#endif
}
}
// implementation of worker threads
MixerWorkerThread::MixerWorkerThread( mixer * _mixer ) :
QThread( _mixer ),
m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ),
m_workerNum( _worker_num ),
m_quit( false ),
m_mixer( _mixer ),
m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond )
m_quit( false )
{
// initialize global static data
if( queueReadyWaitCond == NULL )
{
queueReadyWaitCond = new QWaitCondition;
}
// keep track of all instantiated worker threads - this is used for
// processing the last worker thread "inline", see comments in
// MixerWorkerThread::startAndWaitForJobs() for details
workerThreads << this;
resetJobQueue();
}
@@ -48,6 +123,8 @@ MixerWorkerThread::MixerWorkerThread( int _worker_num, mixer * _mixer ) :
MixerWorkerThread::~MixerWorkerThread()
{
CPU::freeFrames( m_workingBuf );
workerThreads.removeAll( this );
}
@@ -56,78 +133,20 @@ MixerWorkerThread::~MixerWorkerThread()
void MixerWorkerThread::quit()
{
m_quit = true;
resetJobQueue();
}
void MixerWorkerThread::processJobQueue()
void MixerWorkerThread::startAndWaitForJobs()
{
bool processedJob = true;
while( processedJob && (int) s_jobQueue.itemsDone < (int) s_jobQueue.queueSize )
{
processedJob = false;
for( int i = 0; i < s_jobQueue.queueSize; ++i )
{
ThreadableJob * job =
s_jobQueue.items[i].fetchAndStoreOrdered( NULL );
if( job )
{
job->process( m_workingBuf );
processedJob = true;
s_jobQueue.itemsDone.fetchAndAddOrdered( 1 );
}
}
}
}
void MixerWorkerThread::resetJobQueue()
{
s_jobQueue.queueSize = 0;
s_jobQueue.itemsDone = 0;
}
void MixerWorkerThread::addJob( ThreadableJob * _job )
{
if( _job->requiresProcessing() )
{
// update job state
_job->queue();
// actually queue the job via atomic operations
s_jobQueue.items[s_jobQueue.queueSize.fetchAndAddOrdered(1)] = _job;
}
}
void MixerWorkerThread::startJobs()
{
// TODO: this is dirty!
engine::getMixer()->m_queueReadyWaitCond.wakeAll();
}
void MixerWorkerThread::waitForJobs()
{
// TODO: this is dirty!
mixer * m = engine::getMixer();
m->m_workers[m->m_numWorkers]->processJobQueue();
while( (int) s_jobQueue.itemsDone < (int) s_jobQueue.queueSize )
{
#if defined(LMMS_HOST_X86) || defined(LMMS_HOST_X86_64)
asm( "pause" );
#endif
}
queueReadyWaitCond->wakeAll();
// The last worker-thread is never started. Instead it's processed "inline"
// i.e. within the global Mixer thread. This way we can reduce latencies
// that otherwise would be caused by synchronizing with another thread.
globalJobQueue.run( workerThreads.last()->m_workingBuf );
globalJobQueue.wait();
}
@@ -139,8 +158,8 @@ void MixerWorkerThread::run()
while( m_quit == false )
{
m.lock();
m_queueReadyWaitCond->wait( &m );
processJobQueue();
queueReadyWaitCond->wait( &m );
globalJobQueue.run( m_workingBuf );
m.unlock();
}
}

View File

@@ -72,7 +72,6 @@ mixer::mixer() :
m_cpuLoad( 0 ),
m_workers(),
m_numWorkers( QThread::idealThreadCount()-1 ),
m_queueReadyWaitCond(),
m_qualitySettings( qualitySettings::Mode_Draft ),
m_masterGain( 1.0f ),
m_audioDev( NULL ),
@@ -130,7 +129,7 @@ mixer::mixer() :
for( int i = 0; i < m_numWorkers+1; ++i )
{
MixerWorkerThread * wt = new MixerWorkerThread( i, this );
MixerWorkerThread * wt = new MixerWorkerThread( this );
if( i < m_numWorkers )
{
wt->start( QThread::TimeCriticalPriority );
@@ -148,14 +147,11 @@ mixer::mixer() :
mixer::~mixer()
{
// distribute an empty job-queue so that worker-threads
// get out of their processing-loop
MixerWorkerThread::s_jobQueue.queueSize = 0;
for( int w = 0; w < m_numWorkers; ++w )
{
m_workers[w]->quit();
}
MixerWorkerThread::startJobs();
MixerWorkerThread::startAndWaitForJobs();
for( int w = 0; w < m_numWorkers; ++w )
{
m_workers[w]->wait( 500 );