From 0acbf718af86c1f5966fbbc4964bd2fc06bf161d Mon Sep 17 00:00:00 2001 From: Tobias Doerffel Date: Wed, 27 Feb 2008 14:57:07 +0000 Subject: [PATCH] distribute whole job-queue instead of single jobs and re-assign unprocessed jobs to idle worker-threads - improves multi-threading behaviour a lot git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/branches/lmms-mv@723 0778d3d1-df1d-0410-868b-ea421aaaa00d --- include/mixer.h | 1 + src/core/mixer.cpp | 225 +++++++++++++++++++++++++++++++-------------- 2 files changed, 159 insertions(+), 67 deletions(-) diff --git a/include/mixer.h b/include/mixer.h index 9dd4a277e..b7ccda455 100644 --- a/include/mixer.h +++ b/include/mixer.h @@ -366,6 +366,7 @@ private: Uint8 m_cpuLoad; bool m_multiThreaded; QVector m_workers; + int m_numWorkers; QSemaphore m_workerSem; diff --git a/src/core/mixer.cpp b/src/core/mixer.cpp index b8a6a7390..b9ad6229d 100644 --- a/src/core/mixer.cpp +++ b/src/core/mixer.cpp @@ -72,18 +72,49 @@ public: InvalidJob, PlayHandle, AudioPortEffects, + EffectChannel, NumJobTypes } ; - mixerWorkerThread( mixer * _mixer, QSemaphore * _sem ) : + struct jobQueueItem + { + jobQueueItem() : + workerID( -1 ), + type( InvalidJob ), + job( NULL ), + done( FALSE ) + { + } + jobQueueItem( int _id, JobTypes _type, void * _job ) : + workerID( _id ), + type( _type ), + job( _job ), + done( FALSE ) + { + } + int workerID; + JobTypes type; + void * job; + volatile bool done; + } ; + + typedef QVector jobQueueItems; + struct jobQueue + { + jobQueueItems items; + int remaining; + QReadWriteLock lock; + } ; + + mixerWorkerThread( mixer * _mixer, int _id ) : QThread( _mixer ), m_mixer( _mixer ), - m_sem( _sem ), + m_id( _id ), + m_sem( &m_mixer->m_workerSem ), m_jobWait( 1 ), m_jobAccepted( 1 ), - m_idle( FALSE ), - m_job( NULL ), - m_jobType( InvalidJob ) + m_jobQueue( NULL ), + m_idle( FALSE ) { start( QThread::TimeCriticalPriority ); } @@ -92,10 +123,9 @@ public: { } - void addJob( JobTypes _t, void * _job ) + void addJob( jobQueue * _q ) { - m_jobType = _t; - m_job = _job; + m_jobQueue = _q; m_jobWait.release(); m_jobAccepted.acquire(); } @@ -118,20 +148,40 @@ private: m_idle = FALSE; m_sem->acquire(); m_jobAccepted.release(); - if( m_jobType == PlayHandle ) + for( jobQueueItems::iterator it = m_jobQueue->items.begin(); + it != m_jobQueue->items.end(); ++it ) { - ( (playHandle *) m_job )->play(); - } - else if( m_jobType == AudioPortEffects ) - { - audioPort * a = (audioPort *) m_job; - bool me = a->processEffects(); - if( a->m_bufferUsage != audioPort::NONE || me ) + m_jobQueue->lock.lockForRead(); + const int id = it->workerID; + const bool done = it->done; + m_jobQueue->lock.unlock(); + if( !done && id == m_id ) { - m_mixer->processBuffer( - a->firstBuffer(), - a->nextFxChannel() ); - a->nextPeriod(); + m_jobQueue->lock.lockForWrite(); + it->done = TRUE; + --m_jobQueue->remaining; + m_jobQueue->lock.unlock(); + switch( it->type ) + { + case PlayHandle: + ( (playHandle *) it->job )->play(); + break; + case AudioPortEffects: + { + audioPort * a = (audioPort *) it->job; + bool me = a->processEffects(); + if( a->m_bufferUsage != audioPort::NONE || me ) + { + m_mixer->processBuffer( + a->firstBuffer(), + a->nextFxChannel() ); + a->nextPeriod(); + } + } + break; + default: + break; + } } } m_idle = TRUE; @@ -140,11 +190,11 @@ private: } mixer * m_mixer; + int m_id; QSemaphore * m_sem; QSemaphore m_jobWait; QSemaphore m_jobAccepted; - void * m_job; - JobTypes m_jobType; + jobQueue * m_jobQueue; volatile bool m_idle; } ; @@ -158,7 +208,8 @@ mixer::mixer( void ) : m_cpuLoad( 0 ), m_multiThreaded( QThread::idealThreadCount() > 1 ), m_workers(), - m_workerSem( m_multiThreaded ? QThread::idealThreadCount() : 0 ), + m_numWorkers( m_multiThreaded ? QThread::idealThreadCount() : 0 ), + m_workerSem( m_numWorkers ), m_qualityLevel( DEFAULT_QUALITY_LEVEL ), m_masterGain( 1.0f ), m_audioDev( NULL ), @@ -200,10 +251,9 @@ mixer::mixer( void ) : if( m_multiThreaded ) { - for( int i = 0; i < QThread::idealThreadCount(); ++i ) + for( int i = 0; i < m_numWorkers; ++i ) { - m_workers.push_back( new mixerWorkerThread( this, - &m_workerSem ) ); + m_workers.push_back( new mixerWorkerThread( this, i ) ); } } @@ -319,21 +369,66 @@ void mixer::setClipScaling( bool _state ) -#define ADD_JOB(type,ptr) \ - m_workerSem.acquire(); \ - m_workerSem.release(); \ - for( int i = 0; i < m_workers.size(); ++i ) \ - { \ - if( m_workers[i]->idle() ) \ - { \ - m_workers[i]->addJob( type, ptr ); \ - break; \ - } \ +#define FILL_JOB_QUEUE(_jq,_vec_type,_vec,_job_type,_condition) \ + int id = 0; \ + for( _vec_type::iterator it = _vec.begin(); \ + it != _vec.end(); ++it ) \ + { \ + if( _condition ) \ + { \ + _jq.items.push_back( \ + mixerWorkerThread::jobQueueItem( id, _job_type, \ + *it ) );\ + id = (id+1) % m_numWorkers; \ + } \ } -#define WAIT_FOR_JOBS() \ - m_workerSem.acquire( m_workers.size() ); \ - m_workerSem.release( m_workers.size() ); +#define DISTRIBUTE_JOB_QUEUE(_jq) \ + _jq.remaining = _jq.items.size(); \ + for( int i = 0; i < m_numWorkers; ++i ) \ + { \ + m_workers[i]->addJob( &_jq ); \ + } + +#define WAIT_FOR_JOBS() \ + while( m_workerSem.available() < m_numWorkers ) \ + { \ + m_workerSem.acquire( 1 ); \ + m_workerSem.release( 1 ); \ + jq.lock.lockForRead(); \ + const int r = jq.remaining; \ + jq.lock.unlock(); \ + if( m_workerSem.available() >= m_numWorkers || \ + r <= m_numWorkers ) \ + break; \ + /* in case a worker has finished, try to re-assign */ \ + /* jobs of busy workers */ \ + for( int i = 0; i < m_numWorkers; ++i ) \ + { \ + if( m_workers[i]->idle() ) \ + { \ + int n = m_numWorkers-1; \ +for( mixerWorkerThread::jobQueueItems::iterator it = jq.items.end(); \ + it != jq.items.begin(); ) \ +{ \ + --it; \ + jq.lock.lockForRead(); \ + if( !it->done && it->workerID != i && \ + (++n) % m_numWorkers == 0 ) \ + { \ + jq.lock.unlock(); \ + jq.lock.lockForWrite(); \ + it->workerID = i; \ + } \ + jq.lock.unlock(); \ +} \ + m_workers[i]->addJob( &jq ); \ + break; \ + } \ + } \ + } \ + m_workerSem.acquire( m_numWorkers ); \ + m_workerSem.release( m_numWorkers ); @@ -415,17 +510,12 @@ const surroundSampleFrame * mixer::renderNextBuffer( void ) } ++idx; } - for( playHandleVector::iterator it = - m_playHandles.begin(); - it != m_playHandles.end(); ++it ) - { - if( !( *it )->done() && - !( *it )->supportsParallelizing() ) - { - ADD_JOB( mixerWorkerThread::PlayHandle, - *it ); - } - } + mixerWorkerThread::jobQueue jq; + FILL_JOB_QUEUE(jq,playHandleVector,m_playHandles, + mixerWorkerThread::PlayHandle, + !( *it )->done() && + !( *it )->supportsParallelizing() ); + DISTRIBUTE_JOB_QUEUE(jq); for( playHandleVector::iterator it = par_hndls.begin(); it != par_hndls.end(); ++it ) { @@ -462,32 +552,30 @@ const surroundSampleFrame * mixer::renderNextBuffer( void ) } } unlockPlayHandles(); - - bool more_effects = FALSE; - for( QVector::iterator it = m_audioPorts.begin(); - it != m_audioPorts.end(); ++it ) + if( m_multiThreaded ) { - if( m_multiThreaded ) - { - ADD_JOB( mixerWorkerThread::AudioPortEffects, - *it ); - } - else + mixerWorkerThread::jobQueue jq; + FILL_JOB_QUEUE(jq,QVector,m_audioPorts, + mixerWorkerThread::AudioPortEffects,1); + DISTRIBUTE_JOB_QUEUE(jq); + WAIT_FOR_JOBS(); + } + else + { + bool more_effects = FALSE; + for( QVector::iterator it = m_audioPorts.begin(); + it != m_audioPorts.end(); ++it ) { more_effects = ( *it )->processEffects(); if( ( *it )->m_bufferUsage != audioPort::NONE || more_effects ) { processBuffer( ( *it )->firstBuffer(), - ( *it )->nextFxChannel() ); + ( *it )->nextFxChannel() ); ( *it )->nextPeriod(); } } } - if( m_multiThreaded ) - { - WAIT_FOR_JOBS(); - } } unlock(); @@ -837,7 +925,7 @@ midiClient * mixer::tryMIDIClients( void ) void mixer::processBuffer( const surroundSampleFrame * _buf, fx_ch_t/* _fx_chnl */ ) { - // TODO: effect-implementation + // TODO: process according effect-channel /* if( m_scaleClip ) { @@ -848,7 +936,9 @@ void mixer::processBuffer( const surroundSampleFrame * _buf, m_newBuffer[chnl] = TRUE; } }*/ - + + static QMutex m; + m.lock(); for( fpp_t frame = 0; frame < m_framesPerPeriod; ++frame ) { for( ch_cnt_t chnl = 0; chnl < m_audioDev->channels(); ++chnl ) @@ -861,6 +951,7 @@ void mixer::processBuffer( const surroundSampleFrame * _buf, }*/ } } + m.unlock(); }