reworked mixer-threads (synchronization, realization of jobqueue etc.) which results in a much better performance and stability

git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/trunk/lmms@1998 0778d3d1-df1d-0410-868b-ea421aaaa00d
This commit is contained in:
Tobias Doerffel
2009-02-05 13:14:12 +00:00
parent cd2bb63676
commit c0f48d6196
3 changed files with 132 additions and 127 deletions

View File

@@ -1,7 +1,7 @@
/*
* audio_port.h - base-class for objects providing sound at a port
*
* Copyright (c) 2005-2008 Tobias Doerffel <tobydox/at/users.sourceforge.net>
* Copyright (c) 2005-2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
@@ -136,7 +136,7 @@ private:
friend class mixer;
friend class mixerWorkerThread;
friend class MixerWorkerThread;
} ;

View File

@@ -40,9 +40,9 @@
#include <QtCore/QMutex>
#include <QtCore/QSemaphore>
#include <QtCore/QThread>
#include <QtCore/QVector>
#include <QtCore/QWaitCondition>
#include "lmms_basics.h"
@@ -66,7 +66,7 @@ const Octaves BaseOctave = DefaultOctave;
#include "play_handle.h"
class mixerWorkerThread;
class MixerWorkerThread;
class EXPORT mixer : public QObject
@@ -431,10 +431,9 @@ private:
bool m_newBuffer[SURROUND_CHANNELS];
int m_cpuLoad;
QVector<mixerWorkerThread *> m_workers;
QVector<MixerWorkerThread *> m_workers;
int m_numWorkers;
QSemaphore m_queueReadySem;
QSemaphore m_workersDoneSem;
QWaitCondition m_queueReadyWaitCond;
playHandleVector m_playHandles;
@@ -462,7 +461,7 @@ private:
friend class engine;
friend class mixerWorkerThread;
friend class MixerWorkerThread;
} ;

View File

@@ -1,7 +1,7 @@
/*
* mixer.cpp - audio-device-independent mixer for LMMS
*
* Copyright (c) 2004-2008 Tobias Doerffel <tobydox/at/users.sourceforge.net>
* Copyright (c) 2004-2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
@@ -71,7 +71,7 @@ static QVector<fx_ch_t> __fx_channel_jobs( NumFxChannels );
class mixerWorkerThread : public QThread
class MixerWorkerThread : public QThread
{
public:
enum JobTypes
@@ -83,61 +83,57 @@ public:
NumJobTypes
} ;
struct jobQueueItem
struct JobQueueItem
{
jobQueueItem() :
JobQueueItem() :
type( InvalidJob ),
job( NULL ),
param( 0 ),
done( false )
{
}
jobQueueItem( JobTypes _type, void * _job ) :
JobQueueItem( JobTypes _type, void * _job, int _param = 0 ) :
type( _type ),
job( _job ),
param( _param ),
done( false )
{
}
JobTypes type;
void * job;
int param;
union
{
playHandle * playHandleJob;
audioPort * audioPortJob;
int effectChannelJob;
volatile void * job;
};
#if QT_VERSION >= 0x040400
QAtomicInt done;
#else
volatile bool done;
#endif
} ;
typedef QVector<jobQueueItem> jobQueueItems;
struct jobQueue
struct JobQueue
{
jobQueueItems items;
#if QT_VERSION < 0x040400
QMutex lock;
#endif
#define JOB_QUEUE_SIZE 1024
JobQueue() :
queueSize( 0 )
{
}
JobQueueItem items[JOB_QUEUE_SIZE];
int queueSize;
QAtomicInt itemsDone;
} ;
static jobQueue s_jobQueue;
static JobQueue s_jobQueue;
mixerWorkerThread( int _worker_num, mixer * _mixer ) :
MixerWorkerThread( int _worker_num, mixer * _mixer ) :
QThread( _mixer ),
m_workingBuf( alignedAllocFrames( _mixer->framesPerPeriod() ) ),
m_workerNum( _worker_num ),
m_quit( false ),
m_mixer( _mixer ),
m_queueReadySem( &m_mixer->m_queueReadySem ),
m_workersDoneSem( &m_mixer->m_workersDoneSem )
m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond )
{
}
virtual ~mixerWorkerThread()
virtual ~MixerWorkerThread()
{
alignedFreeFrames( m_workingBuf );
}
@@ -147,60 +143,7 @@ public:
m_quit = true;
}
void processJobQueue( void )
{
jobQueueItems::iterator end_it = s_jobQueue.items.end();
for( jobQueueItems::iterator it =
s_jobQueue.items.begin();
it != end_it; ++it )
{
#if QT_VERSION >= 0x040400
if( it->done.fetchAndStoreOrdered( 1 ) == 0 )
{
#else
s_jobQueue.lock.lock();
if( !it->done )
{
it->done = true;
s_jobQueue.lock.unlock();
#endif
switch( it->type )
{
case PlayHandle:
it->playHandleJob->play( m_workingBuf );
break;
case AudioPortEffects:
{
audioPort * a = it->audioPortJob;
const bool me = a->processEffects();
if( me || a->m_bufferUsage != audioPort::NoUsage )
{
engine::getFxMixer()->mixToChannel( a->firstBuffer(),
a->nextFxChannel() );
a->nextPeriod();
}
}
break;
case EffectChannel:
engine::getFxMixer()->processChannel(
(fx_ch_t) it->effectChannelJob );
break;
default:
/*fprintf( stderr, "invalid job item type %d at %ld in jobqueue(%ld:%ld)\n",
(int) it->type, (long int) it,
(long int) s_jobQueue.items.begin(),
(long int) end_it );*/
break;
}
}
#if QT_VERSION < 0x040400
else
{
s_jobQueue.lock.unlock();
}
#endif
}
}
void processJobQueue( void );
private:
@@ -216,12 +159,13 @@ private:
#endif
#endif
#endif
QMutex m;
while( m_quit == false )
{
m_queueReadySem->acquire();
m.lock();
m_queueReadyWaitCond->wait( &m );
processJobQueue();
m_workersDoneSem->release();
m.unlock();
}
}
@@ -229,34 +173,103 @@ private:
int m_workerNum;
volatile bool m_quit;
mixer * m_mixer;
QSemaphore * m_queueReadySem;
QSemaphore * m_workersDoneSem;
QWaitCondition * m_queueReadyWaitCond;
} ;
mixerWorkerThread::jobQueue mixerWorkerThread::s_jobQueue;
MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue;
#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \
mixerWorkerThread::s_jobQueue.items.clear(); \
void MixerWorkerThread::processJobQueue( void )
{
for( int i = 0; i < s_jobQueue.queueSize; ++i )
{
JobQueueItem * it = &s_jobQueue.items[i];
if( it->done.fetchAndStoreOrdered( 1 ) == 0 )
{
switch( it->type )
{
case PlayHandle:
( (playHandle *) it->job )->
play( m_workingBuf );
break;
case AudioPortEffects:
{
audioPort * a = (audioPort *) it->job;
const bool me = a->processEffects();
if( me || a->m_bufferUsage != audioPort::NoUsage )
{
engine::getFxMixer()->mixToChannel( a->firstBuffer(),
a->nextFxChannel() );
a->nextPeriod();
}
}
break;
case EffectChannel:
engine::getFxMixer()->processChannel( (fx_ch_t) it->param );
break;
default:
break;
}
s_jobQueue.itemsDone.fetchAndAddOrdered( 1 );
}
}
}
#define FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.queueSize = 0; \
MixerWorkerThread::s_jobQueue.itemsDone = 0; \
for( _vec_type::iterator it = _vec.begin(); \
it != _vec.end(); ++it ) \
it != _vec.end(); ++it ) \
{ \
if( _condition ) \
{ \
mixerWorkerThread::s_jobQueue.items. \
push_back( \
mixerWorkerThread::jobQueueItem( _job_type, \
(void *)*it ) );\
{
#define FILL_JOB_QUEUE_END() \
++MixerWorkerThread::s_jobQueue.queueSize; \
} \
}
#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \
FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.items \
[MixerWorkerThread::s_jobQueue.queueSize] = \
MixerWorkerThread::JobQueueItem( _job_type, \
(void *) *it ); \
FILL_JOB_QUEUE_END()
#define FILL_JOB_QUEUE_PARAM(_vec_type,_vec,_job_type,_condition) \
FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.items \
[MixerWorkerThread::s_jobQueue.queueSize] = \
MixerWorkerThread::JobQueueItem( _job_type, \
NULL, *it ); \
FILL_JOB_QUEUE_END()
#define START_JOBS() \
m_queueReadySem.release( m_numWorkers ); \
m_queueReadyWaitCond.wakeAll();
// define a pause instruction for spinlock-loop - merely useful on
// HyperThreading systems with just one physical core (e.g. Intel Atom)
#ifdef LMMS_HOST_X86_64
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#ifdef LMMS_HOST_X86_64
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#define SPINLOCK_PAUSE()
#endif
#endif
#define WAIT_FOR_JOBS() \
m_workersDoneSem.acquire( m_numWorkers );
m_workers[m_numWorkers]->processJobQueue(); \
while( MixerWorkerThread::s_jobQueue.itemsDone < \
MixerWorkerThread::s_jobQueue.queueSize ) \
{ \
SPINLOCK_PAUSE(); \
} \
@@ -270,10 +283,8 @@ mixer::mixer( void ) :
m_writeBuf( NULL ),
m_cpuLoad( 0 ),
m_workers(),
m_numWorkers( QThread::idealThreadCount() > 1 ?
QThread::idealThreadCount()-1 : 0 ),
m_queueReadySem( m_numWorkers ),
m_workersDoneSem( m_numWorkers ),
m_numWorkers( QThread::idealThreadCount()-1 ),
m_queueReadyWaitCond(),
m_qualitySettings( qualitySettings::Mode_Draft ),
m_masterGain( 1.0f ),
m_audioDev( NULL ),
@@ -334,14 +345,12 @@ mixer::mixer( void ) :
m_bufferPool.push_back( m_readBuf );
}
m_queueReadySem.acquire( m_numWorkers );
m_workersDoneSem.acquire( m_numWorkers );
for( int i = 0; i < m_numWorkers+1; ++i )
{
mixerWorkerThread * wt = new mixerWorkerThread( i, this );
if( i > 0 )
MixerWorkerThread * wt = new MixerWorkerThread( i, this );
if( i < m_numWorkers )
{
wt->start( QThread::HighestPriority );
wt->start( QThread::TimeCriticalPriority );
}
m_workers.push_back( wt );
}
@@ -358,15 +367,15 @@ mixer::~mixer()
{
// distribute an empty job-queue so that worker-threads
// get out of their processing-loop
mixerWorkerThread::s_jobQueue.items.clear();
MixerWorkerThread::s_jobQueue.queueSize = 0;
for( int w = 0; w < m_numWorkers; ++w )
{
m_workers[w+1]->quit();
m_workers[w]->quit();
}
START_JOBS();
for( int w = 0; w < m_numWorkers; ++w )
{
m_workers[w+1]->wait( 500 );
m_workers[w]->wait( 500 );
}
while( m_fifo->available() )
@@ -403,7 +412,7 @@ void mixer::startProcessing( bool _needs_fifo )
if( _needs_fifo )
{
m_fifoWriter = new fifoWriter( this, m_fifo );
m_fifoWriter->start( QThread::TimeCriticalPriority );
m_fifoWriter->start( QThread::HighPriority );
}
else
{
@@ -580,10 +589,9 @@ sampleFrameA * mixer::renderNextBuffer( void )
// STAGE 1: run and render all play handles
FILL_JOB_QUEUE(playHandleVector,m_playHandles,
mixerWorkerThread::PlayHandle,
MixerWorkerThread::PlayHandle,
!( *it )->done());
START_JOBS();
m_workers[0]->processJobQueue();
WAIT_FOR_JOBS();
// removed all play handles which are done
@@ -591,7 +599,7 @@ sampleFrameA * mixer::renderNextBuffer( void )
it != m_playHandles.end(); )
{
if( ( *it )->affinityMatters() &&
( *it )->affinity() != QThread::currentThread() )
( *it )->affinity() != QThread::currentThread() )
{
++it;
continue;
@@ -610,17 +618,15 @@ sampleFrameA * mixer::renderNextBuffer( void )
// STAGE 2: process effects of all instrument- and sampletracks
FILL_JOB_QUEUE(QVector<audioPort*>,m_audioPorts,
mixerWorkerThread::AudioPortEffects,1);
MixerWorkerThread::AudioPortEffects,1);
START_JOBS();
m_workers[0]->processJobQueue();
WAIT_FOR_JOBS();
// STAGE 3: process effects in FX mixer
FILL_JOB_QUEUE(QVector<fx_ch_t>,__fx_channel_jobs,
mixerWorkerThread::EffectChannel,1);
FILL_JOB_QUEUE_PARAM(QVector<fx_ch_t>,__fx_channel_jobs,
MixerWorkerThread::EffectChannel,1);
START_JOBS();
m_workers[0]->processJobQueue();
WAIT_FOR_JOBS();