added shmFifo class which replaces communication via pipes and is portable as well

git-svn-id: https://lmms.svn.sf.net/svnroot/lmms/trunk/lmms@1524 0778d3d1-df1d-0410-868b-ea421aaaa00d
This commit is contained in:
Tobias Doerffel
2008-08-31 21:07:58 +00:00
parent 9b47d27f56
commit c0c241c9b5
5 changed files with 384 additions and 222 deletions

View File

@@ -30,24 +30,22 @@
#include "mixer.h"
#include "midi.h"
#ifdef LMMS_HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <vector>
#include <string>
#include <cassert>
#ifdef LMMS_BUILD_WIN32
#ifdef LMMS_HOST_X86_64
#define USE_NATIVE_SHM_API
#else
#if QT_VERSION >= 0x040400
#include <QtCore/QSharedMemory>
#else
#define USE_NATIVE_SHM_API
#endif
#error win32-build requires at least Qt 4.4.0
#endif
#ifdef USE_NATIVE_SHM_API
#else
#define USE_NATIVE_SHMEM
#ifdef LMMS_HAVE_SYS_IPC_H
#include <sys/ipc.h>
#endif
@@ -55,70 +53,307 @@
#ifdef LMMS_HAVE_SYS_SHM_H
#include <sys/shm.h>
#endif
#endif
#ifdef LMMS_HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif
#ifdef BUILD_REMOTE_PLUGIN_CLIENT
#define COMPILE_REMOTE_PLUGIN_BASE
#else
#include <QtCore/QProcess>
#endif
inline int32_t readInt( int _fd )
// 4000 should be enough - this way we only need to allocate one page
const int SHM_FIFO_SIZE = 4000;
// implements a FIFO inside a shared memory segment
class shmFifo
{
int32_t i;
if( read( _fd, &i, sizeof( i ) ) == sizeof( i ) )
// need this union to handle different sizes of sem_t on 32 bit
// and 64 bit platforms
union sem32_t
{
sem_t sem;
char fill[32];
} ;
struct shmData
{
sem32_t dataSem; // semaphore for locking this
// FIFO management data
sem32_t messageSem; // semaphore for incoming messages
volatile int32_t startPtr; // current start of FIFO in memory
volatile int32_t endPtr; // current end of FIFO in memory
char data[SHM_FIFO_SIZE]; // actual data
} ;
public:
// constructor for master-side
shmFifo() :
m_master( true ),
m_shmKey( 0 ),
#ifdef USE_NATIVE_SHMEM
m_shmID( -1 ),
#else
m_shmObj(),
#endif
m_data( NULL ),
m_dataSem( NULL ),
m_messageSem( NULL ),
m_lockDepth( 0 )
{
#ifdef USE_NATIVE_SHMEM
while( ( m_shmID = shmget( ++m_shmKey, sizeof( shmData ),
IPC_CREAT | IPC_EXCL | 0600 ) ) == -1 )
{
}
m_data = (shmData *) shmat( m_shmID, 0, 0 );
#else
do
{
m_shmObj.setKey( QString( "%1" ).arg( ++m_shmKey ) );
m_shmObj.create( sizeof( shmData ) );
} while( m_shmObj.error() != QSharedMemory::NoError );
m_data = (shmData *) m_shmObj.data();
#endif
assert( m_data != NULL );
m_dataSem = &m_data->dataSem.sem;
m_messageSem = &m_data->messageSem.sem;
m_data->startPtr = m_data->endPtr = 0;
if( sem_init( m_dataSem, 1, 1 ) )
{
printf( "could not initialize m_dataSem\n" );
}
if( sem_init( m_messageSem, 1, 0 ) )
{
printf( "could not initialize m_messageSem\n" );
}
}
// constructor for remote-/client-side - use _shm_key for making up
// the connection to master
shmFifo( key_t _shm_key ) :
m_master( false ),
m_shmKey( 0 ),
#ifdef USE_NATIVE_SHMEM
m_shmID( shmget( _shm_key, 0, 0 ) ),
#else
m_shmObj( QString::number( _shm_key ) ),
#endif
m_data( NULL ),
m_dataSem( NULL ),
m_messageSem( NULL ),
m_lockDepth( 0 )
{
#ifdef USE_NATIVE_SHMEM
if( m_shmID != -1 )
{
m_data = (shmData *) shmat( m_shmID, 0, 0 );
}
#else
if( m_shmObj.attach() )
{
m_data = (shmData *) m_shmObj.data();
}
#endif
assert( m_data != NULL );
m_dataSem = &m_data->dataSem.sem;
m_messageSem = &m_data->messageSem.sem;
}
~shmFifo()
{
#ifdef USE_NATIVE_SHMEM
shmdt( m_data );
#endif
// master?
if( m_master )
{
#ifdef USE_NATIVE_SHMEM
shmctl( m_shmID, IPC_RMID, NULL );
#endif
sem_destroy( m_dataSem );
sem_destroy( m_messageSem );
}
}
// do we act as master (i.e. not as remote-process?)
inline bool isMaster( void ) const
{
return( m_master );
}
// recursive lock
inline void lock( void )
{
if( ++m_lockDepth == 1 )
{
sem_wait( m_dataSem );
}
}
// recursive unlock
inline void unlock( void )
{
if( m_lockDepth > 0 )
{
if( --m_lockDepth == 0 )
{
sem_post( m_dataSem );
}
}
}
// wait until message-semaphore is available
inline void waitForMessage( void )
{
sem_wait( m_messageSem );
}
// increase message-semaphore
inline void messageSent( void )
{
sem_post( m_messageSem );
}
inline int32_t readInt( void )
{
int32_t i;
read( &i, sizeof( i ) );
return( i );
}
return( 0 );
}
inline bool writeInt( const int32_t & _i, int _fd )
{
if( write( _fd, &_i, sizeof( _i ) ) != sizeof( _i ) )
inline void writeInt( const int32_t & _i )
{
return false;
write( &_i, sizeof( _i ) );
}
return true;
}
static inline std::string readString( int _fd )
{
const int len = readInt( _fd );
if( len )
inline std::string readString( void )
{
char * sc = new char[len + 1];
if( read( _fd, sc, len ) == len )
const int len = readInt();
if( len )
{
char * sc = new char[len + 1];
read( sc, len );
sc[len] = 0;
std::string s( sc );
delete[] sc;
return( s );
}
delete[] sc;
return std::string();
}
return std::string();
}
static inline bool writeString( const std::string & _s, int _fd )
{
const int len = _s.size();
writeInt( len, _fd );
if( write( _fd, _s.c_str(), len ) != len )
inline void writeString( const std::string & _s )
{
return false;
const int len = _s.size();
writeInt( len );
write( _s.c_str(), len );
}
return true;
}
inline bool messagesLeft( void )
{
int v;
sem_getvalue( m_messageSem, &v );
return( v > 0 );
}
inline int shmKey( void ) const
{
return( m_shmKey );
}
private:
static inline int fastMemCpy( void * _dest, const void * _src,
const int _len )
{
// calling memcpy() for just an integer is obsolete overhead
if( _len == 4 )
{
*( (int32_t *) _dest ) = *( (int32_t *) _src );
}
else
{
memcpy( _dest, _src, _len );
}
}
void read( void * _buf, int _len )
{
lock();
while( _len > m_data->endPtr - m_data->startPtr )
{
unlock();
#ifndef LMMS_BUILD_WIN32
usleep( 5 );
#endif
lock();
}
fastMemCpy( _buf, m_data->data + m_data->startPtr, _len );
m_data->startPtr += _len;
// nothing left?
if( m_data->startPtr == m_data->endPtr )
{
// then reset to 0
m_data->startPtr = m_data->endPtr = 0;
}
unlock();
}
void write( const void * _buf, int _len )
{
lock();
while( _len > SHM_FIFO_SIZE - m_data->endPtr )
{
// if no space is left, try to move data to front
if( m_data->startPtr > 0 )
{
memmove( m_data->data,
m_data->data + m_data->startPtr,
m_data->endPtr - m_data->startPtr );
m_data->endPtr = m_data->endPtr -
m_data->startPtr;
m_data->startPtr = 0;
}
unlock();
#ifndef LMMS_BUILD_WIN32
usleep( 5 );
#endif
lock();
}
fastMemCpy( m_data->data + m_data->endPtr, _buf, _len );
m_data->endPtr += _len;
unlock();
}
bool m_master;
#ifdef USE_NATIVE_SHMEM
key_t m_shmKey;
int m_shmID;
#else
int m_shmKey;
QSharedMemory m_shmObj;
#endif
size_t m_shmSize;
shmData * m_data;
sem_t * m_dataSem;
sem_t * m_messageSem;
int m_lockDepth;
} ;
enum RemoteMessageIDs
@@ -127,7 +362,6 @@ enum RemoteMessageIDs
IdGeneralFailure,
IdInitDone,
IdClosePlugin,
IdDebugMessage,
IdSampleRateInformation,
IdBufferSizeInformation,
IdMidiEvent,
@@ -193,13 +427,18 @@ public:
std::vector<std::string> data;
} ;
remotePluginBase( void );
remotePluginBase( shmFifo * _in, shmFifo * _out );
virtual ~remotePluginBase();
void sendMessage( const message & _m );
message receiveMessage( void );
bool messagesLeft( void ) const;
inline bool messagesLeft( void )
{
return( m_in->messagesLeft() );
}
message waitForMessage( const message & _m,
bool _busy_waiting = FALSE );
@@ -222,19 +461,20 @@ public:
protected:
void setInFD( int _fd )
const shmFifo * in( void ) const
{
m_inFD = _fd;
return( m_in );
}
void setOutFD( int _fd )
const shmFifo * out( void ) const
{
m_outFD = _fd;
return( m_out );
}
private:
int m_inFD;
int m_outFD;
shmFifo * m_in;
shmFifo * m_out;
} ;
@@ -287,17 +527,16 @@ protected:
private:
void resizeSharedMemory( void );
void resizeSharedProcessingMemory( void );
bool m_initialized;
bool m_failed;
int m_pluginPID;
int m_pipes[2][2];
QProcess m_process;
QMutex m_commMutex;
#ifdef USE_NATIVE_SHM_API
#ifdef USE_NATIVE_SHMEM
int m_shmID;
#else
QSharedMemory m_shmObj;
@@ -318,7 +557,7 @@ private:
class remotePluginClient : public remotePluginBase
{
public:
remotePluginClient( void );
remotePluginClient( key_t _shm_in, key_t _shm_out );
virtual ~remotePluginClient();
virtual bool processMessage( const message & _m );
@@ -349,20 +588,6 @@ public:
return( m_bufferSize );
}
void debugMessage( const char * _fmt, ... )
{
va_list ap;
char buffer[512];
va_start( ap, _fmt );
buffer[0] = 0;
vsnprintf( buffer, sizeof( buffer ), _fmt, ap );
message m( IdDebugMessage );
m.data.push_back( std::string( buffer ) );
sendMessage( m );
va_end( ap );
}
void setInputCount( int _i )
{
m_inputCount = _i;
@@ -381,10 +606,10 @@ public:
private:
void setShmKey( int _key, int _size );
void setShmKey( key_t _key, int _size );
void doProcessing( void );
#ifndef USE_NATIVE_SHM_API
#ifndef USE_NATIVE_SHMEM
QSharedMemory m_shmObj;
#endif
float * m_shm;
@@ -409,9 +634,9 @@ private:
#endif
remotePluginBase::remotePluginBase( void ) :
m_inFD( -1 ),
m_outFD( -1 )
remotePluginBase::remotePluginBase( shmFifo * _in, shmFifo * _out ) :
m_in( _in ),
m_out( _out )
{
}
@@ -420,6 +645,8 @@ remotePluginBase::remotePluginBase( void ) :
remotePluginBase::~remotePluginBase()
{
delete m_in;
delete m_out;
}
@@ -427,12 +654,15 @@ remotePluginBase::~remotePluginBase()
void remotePluginBase::sendMessage( const message & _m )
{
writeInt( _m.id, m_outFD );
writeInt( _m.data.size(), m_outFD );
m_out->lock();
m_out->writeInt( _m.id );
m_out->writeInt( _m.data.size() );
for( int i = 0; i < _m.data.size(); ++i )
{
writeString( _m.data[i], m_outFD );
m_out->writeString( _m.data[i] );
}
m_out->unlock();
m_out->messageSent();
}
@@ -440,66 +670,46 @@ void remotePluginBase::sendMessage( const message & _m )
remotePluginBase::message remotePluginBase::receiveMessage( void )
{
m_in->waitForMessage();
m_in->lock();
message m;
m.id = readInt( m_inFD );
const int s = readInt( m_inFD );
m.id = m_in->readInt();
const int s = m_in->readInt();
for( int i = 0; i < s; ++i )
{
m.data.push_back( readString( m_inFD ) );
m.data.push_back( m_in->readString() );
}
m_in->unlock();
return( m );
}
inline bool remotePluginBase::messagesLeft( void ) const
{
fd_set rfds;
FD_ZERO( &rfds );
FD_SET( m_inFD, &rfds );
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 1; // can we use 0 here?
return( select( m_inFD + 1, &rfds, NULL, NULL, &tv ) > 0 );
}
remotePluginBase::message remotePluginBase::waitForMessage(
const message & _wm,
bool _busy_waiting )
{
while( 1 )
{
if( messagesLeft() )
{
message m;
m = receiveMessage();
processMessage( m );
if( m.id == _wm.id )
{
return( m );
}
else if( m.id == IdGeneralFailure )
{
return( m );
}
}
#ifndef BUILD_REMOTE_PLUGIN_CLIENT
if( _busy_waiting )
if( _busy_waiting && !messagesLeft() )
{
QCoreApplication::processEvents(
QEventLoop::AllEvents, 50 );
continue;
}
else
{
usleep( 10 );
}
#else
usleep( 10 );
#endif
message m = receiveMessage();
processMessage( m );
if( m.id == _wm.id )
{
return( m );
}
else if( m.id == IdGeneralFailure )
{
return( m );
}
}
}
@@ -513,9 +723,10 @@ remotePluginBase::message remotePluginBase::waitForMessage(
#ifdef BUILD_REMOTE_PLUGIN_CLIENT
remotePluginClient::remotePluginClient( void ) :
remotePluginBase(),
#ifndef USE_NATIVE_SHM_API
remotePluginClient::remotePluginClient( key_t _shm_in, key_t _shm_out ) :
remotePluginBase( new shmFifo( _shm_in ),
new shmFifo( _shm_out ) ),
#ifndef USE_NATIVE_SHMEM
m_shmObj(),
#endif
m_shm( NULL ),
@@ -523,8 +734,6 @@ remotePluginClient::remotePluginClient( void ) :
m_outputCount( DEFAULT_CHANNELS ),
m_bufferSize( DEFAULT_BUFFER_SIZE )
{
setInFD( 0 );
setOutFD( 1 );
sendMessage( IdSampleRateInformation );
sendMessage( IdBufferSizeInformation );
}
@@ -536,7 +745,7 @@ remotePluginClient::~remotePluginClient()
{
sendMessage( IdClosePlugin );
#ifdef USE_NATIVE_SHM_API
#ifdef USE_NATIVE_SHMEM
shmdt( m_shm );
#endif
}
@@ -600,9 +809,9 @@ bool remotePluginClient::processMessage( const message & _m )
void remotePluginClient::setShmKey( int _key, int _size )
void remotePluginClient::setShmKey( key_t _key, int _size )
{
#ifdef USE_NATIVE_SHM_API
#ifdef USE_NATIVE_SHMEM
if( m_shm != NULL )
{
shmdt( m_shm );
@@ -618,7 +827,7 @@ void remotePluginClient::setShmKey( int _key, int _size )
int shm_id = shmget( _key, _size, 0 );
if( shm_id == -1 )
{
debugMessage( "failed getting shared memory" );
fprintf( stderr, "failed getting shared memory\n" );
}
else
{
@@ -632,7 +841,7 @@ void remotePluginClient::setShmKey( int _key, int _size )
}
else
{
debugMessage( "failed getting shared memory" );
fprintf( stderr, "failed getting shared memory\n" );
}
#endif
}