great improvements on multithreading - one global job-queue protected by a simple mutex where threads pull their jobs from

git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/trunk/lmms@878 0778d3d1-df1d-0410-868b-ea421aaaa00d
This commit is contained in:
Tobias Doerffel
2008-04-03 22:49:51 +00:00
parent 9362c8a894
commit 3e005439c4

View File

@@ -83,20 +83,17 @@ public:
struct jobQueueItem
{
jobQueueItem() :
workerID( -1 ),
type( InvalidJob ),
job( NULL ),
done( FALSE )
{
}
jobQueueItem( int _id, JobTypes _type, void * _job ) :
workerID( _id ),
jobQueueItem( JobTypes _type, void * _job ) :
type( _type ),
job( _job ),
done( FALSE )
{
}
int workerID;
JobTypes type;
void * job;
volatile bool done;
@@ -106,14 +103,12 @@ public:
struct jobQueue
{
jobQueueItems items;
int remaining;
QReadWriteLock lock;
QMutex lock;
} ;
mixerWorkerThread( mixer * _mixer, int _id ) :
mixerWorkerThread( mixer * _mixer ) :
QThread( _mixer ),
m_mixer( _mixer ),
m_id( _id ),
m_sem( &m_mixer->m_workerSem ),
m_jobWait( 1 ),
m_jobAccepted( 1 ),
@@ -155,15 +150,10 @@ private:
for( jobQueueItems::iterator it = m_jobQueue->items.begin();
it != m_jobQueue->items.end(); ++it )
{
m_jobQueue->lock.lockForRead();
const int id = it->workerID;
const bool done = it->done;
m_jobQueue->lock.unlock();
if( !done && id == m_id )
m_jobQueue->lock.lock();
if( !it->done )
{
m_jobQueue->lock.lockForWrite();
it->done = TRUE;
--m_jobQueue->remaining;
m_jobQueue->lock.unlock();
switch( it->type )
{
@@ -186,6 +176,10 @@ private:
break;
}
}
else
{
m_jobQueue->lock.unlock();
}
}
m_idle = TRUE;
m_sem->release();
@@ -193,7 +187,6 @@ private:
}
mixer * m_mixer;
int m_id;
QSemaphore * m_sem;
QSemaphore m_jobWait;
QSemaphore m_jobAccepted;
@@ -256,7 +249,7 @@ mixer::mixer( void ) :
{
for( int i = 0; i < m_numWorkers; ++i )
{
m_workers.push_back( new mixerWorkerThread( this, i ) );
m_workers.push_back( new mixerWorkerThread( this ) );
}
}
@@ -333,63 +326,24 @@ bool mixer::criticalXRuns( void ) const
#define FILL_JOB_QUEUE(_jq,_vec_type,_vec,_job_type,_condition) \
int id = 0; \
for( _vec_type::iterator it = _vec.begin(); \
it != _vec.end(); ++it ) \
{ \
if( _condition ) \
{ \
_jq.items.push_back( \
mixerWorkerThread::jobQueueItem( id, _job_type, \
mixerWorkerThread::jobQueueItem( _job_type, \
*it ) );\
id = (id+1) % m_numWorkers; \
} \
}
#define DISTRIBUTE_JOB_QUEUE(_jq) \
_jq.remaining = _jq.items.size(); \
for( int i = 0; i < m_numWorkers; ++i ) \
{ \
m_workers[i]->addJob( &_jq ); \
}
#define WAIT_FOR_JOBS(_cond) \
while( m_workerSem.available() < m_numWorkers ) \
{ \
m_workerSem.acquire( 1 ); \
m_workerSem.release( 1 ); \
jq.lock.lockForRead(); \
const int r = jq.remaining; \
jq.lock.unlock(); \
if( m_workerSem.available() >= m_numWorkers || \
r <= m_numWorkers ) \
break; \
/* in case a worker has finished, try to re-assign */ \
/* jobs of busy workers */ \
for( int i = 0; i < m_numWorkers; ++i ) \
{ \
if( m_workers[i]->idle() ) \
{ \
int n = m_numWorkers-1; \
for( mixerWorkerThread::jobQueueItems::iterator it = jq.items.end(); \
it != jq.items.begin(); ) \
{ \
--it; \
jq.lock.lockForRead(); \
if( _cond && !it->done && it->workerID != i && \
(++n) % m_numWorkers == 0 ) \
{ \
jq.lock.unlock(); \
jq.lock.lockForWrite(); \
it->workerID = i; \
} \
jq.lock.unlock(); \
} \
m_workers[i]->addJob( &jq ); \
break; \
} \
} \
} \
#define WAIT_FOR_JOBS() \
m_workerSem.acquire( m_numWorkers ); \
m_workerSem.release( m_numWorkers );
@@ -482,7 +436,7 @@ const surroundSampleFrame * mixer::renderNextBuffer( void )
// we have to process all note-play-handles of a monophonic instrument by the
// same thread serially as monophonic instruments rely on processing note-play-
// handles in correct order
QHash<instrumentTrack *, int> h;
/* QHash<instrumentTrack *, int> h;
for( mixerWorkerThread::jobQueueItems::iterator it =
jq.items.begin(); it != jq.items.end(); ++it )
{
@@ -505,14 +459,14 @@ if( COND_NPH )
}
}
}
}
}*/
DISTRIBUTE_JOB_QUEUE(jq);
for( playHandleVector::iterator it = par_hndls.begin();
it != par_hndls.end(); ++it )
{
( *it )->waitForWorkerThread();
}
WAIT_FOR_JOBS( h.size() > 0 && ( COND_NPH ? !COND_MONOPHONIC : TRUE ) );
WAIT_FOR_JOBS();// h.size() > 0 && ( COND_NPH ? !COND_MONOPHONIC : TRUE ) );
}
else
{
@@ -549,7 +503,7 @@ if( COND_NPH )
FILL_JOB_QUEUE(jq,QVector<audioPort*>,m_audioPorts,
mixerWorkerThread::AudioPortEffects,1);
DISTRIBUTE_JOB_QUEUE(jq);
WAIT_FOR_JOBS(TRUE);
WAIT_FOR_JOBS();
}
else
{