From 3fa96a576c621a2ac9f470c557ec3f5e2f80df16 Mon Sep 17 00:00:00 2001 From: Tobias Doerffel Date: Sun, 11 Oct 2009 01:36:55 +0200 Subject: [PATCH] Mixer: rewrote/reorganized job queueing for worker threads In Mixer, the old C-macro based code has been replaced by an OOP-like design. Management of job queue now happens via some static member methods of MixerWorkerThread. All the moved code still needs to be splitted into some new files but here's a first dirty version. All objects that are intended to be processed by MixerWorkerThreads have to inherit ThreadableJob (name of class is subject of change). One can add jobs to the job queue even if the queue is already being processed. This is merely important for multithreading with upcoming FX sends support. --- include/AudioPort.h | 9 +- include/mixer.h | 177 +++++++++++++++++++++++++++- include/play_handle.h | 18 ++- src/core/FxMixer.cpp | 7 ++ src/core/audio/AudioPort.cpp | 17 ++- src/core/mixer.cpp | 220 +---------------------------------- 6 files changed, 224 insertions(+), 224 deletions(-) diff --git a/include/AudioPort.h b/include/AudioPort.h index 7157f729b..d84dd179f 100644 --- a/include/AudioPort.h +++ b/include/AudioPort.h @@ -33,7 +33,7 @@ class EffectChain; -class AudioPort +class AudioPort : public ThreadableJob { public: AudioPort( const QString & _name, bool _has_effect_chain = true ); @@ -109,6 +109,13 @@ public: bool processEffects(); + // ThreadableJob stuff + virtual void doProcessing( sampleFrame * ); + virtual bool requiresProcessing() const + { + return true; + } + enum bufferUsages { diff --git a/include/mixer.h b/include/mixer.h index 3562db3f1..81be6af7f 100644 --- a/include/mixer.h +++ b/include/mixer.h @@ -62,11 +62,46 @@ const Keys BaseKey = Key_A; const Octaves BaseOctave = DefaultOctave; -#include "play_handle.h" - class MixerWorkerThread; +// TODO: move to ThreadableJob.h +class ThreadableJob +{ +public: + ThreadableJob() : + m_done( false ) + { + } + + void reset() + { + m_done = false; + } + + bool process( sampleFrame * _working_buffer ) + { + if( m_done.fetchAndStoreOrdered( true ) == false ) + { + doProcessing( _working_buffer ); + return true; + } + return false; + } + + virtual bool requiresProcessing() const = 0; + + +private: + virtual void doProcessing( sampleFrame * _working_buffer ) = 0; + + QAtomicInt m_done; + +} ; + + +#include "play_handle.h" + class EXPORT mixer : public QObject { @@ -466,4 +501,142 @@ private: } ; +// TODO: move to MixerWorkerThread.h / MixerWorkerThread.cpp +#include "Cpu.h" +#include "engine.h" + +class MixerWorkerThread : public QThread +{ +public: + struct JobQueue + { +#define JOB_QUEUE_SIZE 1024 + JobQueue() : + queueSize( 0 ), + itemsDone( 0 ) + { + for( int i = 0; i < JOB_QUEUE_SIZE; ++i ) + { + items[i] = NULL; + } + } + + ThreadableJob * items[JOB_QUEUE_SIZE]; + QAtomicInt queueSize; + QAtomicInt itemsDone; + } ; + + static JobQueue s_jobQueue; + + MixerWorkerThread( int _worker_num, mixer * _mixer ) : + QThread( _mixer ), + m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ), + m_workerNum( _worker_num ), + m_quit( false ), + m_mixer( _mixer ), + m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond ) + { + } + + virtual ~MixerWorkerThread() + { + CPU::freeFrames( m_workingBuf ); + } + + virtual void quit() + { + m_quit = true; + } + + void processJobQueue() + { + for( int i = 0; i < s_jobQueue.queueSize; ++i ) + { + // returns true if ThreadableJob was not processed before + if( s_jobQueue.items[i]->process( m_workingBuf ) ) + { + s_jobQueue.itemsDone.fetchAndAddOrdered( 1 ); + } + } + } + + template + static void fillJobQueue( const T & _vec ) + { + s_jobQueue.queueSize = 0; + s_jobQueue.itemsDone = 0; + for( typename T::ConstIterator it = _vec.begin(); it != _vec.end(); ++it ) + { + addJob( *it ); + } + } + + static void addJob( ThreadableJob * _job ) + { + if( _job->requiresProcessing() ) + { + _job->reset(); + s_jobQueue.items[s_jobQueue.queueSize.fetchAndAddOrdered(1)] = _job; + } + } + + +// define a pause instruction for spinlock-loop - merely useful on +// HyperThreading systems with just one physical core (e.g. Intel Atom) +#ifdef LMMS_HOST_X86 +#define SPINLOCK_PAUSE() asm( "pause" ) +#else +#ifdef LMMS_HOST_X86_64 +#define SPINLOCK_PAUSE() asm( "pause" ) +#else +#define SPINLOCK_PAUSE() +#endif +#endif + + static void startJobs() + { + // TODO: this is dirty! + engine::getMixer()->m_queueReadyWaitCond.wakeAll(); + } + + static void waitForJobs() + { + // TODO: this is dirty! + mixer * m = engine::getMixer(); + m->m_workers[m->m_numWorkers]->processJobQueue(); + while( s_jobQueue.itemsDone < s_jobQueue.queueSize ) + { + SPINLOCK_PAUSE(); + } + } + + static void startAndWaitForJobs() + { + startJobs(); + waitForJobs(); + } + + +private: + virtual void run() + { + QMutex m; + while( m_quit == false ) + { + m.lock(); + m_queueReadyWaitCond->wait( &m ); + processJobQueue(); + m.unlock(); + } + } + + sampleFrame * m_workingBuf; + int m_workerNum; + volatile bool m_quit; + mixer * m_mixer; + QWaitCondition * m_queueReadyWaitCond; + +} ; + + #endif diff --git a/include/play_handle.h b/include/play_handle.h index 365da3f19..1a49fb540 100644 --- a/include/play_handle.h +++ b/include/play_handle.h @@ -25,15 +25,12 @@ #ifndef _PLAY_HANDLE_H #define _PLAY_HANDLE_H -#include -#include - -#include "lmms_basics.h" +#include "mixer.h" class track; -class playHandle +class playHandle : public ThreadableJob { public: enum Types @@ -71,6 +68,17 @@ public: return m_type; } + // required for ThreadableJob + virtual void doProcessing( sampleFrame * _working_buffer ) + { + play( _working_buffer ); + } + + virtual bool requiresProcessing() const + { + return !done(); + } + virtual void play( sampleFrame * _working_buffer ) = 0; virtual bool done() const = 0; diff --git a/src/core/FxMixer.cpp b/src/core/FxMixer.cpp index 7cf27b8c3..844f8fd7d 100644 --- a/src/core/FxMixer.cpp +++ b/src/core/FxMixer.cpp @@ -438,6 +438,13 @@ void FxMixer::masterMix( sampleFrame * _buf ) m_fxChannels[0]->m_peakLeft *= engine::getMixer()->masterGain(); m_fxChannels[0]->m_peakRight *= engine::getMixer()->masterGain(); + + // clear all channel buffers + for( int i = 0; i < numChannels(); ++i) + { + engine::getMixer()->clearAudioBuffer( m_fxChannels[i]->m_buffer, + engine::getMixer()->framesPerPeriod() ); + } } diff --git a/src/core/audio/AudioPort.cpp b/src/core/audio/AudioPort.cpp index 2b4301178..d9dc86b06 100644 --- a/src/core/audio/AudioPort.cpp +++ b/src/core/audio/AudioPort.cpp @@ -24,9 +24,10 @@ #include "AudioPort.h" #include "AudioDevice.h" -#include "EffectChain.h" -#include "engine.h" #include "Cpu.h" +#include "EffectChain.h" +#include "FxMixer.h" +#include "engine.h" AudioPort::AudioPort( const QString & _name, bool _has_effect_chain ) : @@ -123,3 +124,15 @@ bool AudioPort::processEffects() } + + +void AudioPort::doProcessing( sampleFrame * ) +{ + const bool me = processEffects(); + if( me || m_bufferUsage != NoUsage ) + { + engine::fxMixer()->mixToChannel( firstBuffer(), nextFxChannel() ); + nextPeriod(); + } +} + diff --git a/src/core/mixer.cpp b/src/core/mixer.cpp index b33e4c49f..f8e0c38b0 100644 --- a/src/core/mixer.cpp +++ b/src/core/mixer.cpp @@ -61,197 +61,11 @@ #endif -class MixerWorkerThread : public QThread -{ -public: - enum JobTypes - { - InvalidJob, - PlayHandle, - AudioPortEffects, - EffectChannel, - NumJobTypes - } ; - - struct JobQueueItem - { - JobQueueItem() : - type( InvalidJob ), - job( NULL ), - param( 0 ), - done( false ) - { - } - JobQueueItem( JobTypes _type, void * _job, int _param = 0 ) : - type( _type ), - job( _job ), - param( _param ), - done( false ) - { - } - - JobTypes type; - void * job; - int param; - - QAtomicInt done; - - } ; - - struct JobQueue - { -#define JOB_QUEUE_SIZE 1024 - JobQueue() : - queueSize( 0 ) - { - } - - JobQueueItem items[JOB_QUEUE_SIZE]; - int queueSize; - QAtomicInt itemsDone; - } ; - - static JobQueue s_jobQueue; - - MixerWorkerThread( int _worker_num, mixer * _mixer ) : - QThread( _mixer ), - m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ), - m_workerNum( _worker_num ), - m_quit( false ), - m_mixer( _mixer ), - m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond ) - { - } - - virtual ~MixerWorkerThread() - { - CPU::freeFrames( m_workingBuf ); - } - - virtual void quit() - { - m_quit = true; - } - - void processJobQueue(); - - -private: - virtual void run() - { - QMutex m; - while( m_quit == false ) - { - m.lock(); - m_queueReadyWaitCond->wait( &m ); - processJobQueue(); - m.unlock(); - } - } - - sampleFrame * m_workingBuf; - int m_workerNum; - volatile bool m_quit; - mixer * m_mixer; - QWaitCondition * m_queueReadyWaitCond; - -} ; - MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue; -void MixerWorkerThread::processJobQueue() -{ - for( int i = 0; i < s_jobQueue.queueSize; ++i ) - { - JobQueueItem * it = &s_jobQueue.items[i]; - if( it->done.fetchAndStoreOrdered( 1 ) == 0 ) - { - switch( it->type ) - { - case PlayHandle: - ( (playHandle *) it->job )-> - play( m_workingBuf ); - break; - case AudioPortEffects: - { - AudioPort * a = (AudioPort *) it->job; - const bool me = a->processEffects(); - if( me || a->m_bufferUsage != AudioPort::NoUsage ) - { - engine::fxMixer()->mixToChannel( a->firstBuffer(), - a->nextFxChannel() ); - a->nextPeriod(); - } - } - break; - case EffectChannel: - engine::fxMixer()->processChannel( (fx_ch_t) it->param ); - break; - default: - break; - } - s_jobQueue.itemsDone.fetchAndAddOrdered( 1 ); - } - } -} - -#define FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \ - MixerWorkerThread::s_jobQueue.queueSize = 0; \ - MixerWorkerThread::s_jobQueue.itemsDone = 0; \ - for( _vec_type::Iterator it = _vec.begin(); \ - it != _vec.end(); ++it ) \ - { \ - if( _condition ) \ - { - -#define FILL_JOB_QUEUE_END() \ - ++MixerWorkerThread::s_jobQueue.queueSize; \ - } \ - } - -#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \ - FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \ - MixerWorkerThread::s_jobQueue.items \ - [MixerWorkerThread::s_jobQueue.queueSize] = \ - MixerWorkerThread::JobQueueItem( _job_type, \ - (void *) *it ); \ - FILL_JOB_QUEUE_END() - -#define FILL_JOB_QUEUE_PARAM(_vec_type,_vec,_job_type,_condition) \ - FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \ - MixerWorkerThread::s_jobQueue.items \ - [MixerWorkerThread::s_jobQueue.queueSize] = \ - MixerWorkerThread::JobQueueItem( _job_type, \ - NULL, *it ); \ - FILL_JOB_QUEUE_END() - -#define START_JOBS() \ - m_queueReadyWaitCond.wakeAll(); - -// define a pause instruction for spinlock-loop - merely useful on -// HyperThreading systems with just one physical core (e.g. Intel Atom) -#ifdef LMMS_HOST_X86 -#define SPINLOCK_PAUSE() asm( "pause" ) -#else -#ifdef LMMS_HOST_X86_64 -#define SPINLOCK_PAUSE() asm( "pause" ) -#else -#define SPINLOCK_PAUSE() -#endif -#endif - -#define WAIT_FOR_JOBS() \ - m_workers[m_numWorkers]->processJobQueue(); \ - while( MixerWorkerThread::s_jobQueue.itemsDone < \ - MixerWorkerThread::s_jobQueue.queueSize ) \ - { \ - SPINLOCK_PAUSE(); \ - } \ - - mixer::mixer() : @@ -347,7 +161,7 @@ mixer::~mixer() { m_workers[w]->quit(); } - START_JOBS(); + MixerWorkerThread::startJobs(); for( int w = 0; w < m_numWorkers; ++w ) { m_workers[w]->wait( 500 ); @@ -565,11 +379,8 @@ sampleFrameA * mixer::renderNextBuffer() // STAGE 1: run and render all play handles - FILL_JOB_QUEUE(PlayHandleList,m_playHandles, - MixerWorkerThread::PlayHandle, - !( *it )->done()); - START_JOBS(); - WAIT_FOR_JOBS(); + MixerWorkerThread::fillJobQueue( m_playHandles ); + MixerWorkerThread::startAndWaitForJobs(); // removed all play handles which are done for( PlayHandleList::Iterator it = m_playHandles.begin(); @@ -594,31 +405,12 @@ sampleFrameA * mixer::renderNextBuffer() // STAGE 2: process effects of all instrument- and sampletracks - FILL_JOB_QUEUE(QVector,m_audioPorts, - MixerWorkerThread::AudioPortEffects,1); - START_JOBS(); - WAIT_FOR_JOBS(); + MixerWorkerThread::fillJobQueue >( m_audioPorts ); + MixerWorkerThread::startAndWaitForJobs(); - - // STAGE 3: process effects in FX mixer - /*FILL_JOB_QUEUE_PARAM(QVector,__fx_channel_jobs, - MixerWorkerThread::EffectChannel,1); - START_JOBS(); - WAIT_FOR_JOBS();*/ - - - // STAGE 4: do master mix in FX mixer + // STAGE 3: do master mix in FX mixer fxm->masterMix( m_writeBuf ); - WAIT_FOR_JOBS(); - - // clear all channel buffers - for( int i = 0; i < fxm->numChannels(); ++i) - { - engine::getMixer()->clearAudioBuffer( fxm->effectChannel(i)->m_buffer, - engine::getMixer()->framesPerPeriod() ); - } - unlock();