From e0851b970ef4ad4a41997ad9c6fc41ba7171994c Mon Sep 17 00:00:00 2001 From: Tobias Doerffel Date: Wed, 27 Feb 2008 00:45:30 +0000 Subject: [PATCH] initial support for parallel rendering via worker-threads - far from being perfect (especially with small mixer-period-sizes) but seems to help even on a DualCore machine git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/branches/lmms-mv@720 0778d3d1-df1d-0410-868b-ea421aaaa00d --- include/mixer.h | 9 ++- src/core/mixer.cpp | 179 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 154 insertions(+), 34 deletions(-) diff --git a/include/mixer.h b/include/mixer.h index b7cfab426..9dd4a277e 100644 --- a/include/mixer.h +++ b/include/mixer.h @@ -31,6 +31,7 @@ #endif #include +#include #include #include @@ -91,6 +92,8 @@ const tones BASE_TONE = A; const octaves BASE_OCTAVE = OCTAVE_4; +class mixerWorkerThread; + class mixer : public QObject { @@ -361,7 +364,10 @@ private: bool m_newBuffer[SURROUND_CHANNELS]; Uint8 m_cpuLoad; - int m_parallelizingLevel; + bool m_multiThreaded; + QVector m_workers; + QSemaphore m_workerSem; + playHandleVector m_playHandles; constPlayHandleVector m_playHandlesToRemove; @@ -389,6 +395,7 @@ private: friend class engine; + friend class mixerWorkerThread; } ; diff --git a/src/core/mixer.cpp b/src/core/mixer.cpp index 9b421c204..b8a6a7390 100644 --- a/src/core/mixer.cpp +++ b/src/core/mixer.cpp @@ -25,6 +25,8 @@ */ +#include + #include #include "mixer.h" @@ -62,12 +64,101 @@ sample_rate_t SAMPLE_RATES[QUALITY_LEVELS] = { 44100, 88200 } ; +class mixerWorkerThread : public QThread +{ +public: + enum JobTypes + { + InvalidJob, + PlayHandle, + AudioPortEffects, + NumJobTypes + } ; + + mixerWorkerThread( mixer * _mixer, QSemaphore * _sem ) : + QThread( _mixer ), + m_mixer( _mixer ), + m_sem( _sem ), + m_jobWait( 1 ), + m_jobAccepted( 1 ), + m_idle( FALSE ), + m_job( NULL ), + m_jobType( InvalidJob ) + { + start( QThread::TimeCriticalPriority ); + } + + virtual ~mixerWorkerThread() + { + } + + void addJob( JobTypes _t, void * _job ) + { + m_jobType = _t; + m_job = _job; + m_jobWait.release(); + m_jobAccepted.acquire(); + } + + inline bool idle( void ) + { + return( m_idle ); + + } + +private: + virtual void run( void ) + { + m_jobWait.acquire(); + m_jobAccepted.acquire(); + m_idle = TRUE; + while( 1 ) + { + m_jobWait.acquire(); + m_idle = FALSE; + m_sem->acquire(); + m_jobAccepted.release(); + if( m_jobType == PlayHandle ) + { + ( (playHandle *) m_job )->play(); + } + else if( m_jobType == AudioPortEffects ) + { + audioPort * a = (audioPort *) m_job; + bool me = a->processEffects(); + if( a->m_bufferUsage != audioPort::NONE || me ) + { + m_mixer->processBuffer( + a->firstBuffer(), + a->nextFxChannel() ); + a->nextPeriod(); + } + } + m_idle = TRUE; + m_sem->release(); + } + } + + mixer * m_mixer; + QSemaphore * m_sem; + QSemaphore m_jobWait; + QSemaphore m_jobAccepted; + void * m_job; + JobTypes m_jobType; + volatile bool m_idle; + +} ; + + + mixer::mixer( void ) : m_framesPerPeriod( DEFAULT_BUFFER_SIZE ), m_readBuf( NULL ), m_writeBuf( NULL ), m_cpuLoad( 0 ), - m_parallelizingLevel( 1 ), + m_multiThreaded( QThread::idealThreadCount() > 1 ), + m_workers(), + m_workerSem( m_multiThreaded ? QThread::idealThreadCount() : 0 ), m_qualityLevel( DEFAULT_QUALITY_LEVEL ), m_masterGain( 1.0f ), m_audioDev( NULL ), @@ -99,13 +190,6 @@ mixer::mixer( void ) : m_fifo = new fifo( 1 ); } - if( configManager::inst()->value( "mixer", "parallelizinglevel" - ).toInt() > 0 ) - { - m_parallelizingLevel =configManager::inst()->value( "mixer", - "parallelizinglevel" ).toInt(); - } - for( Uint8 i = 0; i < 3; i++ ) { m_readBuf = new surroundSampleFrame[m_framesPerPeriod]; @@ -113,7 +197,16 @@ mixer::mixer( void ) : clearAudioBuffer( m_readBuf, m_framesPerPeriod ); m_bufferPool.push_back( m_readBuf ); } - + + if( m_multiThreaded ) + { + for( int i = 0; i < QThread::idealThreadCount(); ++i ) + { + m_workers.push_back( new mixerWorkerThread( this, + &m_workerSem ) ); + } + } + setClipScaling( FALSE ); } @@ -226,6 +319,23 @@ void mixer::setClipScaling( bool _state ) +#define ADD_JOB(type,ptr) \ + m_workerSem.acquire(); \ + m_workerSem.release(); \ + for( int i = 0; i < m_workers.size(); ++i ) \ + { \ + if( m_workers[i]->idle() ) \ + { \ + m_workers[i]->addJob( type, ptr ); \ + break; \ + } \ + } + +#define WAIT_FOR_JOBS() \ + m_workerSem.acquire( m_workers.size() ); \ + m_workerSem.release( m_workers.size() ); + + const surroundSampleFrame * mixer::renderNextBuffer( void ) { @@ -292,20 +402,8 @@ const surroundSampleFrame * mixer::renderNextBuffer( void ) lockPlayHandles(); int idx = 0; - if( m_parallelizingLevel > 1 ) + if( m_multiThreaded ) { -// TODO: if not enough play-handles are found which are capable of -// parallelizing, create according worker-threads. each of this threads -// processes a certain part of our m_playHandles-vector -// the question is the queueing model which we can use: -// 1) m_playHandles is divided into m_parallelizingLevel sub-vectors -// each sub-vector is processed by a worker-thread -// 2) create a stack with all play-handles that need to be processed, -// save it via a mutex and then let all worker-threads "fetch jobs" -// from the stack - this way it's guaranteed the work is -// balanced across all worker-threads. this would avoid cases -// where the sub-vector of a thread only contains notes that are -// done and only need to be deleted. playHandleVector par_hndls; while( idx < m_playHandles.size() ) { @@ -324,7 +422,8 @@ const surroundSampleFrame * mixer::renderNextBuffer( void ) if( !( *it )->done() && !( *it )->supportsParallelizing() ) { - ( *it )->play(); + ADD_JOB( mixerWorkerThread::PlayHandle, + *it ); } } for( playHandleVector::iterator it = par_hndls.begin(); @@ -332,6 +431,7 @@ const surroundSampleFrame * mixer::renderNextBuffer( void ) { ( *it )->waitForWorkerThread(); } + WAIT_FOR_JOBS(); } else { @@ -341,6 +441,7 @@ const surroundSampleFrame * mixer::renderNextBuffer( void ) { if( !( *it )->done() ) { + ( *it )->play(); } } @@ -366,14 +467,26 @@ const surroundSampleFrame * mixer::renderNextBuffer( void ) for( QVector::iterator it = m_audioPorts.begin(); it != m_audioPorts.end(); ++it ) { - more_effects = ( *it )->processEffects(); - if( ( *it )->m_bufferUsage != audioPort::NONE || - more_effects ) + if( m_multiThreaded ) { - processBuffer( ( *it )->firstBuffer(), - ( *it )->nextFxChannel() ); - ( *it )->nextPeriod(); + ADD_JOB( mixerWorkerThread::AudioPortEffects, + *it ); } + else + { + more_effects = ( *it )->processEffects(); + if( ( *it )->m_bufferUsage != audioPort::NONE || + more_effects ) + { + processBuffer( ( *it )->firstBuffer(), + ( *it )->nextFxChannel() ); + ( *it )->nextPeriod(); + } + } + } + if( m_multiThreaded ) + { + WAIT_FOR_JOBS(); } } @@ -726,7 +839,7 @@ void mixer::processBuffer( const surroundSampleFrame * _buf, { // TODO: effect-implementation - if( m_scaleClip ) +/* if( m_scaleClip ) { for( ch_cnt_t chnl=0; chnl < m_audioDev->channels(); @@ -734,18 +847,18 @@ void mixer::processBuffer( const surroundSampleFrame * _buf, { m_newBuffer[chnl] = TRUE; } - } + }*/ for( fpp_t frame = 0; frame < m_framesPerPeriod; ++frame ) { for( ch_cnt_t chnl = 0; chnl < m_audioDev->channels(); ++chnl ) { m_writeBuf[frame][chnl] += _buf[frame][chnl]; - +/* if( m_scaleClip ) { scaleClip( frame, chnl ); - } + }*/ } } }