reworked mixer-threads (synchronization, realization of jobqueue etc.) which results in a much better performance and stability

git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/branches/lmms/stable-0.4@1996 0778d3d1-df1d-0410-868b-ea421aaaa00d
This commit is contained in:
Tobias Doerffel
2009-02-05 09:03:13 +00:00
parent 8b3d3f33a7
commit b3c9bf011b
5 changed files with 241 additions and 129 deletions

View File

@@ -1,3 +1,12 @@
2009-02-04 Tobias Doerffel <tobydox/at/users/dot/sourceforge/dot/net>
* include/atomic_int.h:
* include/audio_port.h:
* include/mixer.h:
* src/core/mixer.cpp:
reworked mixer-threads (synchronization, realization of jobqueue etc.)
which results in a much better performance and stability
2009-01-25 Paul Giblock <drfaygo/at/gmail/dot/com>
* src/tracks/bb_track.cpp:

97
include/atomic_int.h Executable file
View File

@@ -0,0 +1,97 @@
/*
* atomic_int.h - fallback AtomicInt class when Qt is too old
*
* Copyright (c) 2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program (see COPYING); if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301 USA.
*
*/
#ifndef _ATOMIC_INT_H
#define _ATOMIC_INT_H
#include <QtCore/QMutex>
#if QT_VERSION > 0x040400
typedef QAtomicInt AtomicInt;
#else
// implement our own (slow) QAtomicInt class when on old Qt
class AtomicInt
{
public:
inline AtomicInt( int _value = 0 ) :
m_value( _value ),
m_lock()
{
}
inline int fetchAndStoreOrdered( int _newVal )
{
m_lock.lock();
const int oldVal = m_value;
m_value = _newVal;
m_lock.unlock();
return oldVal;
}
inline int fetchAndAddOrdered( int _add )
{
m_lock.lock();
const int oldVal = m_value;
m_value += _add;
m_lock.unlock();
return oldVal;
}
inline AtomicInt & operator=( const AtomicInt & _copy )
{
m_lock.lock();
m_value = _copy.m_value;
m_lock.unlock();
return *this;
}
inline AtomicInt & operator=( int _value )
{
m_lock.lock();
m_value = _value;
m_lock.unlock();
return *this;
}
inline operator int() const
{
return m_value;
}
private:
volatile int m_value;
QMutex m_lock;
} ;
#endif
#endif

View File

@@ -1,7 +1,7 @@
/*
* audio_port.h - base-class for objects providing sound at a port
*
* Copyright (c) 2005-2008 Tobias Doerffel <tobydox/at/users.sourceforge.net>
* Copyright (c) 2005-2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
@@ -136,7 +136,7 @@ private:
friend class mixer;
friend class mixerWorkerThread;
friend class MixerWorkerThread;
} ;

View File

@@ -1,7 +1,7 @@
/*
* mixer.h - audio-device-independent mixer for LMMS
*
* Copyright (c) 2004-2008 Tobias Doerffel <tobydox/at/users.sourceforge.net>
* Copyright (c) 2004-2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
@@ -40,9 +40,9 @@
#include <QtCore/QMutex>
#include <QtCore/QSemaphore>
#include <QtCore/QThread>
#include <QtCore/QVector>
#include <QtCore/QWaitCondition>
#include "lmms_basics.h"
@@ -73,7 +73,7 @@ const Octaves BaseOctave = DefaultOctave;
#include "play_handle.h"
class mixerWorkerThread;
class MixerWorkerThread;
class EXPORT mixer : public QObject
@@ -438,10 +438,9 @@ private:
bool m_newBuffer[SURROUND_CHANNELS];
int m_cpuLoad;
QVector<mixerWorkerThread *> m_workers;
QVector<MixerWorkerThread *> m_workers;
int m_numWorkers;
QSemaphore m_queueReadySem;
QSemaphore m_workersDoneSem;
QWaitCondition m_queueReadyWaitCond;
playHandleVector m_playHandles;
@@ -469,7 +468,7 @@ private:
friend class engine;
friend class mixerWorkerThread;
friend class MixerWorkerThread;
} ;

View File

@@ -1,7 +1,7 @@
/*
* mixer.cpp - audio-device-independent mixer for LMMS
*
* Copyright (c) 2004-2008 Tobias Doerffel <tobydox/at/users.sourceforge.net>
* Copyright (c) 2004-2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
@@ -41,6 +41,7 @@
#include "sample_play_handle.h"
#include "piano_roll.h"
#include "micro_timer.h"
#include "atomic_int.h"
#include "audio_device.h"
#include "midi_client.h"
@@ -95,7 +96,7 @@ static void * aligned_malloc( int _bytes )
class mixerWorkerThread : public QThread
class MixerWorkerThread : public QThread
{
public:
enum JobTypes
@@ -107,50 +108,47 @@ public:
NumJobTypes
} ;
struct jobQueueItem
struct JobQueueItem
{
jobQueueItem() :
JobQueueItem() :
type( InvalidJob ),
job( NULL ),
param( 0 ),
done( false )
{
}
jobQueueItem( JobTypes _type, void * _job ) :
JobQueueItem( JobTypes _type, void * _job, int _param = 0 ) :
type( _type ),
job( _job ),
param( _param ),
done( false )
{
}
JobTypes type;
void * job;
int param;
union
{
playHandle * playHandleJob;
audioPort * audioPortJob;
int effectChannelJob;
volatile void * job;
};
#if QT_VERSION >= 0x040400
QAtomicInt done;
#else
volatile bool done;
#endif
AtomicInt done;
} ;
typedef QVector<jobQueueItem> jobQueueItems;
struct jobQueue
struct JobQueue
{
jobQueueItems items;
#if QT_VERSION < 0x040400
QMutex lock;
#endif
#define JOB_QUEUE_SIZE 1024
JobQueue() :
queueSize( 0 )
{
}
JobQueueItem items[JOB_QUEUE_SIZE];
int queueSize;
AtomicInt itemsDone;
} ;
static jobQueue s_jobQueue;
static JobQueue s_jobQueue;
mixerWorkerThread( int _worker_num, mixer * _mixer ) :
MixerWorkerThread( int _worker_num, mixer * _mixer ) :
QThread( _mixer ),
m_workingBuf( (sampleFrame *) aligned_malloc(
_mixer->framesPerPeriod() *
@@ -158,12 +156,11 @@ public:
m_workerNum( _worker_num ),
m_quit( false ),
m_mixer( _mixer ),
m_queueReadySem( &m_mixer->m_queueReadySem ),
m_workersDoneSem( &m_mixer->m_workersDoneSem )
m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond )
{
}
virtual ~mixerWorkerThread()
virtual ~MixerWorkerThread()
{
aligned_free( m_workingBuf );
}
@@ -173,60 +170,7 @@ public:
m_quit = true;
}
void processJobQueue( void )
{
jobQueueItems::iterator end_it = s_jobQueue.items.end();
for( jobQueueItems::iterator it =
s_jobQueue.items.begin();
it != end_it; ++it )
{
#if QT_VERSION >= 0x040400
if( it->done.fetchAndStoreOrdered( 1 ) == 0 )
{
#else
s_jobQueue.lock.lock();
if( !it->done )
{
it->done = true;
s_jobQueue.lock.unlock();
#endif
switch( it->type )
{
case PlayHandle:
it->playHandleJob->play( m_workingBuf );
break;
case AudioPortEffects:
{
audioPort * a = it->audioPortJob;
const bool me = a->processEffects();
if( me || a->m_bufferUsage != audioPort::NoUsage )
{
engine::getFxMixer()->mixToChannel( a->firstBuffer(),
a->nextFxChannel() );
a->nextPeriod();
}
}
break;
case EffectChannel:
engine::getFxMixer()->processChannel(
(fx_ch_t) it->effectChannelJob );
break;
default:
/*fprintf( stderr, "invalid job item type %d at %ld in jobqueue(%ld:%ld)\n",
(int) it->type, (long int) it,
(long int) s_jobQueue.items.begin(),
(long int) end_it );*/
break;
}
}
#if QT_VERSION < 0x040400
else
{
s_jobQueue.lock.unlock();
}
#endif
}
}
void processJobQueue( void );
private:
@@ -242,12 +186,13 @@ private:
#endif
#endif
#endif
QMutex m;
while( m_quit == false )
{
m_queueReadySem->acquire();
m.lock();
m_queueReadyWaitCond->wait( &m );
processJobQueue();
m_workersDoneSem->release();
m.unlock();
}
}
@@ -255,34 +200,103 @@ private:
int m_workerNum;
volatile bool m_quit;
mixer * m_mixer;
QSemaphore * m_queueReadySem;
QSemaphore * m_workersDoneSem;
QWaitCondition * m_queueReadyWaitCond;
} ;
mixerWorkerThread::jobQueue mixerWorkerThread::s_jobQueue;
MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue;
#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \
mixerWorkerThread::s_jobQueue.items.clear(); \
void MixerWorkerThread::processJobQueue( void )
{
for( int i = 0; i < s_jobQueue.queueSize; ++i )
{
JobQueueItem * it = &s_jobQueue.items[i];
if( it->done.fetchAndStoreOrdered( 1 ) == 0 )
{
switch( it->type )
{
case PlayHandle:
( (playHandle *) it->job )->
play( m_workingBuf );
break;
case AudioPortEffects:
{
audioPort * a = (audioPort *) it->job;
const bool me = a->processEffects();
if( me || a->m_bufferUsage != audioPort::NoUsage )
{
engine::getFxMixer()->mixToChannel( a->firstBuffer(),
a->nextFxChannel() );
a->nextPeriod();
}
}
break;
case EffectChannel:
engine::getFxMixer()->processChannel( (fx_ch_t) it->param );
break;
default:
break;
}
s_jobQueue.itemsDone.fetchAndAddOrdered( 1 );
}
}
}
#define FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.queueSize = 0; \
MixerWorkerThread::s_jobQueue.itemsDone = 0; \
for( _vec_type::iterator it = _vec.begin(); \
it != _vec.end(); ++it ) \
it != _vec.end(); ++it ) \
{ \
if( _condition ) \
{ \
mixerWorkerThread::s_jobQueue.items. \
push_back( \
mixerWorkerThread::jobQueueItem( _job_type, \
(void *)*it ) );\
{
#define FILL_JOB_QUEUE_END() \
++MixerWorkerThread::s_jobQueue.queueSize; \
} \
}
#define FILL_JOB_QUEUE(_vec_type,_vec,_job_type,_condition) \
FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.items \
[MixerWorkerThread::s_jobQueue.queueSize] = \
MixerWorkerThread::JobQueueItem( _job_type, \
(void *) *it ); \
FILL_JOB_QUEUE_END()
#define FILL_JOB_QUEUE_PARAM(_vec_type,_vec,_job_type,_condition) \
FILL_JOB_QUEUE_BEGIN(_vec_type,_vec,_condition) \
MixerWorkerThread::s_jobQueue.items \
[MixerWorkerThread::s_jobQueue.queueSize] = \
MixerWorkerThread::JobQueueItem( _job_type, \
NULL, *it ); \
FILL_JOB_QUEUE_END()
#define START_JOBS() \
m_queueReadySem.release( m_numWorkers ); \
m_queueReadyWaitCond.wakeAll();
// define a pause instruction for spinlock-loop - merely useful on
// HyperThreading systems with just one physical core (e.g. Intel Atom)
#ifdef LMMS_HOST_X86_64
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#ifdef LMMS_HOST_X86_64
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#define SPINLOCK_PAUSE()
#endif
#endif
#define WAIT_FOR_JOBS() \
m_workersDoneSem.acquire( m_numWorkers );
m_workers[m_numWorkers]->processJobQueue(); \
while( MixerWorkerThread::s_jobQueue.itemsDone < \
MixerWorkerThread::s_jobQueue.queueSize ) \
{ \
SPINLOCK_PAUSE(); \
} \
@@ -296,10 +310,8 @@ mixer::mixer( void ) :
m_writeBuf( NULL ),
m_cpuLoad( 0 ),
m_workers(),
m_numWorkers( QThread::idealThreadCount() > 1 ?
QThread::idealThreadCount()-1 : 0 ),
m_queueReadySem( m_numWorkers ),
m_workersDoneSem( m_numWorkers ),
m_numWorkers( QThread::idealThreadCount()-1 ),
m_queueReadyWaitCond(),
m_qualitySettings( qualitySettings::Mode_Draft ),
m_masterGain( 1.0f ),
m_audioDev( NULL ),
@@ -363,14 +375,12 @@ mixer::mixer( void ) :
m_bufferPool.push_back( m_readBuf );
}
m_queueReadySem.acquire( m_numWorkers );
m_workersDoneSem.acquire( m_numWorkers );
for( int i = 0; i < m_numWorkers+1; ++i )
{
mixerWorkerThread * wt = new mixerWorkerThread( i, this );
if( i > 0 )
MixerWorkerThread * wt = new MixerWorkerThread( i, this );
if( i < m_numWorkers )
{
wt->start( QThread::HighestPriority );
wt->start( QThread::TimeCriticalPriority );
}
m_workers.push_back( wt );
}
@@ -387,15 +397,15 @@ mixer::~mixer()
{
// distribute an empty job-queue so that worker-threads
// get out of their processing-loop
mixerWorkerThread::s_jobQueue.items.clear();
MixerWorkerThread::s_jobQueue.queueSize = 0;
for( int w = 0; w < m_numWorkers; ++w )
{
m_workers[w+1]->quit();
m_workers[w]->quit();
}
START_JOBS();
for( int w = 0; w < m_numWorkers; ++w )
{
m_workers[w+1]->wait( 500 );
m_workers[w]->wait( 500 );
}
while( m_fifo->available() )
@@ -432,7 +442,7 @@ void mixer::startProcessing( bool _needs_fifo )
if( _needs_fifo )
{
m_fifoWriter = new fifoWriter( this, m_fifo );
m_fifoWriter->start( QThread::TimeCriticalPriority );
m_fifoWriter->start( QThread::HighPriority );
}
else
{
@@ -609,10 +619,9 @@ const surroundSampleFrame * mixer::renderNextBuffer( void )
// STAGE 1: run and render all play handles
FILL_JOB_QUEUE(playHandleVector,m_playHandles,
mixerWorkerThread::PlayHandle,
MixerWorkerThread::PlayHandle,
!( *it )->done());
START_JOBS();
m_workers[0]->processJobQueue();
WAIT_FOR_JOBS();
// removed all play handles which are done
@@ -620,7 +629,7 @@ const surroundSampleFrame * mixer::renderNextBuffer( void )
it != m_playHandles.end(); )
{
if( ( *it )->affinityMatters() &&
( *it )->affinity() != QThread::currentThread() )
( *it )->affinity() != QThread::currentThread() )
{
++it;
continue;
@@ -639,17 +648,15 @@ const surroundSampleFrame * mixer::renderNextBuffer( void )
// STAGE 2: process effects of all instrument- and sampletracks
FILL_JOB_QUEUE(QVector<audioPort*>,m_audioPorts,
mixerWorkerThread::AudioPortEffects,1);
MixerWorkerThread::AudioPortEffects,1);
START_JOBS();
m_workers[0]->processJobQueue();
WAIT_FOR_JOBS();
// STAGE 3: process effects in FX mixer
FILL_JOB_QUEUE(QVector<fx_ch_t>,__fx_channel_jobs,
mixerWorkerThread::EffectChannel,1);
FILL_JOB_QUEUE_PARAM(QVector<fx_ch_t>,__fx_channel_jobs,
MixerWorkerThread::EffectChannel,1);
START_JOBS();
m_workers[0]->processJobQueue();
WAIT_FOR_JOBS();