Mixer: rewrote/reorganized job queueing for worker threads

In Mixer, the old C-macro based code has been replaced by an OOP-like
design. Management of job queue now happens via some static member
methods of MixerWorkerThread. All the moved code still needs to be
splitted into some new files but here's a first dirty version.

All objects that are intended to be processed by MixerWorkerThreads
have to inherit ThreadableJob (name of class is subject of change).

One can add jobs to the job queue even if the queue is already being
processed. This is merely important for multithreading with upcoming
FX sends support.
This commit is contained in:
Tobias Doerffel
2009-10-11 01:36:55 +02:00
parent abfdb6a74d
commit 3fa96a576c
6 changed files with 224 additions and 224 deletions

View File

@@ -438,6 +438,13 @@ void FxMixer::masterMix( sampleFrame * _buf )
m_fxChannels[0]->m_peakLeft *= engine::getMixer()->masterGain();
m_fxChannels[0]->m_peakRight *= engine::getMixer()->masterGain();
// clear all channel buffers
for( int i = 0; i < numChannels(); ++i)
{
engine::getMixer()->clearAudioBuffer( m_fxChannels[i]->m_buffer,
engine::getMixer()->framesPerPeriod() );
}
}

View File

@@ -24,9 +24,10 @@
#include "AudioPort.h"
#include "AudioDevice.h"
#include "EffectChain.h"
#include "engine.h"
#include "Cpu.h"
#include "EffectChain.h"
#include "FxMixer.h"
#include "engine.h"
AudioPort::AudioPort( const QString & _name, bool _has_effect_chain ) :
@@ -123,3 +124,15 @@ bool AudioPort::processEffects()
}
void AudioPort::doProcessing( sampleFrame * )
{
const bool me = processEffects();
if( me || m_bufferUsage != NoUsage )
{
engine::fxMixer()->mixToChannel( firstBuffer(), nextFxChannel() );
nextPeriod();
}
}

View File

@@ -61,197 +61,11 @@
#endif
class MixerWorkerThread : public QThread
{
public:
enum JobTypes
{
InvalidJob,
PlayHandle,
AudioPortEffects,
EffectChannel,
NumJobTypes
} ;
struct JobQueueItem
{
JobQueueItem() :
type( InvalidJob ),
job( NULL ),
param( 0 ),
done( false )
{
}
JobQueueItem( JobTypes _type, void * _job, int _param = 0 ) :
type( _type ),
job( _job ),
param( _param ),
done( false )
{
}
JobTypes type;
void * job;
int param;
QAtomicInt done;
} ;
struct JobQueue
{
#define JOB_QUEUE_SIZE 1024
JobQueue() :
queueSize( 0 )
{
}
JobQueueItem items[JOB_QUEUE_SIZE];
int queueSize;
QAtomicInt itemsDone;
} ;
static JobQueue s_jobQueue;
MixerWorkerThread( int _worker_num, mixer * _mixer ) :
QThread( _mixer ),
m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ),
m_workerNum( _worker_num ),
m_quit( false ),
m_mixer( _mixer ),
m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond )
{
}
virtual ~MixerWorkerThread()
{
CPU::freeFrames( m_workingBuf );
}
virtual void quit()
{
m_quit = true;
}
void processJobQueue();
private:
virtual void run()
{
QMutex m;
while( m_quit == false )
{
m.lock();
m_queueReadyWaitCond->wait( &m );
processJobQueue();
m.unlock();
}
}
sampleFrame * m_workingBuf;
int m_workerNum;
volatile bool m_quit;
mixer * m_mixer;
QWaitCondition * m_queueReadyWaitCond;
} ;
MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue;
void MixerWorkerThread::processJobQueue()
{
for( int i = 0; i < s_jobQueue.queueSize; ++i )
{
JobQueueItem * it = &s_jobQueue.items[i];
if( it->done.fetchAndStoreOrdered( 1 ) == 0 )
{
switch( it->type )
{
case PlayHandle:
( (playHandle *) it->job )->
play( m_workingBuf );
break;
case AudioPortEffects:
{
AudioPort * a = (AudioPort *) it->job;
const bool me = a->processEffects();
if( me || a->m_bufferUsage != AudioPort::NoUsage )
{
engine::fxMixer()->mixToChannel( a->firstBuffer(),
a->nextFxChannel() );
a->nextPeriod();
}
}
break;
case EffectChannel:
engine::fxMixer()->processChannel( (fx_ch_t) it->param );
break;
default:
break;
}
s_jobQueue.itemsDone.fetchAndAddOrdered( 1 );
}
}
}
#define FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.queueSize = 0; \
MixerWorkerThread::s_jobQueue.itemsDone = 0; \
for( _vec_type::Iterator it = _vec.begin(); \
it != _vec.end(); ++it ) \
{ \
if( _condition ) \
{
#define FILL_JOB_QUEUE_END() \
++MixerWorkerThread::s_jobQueue.queueSize; \
} \
}
#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \
FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.items \
[MixerWorkerThread::s_jobQueue.queueSize] = \
MixerWorkerThread::JobQueueItem( _job_type, \
(void *) *it ); \
FILL_JOB_QUEUE_END()
#define FILL_JOB_QUEUE_PARAM(_vec_type,_vec,_job_type,_condition) \
FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.items \
[MixerWorkerThread::s_jobQueue.queueSize] = \
MixerWorkerThread::JobQueueItem( _job_type, \
NULL, *it ); \
FILL_JOB_QUEUE_END()
#define START_JOBS() \
m_queueReadyWaitCond.wakeAll();
// define a pause instruction for spinlock-loop - merely useful on
// HyperThreading systems with just one physical core (e.g. Intel Atom)
#ifdef LMMS_HOST_X86
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#ifdef LMMS_HOST_X86_64
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#define SPINLOCK_PAUSE()
#endif
#endif
#define WAIT_FOR_JOBS() \
m_workers[m_numWorkers]->processJobQueue(); \
while( MixerWorkerThread::s_jobQueue.itemsDone < \
MixerWorkerThread::s_jobQueue.queueSize ) \
{ \
SPINLOCK_PAUSE(); \
} \
mixer::mixer() :
@@ -347,7 +161,7 @@ mixer::~mixer()
{
m_workers[w]->quit();
}
START_JOBS();
MixerWorkerThread::startJobs();
for( int w = 0; w < m_numWorkers; ++w )
{
m_workers[w]->wait( 500 );
@@ -565,11 +379,8 @@ sampleFrameA * mixer::renderNextBuffer()
// STAGE 1: run and render all play handles
FILL_JOB_QUEUE(PlayHandleList,m_playHandles,
MixerWorkerThread::PlayHandle,
!( *it )->done());
START_JOBS();
WAIT_FOR_JOBS();
MixerWorkerThread::fillJobQueue<PlayHandleList>( m_playHandles );
MixerWorkerThread::startAndWaitForJobs();
// removed all play handles which are done
for( PlayHandleList::Iterator it = m_playHandles.begin();
@@ -594,31 +405,12 @@ sampleFrameA * mixer::renderNextBuffer()
// STAGE 2: process effects of all instrument- and sampletracks
FILL_JOB_QUEUE(QVector<AudioPort*>,m_audioPorts,
MixerWorkerThread::AudioPortEffects,1);
START_JOBS();
WAIT_FOR_JOBS();
MixerWorkerThread::fillJobQueue<QVector<AudioPort *> >( m_audioPorts );
MixerWorkerThread::startAndWaitForJobs();
// STAGE 3: process effects in FX mixer
/*FILL_JOB_QUEUE_PARAM(QVector<fx_ch_t>,__fx_channel_jobs,
MixerWorkerThread::EffectChannel,1);
START_JOBS();
WAIT_FOR_JOBS();*/
// STAGE 4: do master mix in FX mixer
// STAGE 3: do master mix in FX mixer
fxm->masterMix( m_writeBuf );
WAIT_FOR_JOBS();
// clear all channel buffers
for( int i = 0; i < fxm->numChannels(); ++i)
{
engine::getMixer()->clearAudioBuffer( fxm->effectChannel(i)->m_buffer,
engine::getMixer()->framesPerPeriod() );
}
unlock();