Mixer/FxMixer: separated MixerWorkerThread and ThreadableJob into files

Declarations and implementations of MixerWorkerThread and ThreadableJob
have been moved into separate source files.

Furthermore there were some improvements to MixerWorkerThreads.
MixerWorkerThread::processJobQueue() does not return until the job
queue completely has been processed. This way each thread can "help"
to finish processing the queue and does not get back to sleep until
all of the work is done.

Management of the queue is now done via an array of QAtomicPointers.
Items that are non-NULL still need to be processed while NULL-items
were taken from the queue (i.e. in progress or done). Thus we do not
need to deal with ThreadableJob-states within MixerWorkerThread anymore.
This commit is contained in:
Tobias Doerffel
2009-10-14 01:22:31 +02:00
parent c4647a58ac
commit a9d24d34f2
9 changed files with 329 additions and 214 deletions

View File

@@ -27,7 +27,6 @@
#include <QtCore/QString>
#include <QtCore/QMutex>
#include <QtCore/QMutexLocker>
#include "mixer.h"

View File

@@ -76,7 +76,6 @@ public:
virtual ~FxMixer();
void mixToChannel( const sampleFrame * _buf, fx_ch_t _ch );
void processChannel( fx_ch_t _ch, sampleFrame * _buf = NULL );
void prepareMasterMix();
void masterMix( sampleFrame * _buf );

View File

@@ -0,0 +1,97 @@
/*
* MixerWorkerThread.h - declaration of class MixerWorkerThread
*
* Copyright (c) 2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program (see COPYING); if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301 USA.
*
*/
#ifndef _MIXER_WORKER_THREAD_H
#define _MIXER_WORKER_THREAD_H
#include <QtCore/QAtomicPointer>
#include <QtCore/QThread>
#include <QtCore/QWaitCondition>
#include "mixer.h"
class MixerWorkerThread : public QThread
{
public:
struct JobQueue
{
#define JOB_QUEUE_SIZE 1024
JobQueue() :
items(),
queueSize( 0 ),
itemsDone( 0 )
{
}
QAtomicPointer<ThreadableJob> items[JOB_QUEUE_SIZE];
QAtomicInt queueSize;
QAtomicInt itemsDone;
} ;
static JobQueue s_jobQueue;
MixerWorkerThread( int _worker_num, mixer * _mixer );
virtual ~MixerWorkerThread();
virtual void quit();
void processJobQueue();
static void resetJobQueue();
template<typename T>
static void fillJobQueue( const T & _vec )
{
resetJobQueue();
for( typename T::ConstIterator it = _vec.begin(); it != _vec.end(); ++it )
{
addJob( *it );
}
}
static void addJob( ThreadableJob * _job );
static void startJobs();
static void waitForJobs();
static void startAndWaitForJobs()
{
startJobs();
waitForJobs();
}
private:
virtual void run();
sampleFrame * m_workingBuf;
int m_workerNum;
volatile bool m_quit;
mixer * m_mixer;
QWaitCondition * m_queueReadyWaitCond;
} ;
#endif

84
include/ThreadableJob.h Normal file
View File

@@ -0,0 +1,84 @@
/*
* ThreadableJob.h - declaration of class ThreadableJob
*
* Copyright (c) 2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program (see COPYING); if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301 USA.
*
*/
#ifndef _THREADABLE_JOB_H
#define _THREADABLE_JOB_H
#include <QtCore/QAtomicInt>
#include "mixer.h"
class ThreadableJob
{
public:
enum ProcessingState
{
Unstarted,
Queued,
InProgress,
Done
};
ThreadableJob() :
m_state( ThreadableJob::Unstarted )
{
}
inline ProcessingState state() const
{
return static_cast<ProcessingState>( (int) m_state );
}
inline void reset()
{
m_state = Unstarted;
}
inline void queue()
{
m_state = Queued;
}
void process( sampleFrame * _working_buffer )
{
if( m_state.testAndSetOrdered( Queued, InProgress ) )
{
doProcessing( _working_buffer );
m_state = Done;
}
}
virtual bool requiresProcessing() const = 0;
protected:
virtual void doProcessing( sampleFrame * _working_buffer ) = 0;
QAtomicInt m_state;
} ;
#endif

View File

@@ -62,53 +62,9 @@ const Keys BaseKey = Key_A;
const Octaves BaseOctave = DefaultOctave;
class MixerWorkerThread;
// TODO: move to ThreadableJob.h
class ThreadableJob
{
public:
enum ProcessingState
{
Unstarted,
Queued,
InProgress,
Done
};
ThreadableJob() :
m_state( ThreadableJob::Unstarted )
{
}
void reset()
{
m_state = ThreadableJob::Unstarted;
}
bool process( sampleFrame * _working_buffer )
{
if( m_state.testAndSetOrdered( Queued, InProgress ) )
{
doProcessing( _working_buffer );
m_state = Done;
return true;
}
return false;
}
virtual bool requiresProcessing() const = 0;
QAtomicInt m_state;
private:
virtual void doProcessing( sampleFrame * _working_buffer ) = 0;
} ;
#include "ThreadableJob.h"
#include "play_handle.h"
@@ -509,148 +465,4 @@ private:
} ;
// TODO: move to MixerWorkerThread.h / MixerWorkerThread.cpp
#include "Cpu.h"
#include "engine.h"
class MixerWorkerThread : public QThread
{
public:
struct JobQueue
{
#define JOB_QUEUE_SIZE 1024
JobQueue() :
queueSize( 0 ),
itemsDone( 0 )
{
for( int i = 0; i < JOB_QUEUE_SIZE; ++i )
{
items[i] = NULL;
}
}
ThreadableJob * items[JOB_QUEUE_SIZE];
QAtomicInt queueSize;
QAtomicInt itemsDone;
} ;
static JobQueue s_jobQueue;
MixerWorkerThread( int _worker_num, mixer * _mixer ) :
QThread( _mixer ),
m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ),
m_workerNum( _worker_num ),
m_quit( false ),
m_mixer( _mixer ),
m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond )
{
}
virtual ~MixerWorkerThread()
{
CPU::freeFrames( m_workingBuf );
}
virtual void quit()
{
m_quit = true;
}
void processJobQueue()
{
for( int i = 0; i < s_jobQueue.queueSize; ++i )
{
// returns true if ThreadableJob was not processed before
if( s_jobQueue.items[i]->process( m_workingBuf ) )
{
s_jobQueue.itemsDone.fetchAndAddOrdered( 1 );
}
}
}
static void resetJobQueue()
{
s_jobQueue.queueSize = 0;
s_jobQueue.itemsDone = 0;
}
template<typename T>
static void fillJobQueue( const T & _vec )
{
resetJobQueue();
for( typename T::ConstIterator it = _vec.begin(); it != _vec.end(); ++it )
{
addJob( *it );
}
}
static void addJob( ThreadableJob * _job )
{
if( _job->requiresProcessing() )
{
_job->m_state = ThreadableJob::Queued;
s_jobQueue.items[s_jobQueue.queueSize.fetchAndAddOrdered(1)] = _job;
}
}
// define a pause instruction for spinlock-loop - merely useful on
// HyperThreading systems with just one physical core (e.g. Intel Atom)
#ifdef LMMS_HOST_X86
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#ifdef LMMS_HOST_X86_64
#define SPINLOCK_PAUSE() asm( "pause" )
#else
#define SPINLOCK_PAUSE()
#endif
#endif
static void startJobs()
{
// TODO: this is dirty!
engine::getMixer()->m_queueReadyWaitCond.wakeAll();
}
static void waitForJobs()
{
// TODO: this is dirty!
mixer * m = engine::getMixer();
m->m_workers[m->m_numWorkers]->processJobQueue();
while( s_jobQueue.itemsDone < s_jobQueue.queueSize )
{
startJobs();
}
}
static void startAndWaitForJobs()
{
startJobs();
waitForJobs();
}
private:
virtual void run()
{
QMutex m;
while( m_quit == false )
{
m.lock();
m_queueReadyWaitCond->wait( &m );
processJobQueue();
m.unlock();
}
}
sampleFrame * m_workingBuf;
int m_workerNum;
volatile bool m_quit;
mixer * m_mixer;
QWaitCondition * m_queueReadyWaitCond;
} ;
#endif

View File

@@ -25,6 +25,7 @@
#ifndef _PLAY_HANDLE_H
#define _PLAY_HANDLE_H
#include "ThreadableJob.h"
#include "mixer.h"
class track;

View File

@@ -22,10 +22,10 @@
*
*/
#include <QtXml/QDomElement>
#include "FxMixer.h"
#include "MixerWorkerThread.h"
#include "Cpu.h"
#include "Effect.h"
#include "song.h"
@@ -60,7 +60,7 @@ FxChannel::~FxChannel()
void FxChannel::doProcessing(sampleFrame * _buf)
void FxChannel::doProcessing( sampleFrame * _buf )
{
FxMixer * fxm = engine::fxMixer();
const fpp_t fpp = engine::getMixer()->framesPerPeriod();
@@ -97,7 +97,7 @@ void FxChannel::doProcessing(sampleFrame * _buf)
const float v = m_volumeModel.value();
m_fxChain.startRunning();
m_stillRunning = m_fxChain.processAudioBuffer( _buf, fpp);
m_stillRunning = m_fxChain.processAudioBuffer( _buf, fpp );
m_peakLeft = engine::getMixer()->peakValueLeft( _buf, fpp ) * v;
m_peakRight = engine::getMixer()->peakValueRight( _buf, fpp ) * v;
}
@@ -113,13 +113,13 @@ void FxChannel::doProcessing(sampleFrame * _buf)
{
// if parent.unstarted and every parent.leaf.done:
FxChannel * parent = fxm->effectChannel(m_sends[i]);
if( parent->m_state == ThreadableJob::Unstarted )
if( parent->state() == ThreadableJob::Unstarted )
{
bool everyLeafDone = true;
for( int j=0; j<parent->m_receives.size(); ++j )
{
if( fxm->effectChannel(parent->m_receives[j])->m_state !=
ThreadableJob::Done )
if( fxm->effectChannel( parent->m_receives[j] )->state() !=
ThreadableJob::Done )
{
everyLeafDone = false;
break;
@@ -435,15 +435,6 @@ void FxMixer::mixToChannel( const sampleFrame * _buf, fx_ch_t _ch )
void FxMixer::processChannel( fx_ch_t _ch, sampleFrame * _buf )
{
m_fxChannels[_ch]->process(_buf);
}
void FxMixer::prepareMasterMix()
{
engine::getMixer()->clearAudioBuffer( m_fxChannels[0]->m_buffer,
@@ -492,7 +483,7 @@ void FxMixer::masterMix( sampleFrame * _buf )
// to be processed.
MixerWorkerThread::resetJobQueue();
addChannelLeaf( 0, _buf );
while( m_fxChannels[0]->m_state != ThreadableJob::Done )
while( m_fxChannels[0]->state() != ThreadableJob::Done )
{
MixerWorkerThread::startAndWaitForJobs();
}

View File

@@ -0,0 +1,138 @@
/*
* MixerWorkerThread.cpp - implementation of MixerWorkerThread
*
* Copyright (c) 2009 Tobias Doerffel <tobydox/at/users.sourceforge.net>
*
* This file is part of Linux MultiMedia Studio - http://lmms.sourceforge.net
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program (see COPYING); if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301 USA.
*
*/
#include "MixerWorkerThread.h"
#include "Cpu.h"
#include "engine.h"
#include "mixer.h"
MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue;
MixerWorkerThread::MixerWorkerThread( int _worker_num, mixer * _mixer ) :
QThread( _mixer ),
m_workingBuf( CPU::allocFrames( _mixer->framesPerPeriod() ) ),
m_workerNum( _worker_num ),
m_quit( false ),
m_mixer( _mixer ),
m_queueReadyWaitCond( &m_mixer->m_queueReadyWaitCond )
{
resetJobQueue();
}
MixerWorkerThread::~MixerWorkerThread()
{
CPU::freeFrames( m_workingBuf );
}
void MixerWorkerThread::quit()
{
m_quit = true;
}
void MixerWorkerThread::processJobQueue()
{
while( s_jobQueue.itemsDone != s_jobQueue.queueSize )
{
for( int i = 0; i < s_jobQueue.queueSize; ++i )
{
ThreadableJob * job =
s_jobQueue.items[i].fetchAndStoreOrdered( NULL );
if( job )
{
job->process( m_workingBuf );
s_jobQueue.itemsDone.fetchAndAddOrdered( 1 );
}
}
}
}
void MixerWorkerThread::resetJobQueue()
{
s_jobQueue.queueSize = 0;
s_jobQueue.itemsDone = 0;
}
void MixerWorkerThread::addJob( ThreadableJob * _job )
{
if( _job->requiresProcessing() )
{
// update job state
_job->queue();
// actually queue the job via atomic operations
s_jobQueue.items[s_jobQueue.queueSize.fetchAndAddOrdered(1)] = _job;
}
}
void MixerWorkerThread::startJobs()
{
// TODO: this is dirty!
engine::getMixer()->m_queueReadyWaitCond.wakeAll();
}
void MixerWorkerThread::waitForJobs()
{
// TODO: this is dirty!
mixer * m = engine::getMixer();
m->m_workers[m->m_numWorkers]->processJobQueue();
}
void MixerWorkerThread::run()
{
QMutex m;
while( m_quit == false )
{
m.lock();
m_queueReadyWaitCond->wait( &m );
processJobQueue();
m.unlock();
}
}

View File

@@ -26,6 +26,7 @@
#include "mixer.h"
#include "FxMixer.h"
#include "MixerWorkerThread.h"
#include "play_handle.h"
#include "song.h"
#include "templates.h"
@@ -61,13 +62,6 @@
#endif
MixerWorkerThread::JobQueue MixerWorkerThread::s_jobQueue;
mixer::mixer() :
m_framesPerPeriod( DEFAULT_BUFFER_SIZE ),
m_workingBuf( NULL ),