From c0f48d6196d938c750d293d8ea98b442d2b72e63 Mon Sep 17 00:00:00 2001 From: Tobias Doerffel Date: Thu, 5 Feb 2009 13:14:12 +0000 Subject: [PATCH] reworked mixer-threads (synchronization, realization of jobqueue etc.) which results in a much better performance and stability git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/trunk/lmms@1998 0778d3d1-df1d-0410-868b-ea421aaaa00d --- include/audio_port.h | 4 +- include/mixer.h | 11 +- src/core/mixer.cpp | 244 ++++++++++++++++++++++--------------------- 3 files changed, 132 insertions(+), 127 deletions(-) diff --git a/include/audio_port.h b/include/audio_port.h index b428c568b..8d41f87ea 100644 --- a/include/audio_port.h +++ b/include/audio_port.h @@ -1,7 +1,7 @@ /* * audio_port.h - base-class for objects providing sound at a port * - * Copyright (c) 2005-2008 Tobias Doerffel + * Copyright (c) 2005-2009 Tobias Doerffel * * This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net * @@ -136,7 +136,7 @@ private: friend class mixer; - friend class mixerWorkerThread; + friend class MixerWorkerThread; } ; diff --git a/include/mixer.h b/include/mixer.h index a35b85277..25c69eb6a 100644 --- a/include/mixer.h +++ b/include/mixer.h @@ -40,9 +40,9 @@ #include -#include #include #include +#include #include "lmms_basics.h" @@ -66,7 +66,7 @@ const Octaves BaseOctave = DefaultOctave; #include "play_handle.h" -class mixerWorkerThread; +class MixerWorkerThread; class EXPORT mixer : public QObject @@ -431,10 +431,9 @@ private: bool m_newBuffer[SURROUND_CHANNELS]; int m_cpuLoad; - QVector m_workers; + QVector m_workers; int m_numWorkers; - QSemaphore m_queueReadySem; - QSemaphore m_workersDoneSem; + QWaitCondition m_queueReadyWaitCond; playHandleVector m_playHandles; @@ -462,7 +461,7 @@ private: friend class engine; - friend class mixerWorkerThread; + friend class MixerWorkerThread; } ; diff --git a/src/core/mixer.cpp b/src/core/mixer.cpp index e99d5a3da..a131de76f 100644 --- a/src/core/mixer.cpp +++ b/src/core/mixer.cpp @@ -1,7 +1,7 @@ /* * mixer.cpp - audio-device-independent mixer for LMMS * - * Copyright (c) 2004-2008 Tobias Doerffel + * Copyright (c) 2004-2009 Tobias Doerffel * * This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net * @@ -71,7 +71,7 @@ static QVector __fx_channel_jobs( NumFxChannels ); -class mixerWorkerThread : public QThread +class MixerWorkerThread : public QThread { public: enum JobTypes @@ -83,61 +83,57 @@ public: NumJobTypes } ; - struct jobQueueItem + struct JobQueueItem { - jobQueueItem() : + JobQueueItem() : type( InvalidJob ), job( NULL ), + param( 0 ), done( false ) { } - jobQueueItem( JobTypes _type, void * _job ) : + JobQueueItem( JobTypes _type, void * _job, int _param = 0 ) : type( _type ), job( _job ), + param( _param ), done( false ) { } JobTypes type; + void * job; + int param; - union - { - playHandle * playHandleJob; - audioPort * audioPortJob; - int effectChannelJob; - volatile void * job; - }; - -#if QT_VERSION >= 0x040400 QAtomicInt done; -#else - volatile bool done; -#endif + } ; - typedef QVector jobQueueItems; - struct jobQueue + struct JobQueue { - jobQueueItems items; -#if QT_VERSION < 0x040400 - QMutex lock; -#endif +#define JOB_QUEUE_SIZE 1024 + JobQueue() : + queueSize( 0 ) + { + } + + JobQueueItem items[JOB_QUEUE_SIZE]; + int queueSize; + QAtomicInt itemsDone; } ; - static jobQueue s_jobQueue; + static JobQueue s_jobQueue; - mixerWorkerThread( int _worker_num, mixer * _mixer ) : + MixerWorkerThread( int _worker_num, mixer * _mixer ) : QThread( _mixer ), m_workingBuf( alignedAllocFrames( _mixer->framesPerPeriod() ) ), m_workerNum( _worker_num ), m_quit( false ), m_mixer( _mixer ), - m_queueReadySem( &m_mixer->m_queueReadySem ), - m_workersDoneSem( &m_mixer->m_workersDoneSem ) + m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond ) { } - virtual ~mixerWorkerThread() + virtual ~MixerWorkerThread() { alignedFreeFrames( m_workingBuf ); } @@ -147,60 +143,7 @@ public: m_quit = true; } - void processJobQueue( void ) - { - jobQueueItems::iterator end_it = s_jobQueue.items.end(); - for( jobQueueItems::iterator it = - s_jobQueue.items.begin(); - it != end_it; ++it ) - { -#if QT_VERSION >= 0x040400 - if( it->done.fetchAndStoreOrdered( 1 ) == 0 ) - { -#else - s_jobQueue.lock.lock(); - if( !it->done ) - { - it->done = true; - s_jobQueue.lock.unlock(); -#endif - switch( it->type ) - { - case PlayHandle: - it->playHandleJob->play( m_workingBuf ); - break; - case AudioPortEffects: - { - audioPort * a = it->audioPortJob; - const bool me = a->processEffects(); - if( me || a->m_bufferUsage != audioPort::NoUsage ) - { - engine::getFxMixer()->mixToChannel( a->firstBuffer(), - a->nextFxChannel() ); - a->nextPeriod(); - } - } - break; - case EffectChannel: - engine::getFxMixer()->processChannel( - (fx_ch_t) it->effectChannelJob ); - break; - default: -/*fprintf( stderr, "invalid job item type %d at %ld in jobqueue(%ld:%ld)\n", - (int) it->type, (long int) it, - (long int) s_jobQueue.items.begin(), - (long int) end_it );*/ - break; - } - } -#if QT_VERSION < 0x040400 - else - { - s_jobQueue.lock.unlock(); - } -#endif - } - } + void processJobQueue( void ); private: @@ -216,12 +159,13 @@ private: #endif #endif #endif - + QMutex m; while( m_quit == false ) { - m_queueReadySem->acquire(); + m.lock(); + m_queueReadyWaitCond->wait( &m ); processJobQueue(); - m_workersDoneSem->release(); + m.unlock(); } } @@ -229,34 +173,103 @@ private: int m_workerNum; volatile bool m_quit; mixer * m_mixer; - QSemaphore * m_queueReadySem; - QSemaphore * m_workersDoneSem; + QWaitCondition * m_queueReadyWaitCond; } ; -mixerWorkerThread::jobQueue mixerWorkerThread::s_jobQueue; +MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue; -#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \ - mixerWorkerThread::s_jobQueue.items.clear(); \ + +void MixerWorkerThread::processJobQueue( void ) +{ + for( int i = 0; i < s_jobQueue.queueSize; ++i ) + { + JobQueueItem * it = &s_jobQueue.items[i]; + if( it->done.fetchAndStoreOrdered( 1 ) == 0 ) + { + switch( it->type ) + { + case PlayHandle: + ( (playHandle *) it->job )-> + play( m_workingBuf ); + break; + case AudioPortEffects: + { + audioPort * a = (audioPort *) it->job; + const bool me = a->processEffects(); + if( me || a->m_bufferUsage != audioPort::NoUsage ) + { + engine::getFxMixer()->mixToChannel( a->firstBuffer(), + a->nextFxChannel() ); + a->nextPeriod(); + } + } + break; + case EffectChannel: + engine::getFxMixer()->processChannel( (fx_ch_t) it->param ); + break; + default: + break; + } + s_jobQueue.itemsDone.fetchAndAddOrdered( 1 ); + } + } +} + +#define FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \ + MixerWorkerThread::s_jobQueue.queueSize = 0; \ + MixerWorkerThread::s_jobQueue.itemsDone = 0; \ for( _vec_type::iterator it = _vec.begin(); \ - it != _vec.end(); ++it ) \ + it != _vec.end(); ++it ) \ { \ if( _condition ) \ - { \ - mixerWorkerThread::s_jobQueue.items. \ - push_back( \ - mixerWorkerThread::jobQueueItem( _job_type, \ - (void *)*it ) );\ + { + +#define FILL_JOB_QUEUE_END() \ + ++MixerWorkerThread::s_jobQueue.queueSize; \ } \ } +#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \ + FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \ + MixerWorkerThread::s_jobQueue.items \ + [MixerWorkerThread::s_jobQueue.queueSize] = \ + MixerWorkerThread::JobQueueItem( _job_type, \ + (void *) *it ); \ + FILL_JOB_QUEUE_END() + +#define FILL_JOB_QUEUE_PARAM(_vec_type,_vec,_job_type,_condition) \ + FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \ + MixerWorkerThread::s_jobQueue.items \ + [MixerWorkerThread::s_jobQueue.queueSize] = \ + MixerWorkerThread::JobQueueItem( _job_type, \ + NULL, *it ); \ + FILL_JOB_QUEUE_END() + #define START_JOBS() \ - m_queueReadySem.release( m_numWorkers ); \ + m_queueReadyWaitCond.wakeAll(); + +// define a pause instruction for spinlock-loop - merely useful on +// HyperThreading systems with just one physical core (e.g. Intel Atom) +#ifdef LMMS_HOST_X86_64 +#define SPINLOCK_PAUSE() asm( "pause" ) +#else +#ifdef LMMS_HOST_X86_64 +#define SPINLOCK_PAUSE() asm( "pause" ) +#else +#define SPINLOCK_PAUSE() +#endif +#endif #define WAIT_FOR_JOBS() \ - m_workersDoneSem.acquire( m_numWorkers ); + m_workers[m_numWorkers]->processJobQueue(); \ + while( MixerWorkerThread::s_jobQueue.itemsDone < \ + MixerWorkerThread::s_jobQueue.queueSize ) \ + { \ + SPINLOCK_PAUSE(); \ + } \ @@ -270,10 +283,8 @@ mixer::mixer( void ) : m_writeBuf( NULL ), m_cpuLoad( 0 ), m_workers(), - m_numWorkers( QThread::idealThreadCount() > 1 ? - QThread::idealThreadCount()-1 : 0 ), - m_queueReadySem( m_numWorkers ), - m_workersDoneSem( m_numWorkers ), + m_numWorkers( QThread::idealThreadCount()-1 ), + m_queueReadyWaitCond(), m_qualitySettings( qualitySettings::Mode_Draft ), m_masterGain( 1.0f ), m_audioDev( NULL ), @@ -334,14 +345,12 @@ mixer::mixer( void ) : m_bufferPool.push_back( m_readBuf ); } - m_queueReadySem.acquire( m_numWorkers ); - m_workersDoneSem.acquire( m_numWorkers ); for( int i = 0; i < m_numWorkers+1; ++i ) { - mixerWorkerThread * wt = new mixerWorkerThread( i, this ); - if( i > 0 ) + MixerWorkerThread * wt = new MixerWorkerThread( i, this ); + if( i < m_numWorkers ) { - wt->start( QThread::HighestPriority ); + wt->start( QThread::TimeCriticalPriority ); } m_workers.push_back( wt ); } @@ -358,15 +367,15 @@ mixer::~mixer() { // distribute an empty job-queue so that worker-threads // get out of their processing-loop - mixerWorkerThread::s_jobQueue.items.clear(); + MixerWorkerThread::s_jobQueue.queueSize = 0; for( int w = 0; w < m_numWorkers; ++w ) { - m_workers[w+1]->quit(); + m_workers[w]->quit(); } START_JOBS(); for( int w = 0; w < m_numWorkers; ++w ) { - m_workers[w+1]->wait( 500 ); + m_workers[w]->wait( 500 ); } while( m_fifo->available() ) @@ -403,7 +412,7 @@ void mixer::startProcessing( bool _needs_fifo ) if( _needs_fifo ) { m_fifoWriter = new fifoWriter( this, m_fifo ); - m_fifoWriter->start( QThread::TimeCriticalPriority ); + m_fifoWriter->start( QThread::HighPriority ); } else { @@ -580,10 +589,9 @@ sampleFrameA * mixer::renderNextBuffer( void ) // STAGE 1: run and render all play handles FILL_JOB_QUEUE(playHandleVector,m_playHandles, - mixerWorkerThread::PlayHandle, + MixerWorkerThread::PlayHandle, !( *it )->done()); START_JOBS(); - m_workers[0]->processJobQueue(); WAIT_FOR_JOBS(); // removed all play handles which are done @@ -591,7 +599,7 @@ sampleFrameA * mixer::renderNextBuffer( void ) it != m_playHandles.end(); ) { if( ( *it )->affinityMatters() && - ( *it )->affinity() != QThread::currentThread() ) + ( *it )->affinity() != QThread::currentThread() ) { ++it; continue; @@ -610,17 +618,15 @@ sampleFrameA * mixer::renderNextBuffer( void ) // STAGE 2: process effects of all instrument- and sampletracks FILL_JOB_QUEUE(QVector,m_audioPorts, - mixerWorkerThread::AudioPortEffects,1); + MixerWorkerThread::AudioPortEffects,1); START_JOBS(); - m_workers[0]->processJobQueue(); WAIT_FOR_JOBS(); // STAGE 3: process effects in FX mixer - FILL_JOB_QUEUE(QVector,__fx_channel_jobs, - mixerWorkerThread::EffectChannel,1); + FILL_JOB_QUEUE_PARAM(QVector,__fx_channel_jobs, + MixerWorkerThread::EffectChannel,1); START_JOBS(); - m_workers[0]->processJobQueue(); WAIT_FOR_JOBS();