distribute whole job-queue instead of single jobs and re-assign unprocessed jobs to idle worker-threads - improves multi-threading behaviour a lot

git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/branches/lmms-mv@723 0778d3d1-df1d-0410-868b-ea421aaaa00d
This commit is contained in:
Tobias Doerffel
2008-02-27 14:57:07 +00:00
parent b81bafd382
commit 0acbf718af
2 changed files with 159 additions and 67 deletions

View File

@@ -366,6 +366,7 @@ private:
Uint8 m_cpuLoad;
bool m_multiThreaded;
QVector<mixerWorkerThread *> m_workers;
int m_numWorkers;
QSemaphore m_workerSem;

View File

@@ -72,18 +72,49 @@ public:
InvalidJob,
PlayHandle,
AudioPortEffects,
EffectChannel,
NumJobTypes
} ;
mixerWorkerThread( mixer * _mixer, QSemaphore * _sem ) :
struct jobQueueItem
{
jobQueueItem() :
workerID( -1 ),
type( InvalidJob ),
job( NULL ),
done( FALSE )
{
}
jobQueueItem( int _id, JobTypes _type, void * _job ) :
workerID( _id ),
type( _type ),
job( _job ),
done( FALSE )
{
}
int workerID;
JobTypes type;
void * job;
volatile bool done;
} ;
typedef QVector<jobQueueItem> jobQueueItems;
struct jobQueue
{
jobQueueItems items;
int remaining;
QReadWriteLock lock;
} ;
mixerWorkerThread( mixer * _mixer, int _id ) :
QThread( _mixer ),
m_mixer( _mixer ),
m_sem( _sem ),
m_id( _id ),
m_sem( &m_mixer->m_workerSem ),
m_jobWait( 1 ),
m_jobAccepted( 1 ),
m_idle( FALSE ),
m_job( NULL ),
m_jobType( InvalidJob )
m_jobQueue( NULL ),
m_idle( FALSE )
{
start( QThread::TimeCriticalPriority );
}
@@ -92,10 +123,9 @@ public:
{
}
void addJob( JobTypes _t, void * _job )
void addJob( jobQueue * _q )
{
m_jobType = _t;
m_job = _job;
m_jobQueue = _q;
m_jobWait.release();
m_jobAccepted.acquire();
}
@@ -118,20 +148,40 @@ private:
m_idle = FALSE;
m_sem->acquire();
m_jobAccepted.release();
if( m_jobType == PlayHandle )
for( jobQueueItems::iterator it = m_jobQueue->items.begin();
it != m_jobQueue->items.end(); ++it )
{
( (playHandle *) m_job )->play();
}
else if( m_jobType == AudioPortEffects )
{
audioPort * a = (audioPort *) m_job;
bool me = a->processEffects();
if( a->m_bufferUsage != audioPort::NONE || me )
m_jobQueue->lock.lockForRead();
const int id = it->workerID;
const bool done = it->done;
m_jobQueue->lock.unlock();
if( !done && id == m_id )
{
m_mixer->processBuffer(
a->firstBuffer(),
a->nextFxChannel() );
a->nextPeriod();
m_jobQueue->lock.lockForWrite();
it->done = TRUE;
--m_jobQueue->remaining;
m_jobQueue->lock.unlock();
switch( it->type )
{
case PlayHandle:
( (playHandle *) it->job )->play();
break;
case AudioPortEffects:
{
audioPort * a = (audioPort *) it->job;
bool me = a->processEffects();
if( a->m_bufferUsage != audioPort::NONE || me )
{
m_mixer->processBuffer(
a->firstBuffer(),
a->nextFxChannel() );
a->nextPeriod();
}
}
break;
default:
break;
}
}
}
m_idle = TRUE;
@@ -140,11 +190,11 @@ private:
}
mixer * m_mixer;
int m_id;
QSemaphore * m_sem;
QSemaphore m_jobWait;
QSemaphore m_jobAccepted;
void * m_job;
JobTypes m_jobType;
jobQueue * m_jobQueue;
volatile bool m_idle;
} ;
@@ -158,7 +208,8 @@ mixer::mixer( void ) :
m_cpuLoad( 0 ),
m_multiThreaded( QThread::idealThreadCount() > 1 ),
m_workers(),
m_workerSem( m_multiThreaded ? QThread::idealThreadCount() : 0 ),
m_numWorkers( m_multiThreaded ? QThread::idealThreadCount() : 0 ),
m_workerSem( m_numWorkers ),
m_qualityLevel( DEFAULT_QUALITY_LEVEL ),
m_masterGain( 1.0f ),
m_audioDev( NULL ),
@@ -200,10 +251,9 @@ mixer::mixer( void ) :
if( m_multiThreaded )
{
for( int i = 0; i < QThread::idealThreadCount(); ++i )
for( int i = 0; i < m_numWorkers; ++i )
{
m_workers.push_back( new mixerWorkerThread( this,
&m_workerSem ) );
m_workers.push_back( new mixerWorkerThread( this, i ) );
}
}
@@ -319,21 +369,66 @@ void mixer::setClipScaling( bool _state )
#define ADD_JOB(type,ptr) \
m_workerSem.acquire(); \
m_workerSem.release(); \
for( int i = 0; i < m_workers.size(); ++i ) \
{ \
if( m_workers[i]->idle() ) \
{ \
m_workers[i]->addJob( type, ptr ); \
break; \
} \
#define FILL_JOB_QUEUE(_jq,_vec_type,_vec,_job_type,_condition) \
int id = 0; \
for( _vec_type::iterator it = _vec.begin(); \
it != _vec.end(); ++it ) \
{ \
if( _condition ) \
{ \
_jq.items.push_back( \
mixerWorkerThread::jobQueueItem( id, _job_type, \
*it ) );\
id = (id+1) % m_numWorkers; \
} \
}
#define WAIT_FOR_JOBS() \
m_workerSem.acquire( m_workers.size() ); \
m_workerSem.release( m_workers.size() );
#define DISTRIBUTE_JOB_QUEUE(_jq) \
_jq.remaining = _jq.items.size(); \
for( int i = 0; i < m_numWorkers; ++i ) \
{ \
m_workers[i]->addJob( &_jq ); \
}
#define WAIT_FOR_JOBS() \
while( m_workerSem.available() < m_numWorkers ) \
{ \
m_workerSem.acquire( 1 ); \
m_workerSem.release( 1 ); \
jq.lock.lockForRead(); \
const int r = jq.remaining; \
jq.lock.unlock(); \
if( m_workerSem.available() >= m_numWorkers || \
r <= m_numWorkers ) \
break; \
/* in case a worker has finished, try to re-assign */ \
/* jobs of busy workers */ \
for( int i = 0; i < m_numWorkers; ++i ) \
{ \
if( m_workers[i]->idle() ) \
{ \
int n = m_numWorkers-1; \
for( mixerWorkerThread::jobQueueItems::iterator it = jq.items.end(); \
it != jq.items.begin(); ) \
{ \
--it; \
jq.lock.lockForRead(); \
if( !it->done && it->workerID != i && \
(++n) % m_numWorkers == 0 ) \
{ \
jq.lock.unlock(); \
jq.lock.lockForWrite(); \
it->workerID = i; \
} \
jq.lock.unlock(); \
} \
m_workers[i]->addJob( &jq ); \
break; \
} \
} \
} \
m_workerSem.acquire( m_numWorkers ); \
m_workerSem.release( m_numWorkers );
@@ -415,17 +510,12 @@ const surroundSampleFrame * mixer::renderNextBuffer( void )
}
++idx;
}
for( playHandleVector::iterator it =
m_playHandles.begin();
it != m_playHandles.end(); ++it )
{
if( !( *it )->done() &&
!( *it )->supportsParallelizing() )
{
ADD_JOB( mixerWorkerThread::PlayHandle,
*it );
}
}
mixerWorkerThread::jobQueue jq;
FILL_JOB_QUEUE(jq,playHandleVector,m_playHandles,
mixerWorkerThread::PlayHandle,
!( *it )->done() &&
!( *it )->supportsParallelizing() );
DISTRIBUTE_JOB_QUEUE(jq);
for( playHandleVector::iterator it = par_hndls.begin();
it != par_hndls.end(); ++it )
{
@@ -462,32 +552,30 @@ const surroundSampleFrame * mixer::renderNextBuffer( void )
}
}
unlockPlayHandles();
bool more_effects = FALSE;
for( QVector<audioPort *>::iterator it = m_audioPorts.begin();
it != m_audioPorts.end(); ++it )
if( m_multiThreaded )
{
if( m_multiThreaded )
{
ADD_JOB( mixerWorkerThread::AudioPortEffects,
*it );
}
else
mixerWorkerThread::jobQueue jq;
FILL_JOB_QUEUE(jq,QVector<audioPort*>,m_audioPorts,
mixerWorkerThread::AudioPortEffects,1);
DISTRIBUTE_JOB_QUEUE(jq);
WAIT_FOR_JOBS();
}
else
{
bool more_effects = FALSE;
for( QVector<audioPort *>::iterator it = m_audioPorts.begin();
it != m_audioPorts.end(); ++it )
{
more_effects = ( *it )->processEffects();
if( ( *it )->m_bufferUsage != audioPort::NONE ||
more_effects )
{
processBuffer( ( *it )->firstBuffer(),
( *it )->nextFxChannel() );
( *it )->nextFxChannel() );
( *it )->nextPeriod();
}
}
}
if( m_multiThreaded )
{
WAIT_FOR_JOBS();
}
}
unlock();
@@ -837,7 +925,7 @@ midiClient * mixer::tryMIDIClients( void )
void mixer::processBuffer( const surroundSampleFrame * _buf,
fx_ch_t/* _fx_chnl */ )
{
// TODO: effect-implementation
// TODO: process according effect-channel
/* if( m_scaleClip )
{
@@ -848,7 +936,9 @@ void mixer::processBuffer( const surroundSampleFrame * _buf,
m_newBuffer[chnl] = TRUE;
}
}*/
static QMutex m;
m.lock();
for( fpp_t frame = 0; frame < m_framesPerPeriod; ++frame )
{
for( ch_cnt_t chnl = 0; chnl < m_audioDev->channels(); ++chnl )
@@ -861,6 +951,7 @@ void mixer::processBuffer( const surroundSampleFrame * _buf,
}*/
}
}
m.unlock();
}