diff --git a/CMakeLists.txt b/CMakeLists.txt index c630c90ee..f1a1184cb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -66,6 +66,7 @@ ENDIF(LMMS_BUILD_WIN32) CHECK_INCLUDE_FILES(stdint.h LMMS_HAVE_STDINT_H) CHECK_INCLUDE_FILES(stdlib.h LMMS_HAVE_STDLIB_H) CHECK_INCLUDE_FILES(pthread.h LMMS_HAVE_PTHREAD_H) +CHECK_INCLUDE_FILES(semaphore.h LMMS_HAVE_SEMAPHORE_H) CHECK_INCLUDE_FILES(unistd.h LMMS_HAVE_UNISTD_H) CHECK_INCLUDE_FILES(sys/types.h LMMS_HAVE_SYS_TYPES_H) CHECK_INCLUDE_FILES(sys/ipc.h LMMS_HAVE_SYS_IPC_H) diff --git a/ChangeLog b/ChangeLog index 87bcb1f87..c86e93116 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2008-08-31 Tobias Doerffel + + * include/remote_plugin.h: + * src/core/remote_plugin.cpp: + * lmmsconfig.h.in: + * CMakeLists.txt: + added shmFifo class which replaces communication via pipes and is + portable as well + 2008-08-29 Tobias Doerffel * plugins/sid/sid_instrument.cpp: diff --git a/include/remote_plugin.h b/include/remote_plugin.h index e678d4f9e..ed162be81 100755 --- a/include/remote_plugin.h +++ b/include/remote_plugin.h @@ -30,24 +30,22 @@ #include "mixer.h" #include "midi.h" -#ifdef LMMS_HAVE_UNISTD_H -#include -#endif - #include #include +#include + +#ifdef LMMS_BUILD_WIN32 -#ifdef LMMS_HOST_X86_64 -#define USE_NATIVE_SHM_API -#else #if QT_VERSION >= 0x040400 #include #else -#define USE_NATIVE_SHM_API -#endif +#error win32-build requires at least Qt 4.4.0 #endif -#ifdef USE_NATIVE_SHM_API +#else + +#define USE_NATIVE_SHMEM + #ifdef LMMS_HAVE_SYS_IPC_H #include #endif @@ -55,70 +53,307 @@ #ifdef LMMS_HAVE_SYS_SHM_H #include #endif + +#endif + + +#ifdef LMMS_HAVE_SEMAPHORE_H +#include #endif #ifdef BUILD_REMOTE_PLUGIN_CLIENT #define COMPILE_REMOTE_PLUGIN_BASE +#else +#include #endif -inline int32_t readInt( int _fd ) +// 4000 should be enough - this way we only need to allocate one page +const int SHM_FIFO_SIZE = 4000; + +// implements a FIFO inside a shared memory segment +class shmFifo { - int32_t i; - if( read( _fd, &i, sizeof( i ) ) == sizeof( i ) ) + // need this union to handle different sizes of sem_t on 32 bit + // and 64 bit platforms + union sem32_t { + sem_t sem; + char fill[32]; + } ; + struct shmData + { + sem32_t dataSem; // semaphore for locking this + // FIFO management data + sem32_t messageSem; // semaphore for incoming messages + volatile int32_t startPtr; // current start of FIFO in memory + volatile int32_t endPtr; // current end of FIFO in memory + char data[SHM_FIFO_SIZE]; // actual data + } ; + +public: + // constructor for master-side + shmFifo() : + m_master( true ), + m_shmKey( 0 ), +#ifdef USE_NATIVE_SHMEM + m_shmID( -1 ), +#else + m_shmObj(), +#endif + m_data( NULL ), + m_dataSem( NULL ), + m_messageSem( NULL ), + m_lockDepth( 0 ) + { +#ifdef USE_NATIVE_SHMEM + while( ( m_shmID = shmget( ++m_shmKey, sizeof( shmData ), + IPC_CREAT | IPC_EXCL | 0600 ) ) == -1 ) + { + } + m_data = (shmData *) shmat( m_shmID, 0, 0 ); +#else + do + { + m_shmObj.setKey( QString( "%1" ).arg( ++m_shmKey ) ); + m_shmObj.create( sizeof( shmData ) ); + } while( m_shmObj.error() != QSharedMemory::NoError ); + + m_data = (shmData *) m_shmObj.data(); +#endif + assert( m_data != NULL ); + m_dataSem = &m_data->dataSem.sem; + m_messageSem = &m_data->messageSem.sem; + m_data->startPtr = m_data->endPtr = 0; + + if( sem_init( m_dataSem, 1, 1 ) ) + { + printf( "could not initialize m_dataSem\n" ); + } + if( sem_init( m_messageSem, 1, 0 ) ) + { + printf( "could not initialize m_messageSem\n" ); + } + + } + + // constructor for remote-/client-side - use _shm_key for making up + // the connection to master + shmFifo( key_t _shm_key ) : + m_master( false ), + m_shmKey( 0 ), +#ifdef USE_NATIVE_SHMEM + m_shmID( shmget( _shm_key, 0, 0 ) ), +#else + m_shmObj( QString::number( _shm_key ) ), +#endif + m_data( NULL ), + m_dataSem( NULL ), + m_messageSem( NULL ), + m_lockDepth( 0 ) + { +#ifdef USE_NATIVE_SHMEM + if( m_shmID != -1 ) + { + m_data = (shmData *) shmat( m_shmID, 0, 0 ); + } +#else + if( m_shmObj.attach() ) + { + m_data = (shmData *) m_shmObj.data(); + } +#endif + assert( m_data != NULL ); + m_dataSem = &m_data->dataSem.sem; + m_messageSem = &m_data->messageSem.sem; + } + + ~shmFifo() + { +#ifdef USE_NATIVE_SHMEM + shmdt( m_data ); +#endif + // master? + if( m_master ) + { +#ifdef USE_NATIVE_SHMEM + shmctl( m_shmID, IPC_RMID, NULL ); +#endif + sem_destroy( m_dataSem ); + sem_destroy( m_messageSem ); + } + } + + // do we act as master (i.e. not as remote-process?) + inline bool isMaster( void ) const + { + return( m_master ); + } + + // recursive lock + inline void lock( void ) + { + if( ++m_lockDepth == 1 ) + { + sem_wait( m_dataSem ); + } + } + + // recursive unlock + inline void unlock( void ) + { + if( m_lockDepth > 0 ) + { + if( --m_lockDepth == 0 ) + { + sem_post( m_dataSem ); + } + } + } + + // wait until message-semaphore is available + inline void waitForMessage( void ) + { + sem_wait( m_messageSem ); + } + + // increase message-semaphore + inline void messageSent( void ) + { + sem_post( m_messageSem ); + } + + + inline int32_t readInt( void ) + { + int32_t i; + read( &i, sizeof( i ) ); return( i ); } - return( 0 ); -} - - - -inline bool writeInt( const int32_t & _i, int _fd ) -{ - if( write( _fd, &_i, sizeof( _i ) ) != sizeof( _i ) ) + inline void writeInt( const int32_t & _i ) { - return false; + write( &_i, sizeof( _i ) ); } - return true; -} - - - -static inline std::string readString( int _fd ) -{ - const int len = readInt( _fd ); - if( len ) + inline std::string readString( void ) { - char * sc = new char[len + 1]; - if( read( _fd, sc, len ) == len ) + const int len = readInt(); + if( len ) { + char * sc = new char[len + 1]; + read( sc, len ); sc[len] = 0; std::string s( sc ); delete[] sc; return( s ); } - delete[] sc; + return std::string(); } - return std::string(); -} - - -static inline bool writeString( const std::string & _s, int _fd ) -{ - const int len = _s.size(); - writeInt( len, _fd ); - if( write( _fd, _s.c_str(), len ) != len ) + inline void writeString( const std::string & _s ) { - return false; + const int len = _s.size(); + writeInt( len ); + write( _s.c_str(), len ); } - return true; -} + + + inline bool messagesLeft( void ) + { + int v; + sem_getvalue( m_messageSem, &v ); + return( v > 0 ); + } + + + inline int shmKey( void ) const + { + return( m_shmKey ); + } + + +private: + static inline int fastMemCpy( void * _dest, const void * _src, + const int _len ) + { + // calling memcpy() for just an integer is obsolete overhead + if( _len == 4 ) + { + *( (int32_t *) _dest ) = *( (int32_t *) _src ); + } + else + { + memcpy( _dest, _src, _len ); + } + } + + void read( void * _buf, int _len ) + { + lock(); + while( _len > m_data->endPtr - m_data->startPtr ) + { + unlock(); +#ifndef LMMS_BUILD_WIN32 + usleep( 5 ); +#endif + lock(); + } + fastMemCpy( _buf, m_data->data + m_data->startPtr, _len ); + m_data->startPtr += _len; + // nothing left? + if( m_data->startPtr == m_data->endPtr ) + { + // then reset to 0 + m_data->startPtr = m_data->endPtr = 0; + } + unlock(); + } + + void write( const void * _buf, int _len ) + { + lock(); + while( _len > SHM_FIFO_SIZE - m_data->endPtr ) + { + // if no space is left, try to move data to front + if( m_data->startPtr > 0 ) + { + memmove( m_data->data, + m_data->data + m_data->startPtr, + m_data->endPtr - m_data->startPtr ); + m_data->endPtr = m_data->endPtr - + m_data->startPtr; + m_data->startPtr = 0; + } + unlock(); +#ifndef LMMS_BUILD_WIN32 + usleep( 5 ); +#endif + lock(); + } + fastMemCpy( m_data->data + m_data->endPtr, _buf, _len ); + m_data->endPtr += _len; + unlock(); + } + + bool m_master; +#ifdef USE_NATIVE_SHMEM + key_t m_shmKey; + int m_shmID; +#else + int m_shmKey; + QSharedMemory m_shmObj; +#endif + size_t m_shmSize; + shmData * m_data; + sem_t * m_dataSem; + sem_t * m_messageSem; + int m_lockDepth; + +} ; + enum RemoteMessageIDs @@ -127,7 +362,6 @@ enum RemoteMessageIDs IdGeneralFailure, IdInitDone, IdClosePlugin, - IdDebugMessage, IdSampleRateInformation, IdBufferSizeInformation, IdMidiEvent, @@ -193,13 +427,18 @@ public: std::vector data; } ; - remotePluginBase( void ); + remotePluginBase( shmFifo * _in, shmFifo * _out ); virtual ~remotePluginBase(); void sendMessage( const message & _m ); message receiveMessage( void ); - bool messagesLeft( void ) const; + inline bool messagesLeft( void ) + { + return( m_in->messagesLeft() ); + } + + message waitForMessage( const message & _m, bool _busy_waiting = FALSE ); @@ -222,19 +461,20 @@ public: protected: - void setInFD( int _fd ) + const shmFifo * in( void ) const { - m_inFD = _fd; + return( m_in ); } - void setOutFD( int _fd ) + const shmFifo * out( void ) const { - m_outFD = _fd; + return( m_out ); } + private: - int m_inFD; - int m_outFD; + shmFifo * m_in; + shmFifo * m_out; } ; @@ -287,17 +527,16 @@ protected: private: - void resizeSharedMemory( void ); + void resizeSharedProcessingMemory( void ); bool m_initialized; bool m_failed; - int m_pluginPID; - int m_pipes[2][2]; + QProcess m_process; QMutex m_commMutex; -#ifdef USE_NATIVE_SHM_API +#ifdef USE_NATIVE_SHMEM int m_shmID; #else QSharedMemory m_shmObj; @@ -318,7 +557,7 @@ private: class remotePluginClient : public remotePluginBase { public: - remotePluginClient( void ); + remotePluginClient( key_t _shm_in, key_t _shm_out ); virtual ~remotePluginClient(); virtual bool processMessage( const message & _m ); @@ -349,20 +588,6 @@ public: return( m_bufferSize ); } - void debugMessage( const char * _fmt, ... ) - { - va_list ap; - char buffer[512]; - - va_start( ap, _fmt ); - buffer[0] = 0; - vsnprintf( buffer, sizeof( buffer ), _fmt, ap ); - message m( IdDebugMessage ); - m.data.push_back( std::string( buffer ) ); - sendMessage( m ); - va_end( ap ); - } - void setInputCount( int _i ) { m_inputCount = _i; @@ -381,10 +606,10 @@ public: private: - void setShmKey( int _key, int _size ); + void setShmKey( key_t _key, int _size ); void doProcessing( void ); -#ifndef USE_NATIVE_SHM_API +#ifndef USE_NATIVE_SHMEM QSharedMemory m_shmObj; #endif float * m_shm; @@ -409,9 +634,9 @@ private: #endif -remotePluginBase::remotePluginBase( void ) : - m_inFD( -1 ), - m_outFD( -1 ) +remotePluginBase::remotePluginBase( shmFifo * _in, shmFifo * _out ) : + m_in( _in ), + m_out( _out ) { } @@ -420,6 +645,8 @@ remotePluginBase::remotePluginBase( void ) : remotePluginBase::~remotePluginBase() { + delete m_in; + delete m_out; } @@ -427,12 +654,15 @@ remotePluginBase::~remotePluginBase() void remotePluginBase::sendMessage( const message & _m ) { - writeInt( _m.id, m_outFD ); - writeInt( _m.data.size(), m_outFD ); + m_out->lock(); + m_out->writeInt( _m.id ); + m_out->writeInt( _m.data.size() ); for( int i = 0; i < _m.data.size(); ++i ) { - writeString( _m.data[i], m_outFD ); + m_out->writeString( _m.data[i] ); } + m_out->unlock(); + m_out->messageSent(); } @@ -440,66 +670,46 @@ void remotePluginBase::sendMessage( const message & _m ) remotePluginBase::message remotePluginBase::receiveMessage( void ) { + m_in->waitForMessage(); + m_in->lock(); message m; - m.id = readInt( m_inFD ); - const int s = readInt( m_inFD ); + m.id = m_in->readInt(); + const int s = m_in->readInt(); for( int i = 0; i < s; ++i ) { - m.data.push_back( readString( m_inFD ) ); + m.data.push_back( m_in->readString() ); } + m_in->unlock(); return( m ); } -inline bool remotePluginBase::messagesLeft( void ) const -{ - fd_set rfds; - FD_ZERO( &rfds ); - FD_SET( m_inFD, &rfds ); - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 1; // can we use 0 here? - return( select( m_inFD + 1, &rfds, NULL, NULL, &tv ) > 0 ); -} - - - - remotePluginBase::message remotePluginBase::waitForMessage( const message & _wm, bool _busy_waiting ) { while( 1 ) { - if( messagesLeft() ) - { - message m; - m = receiveMessage(); - processMessage( m ); - if( m.id == _wm.id ) - { - return( m ); - } - else if( m.id == IdGeneralFailure ) - { - return( m ); - } - } #ifndef BUILD_REMOTE_PLUGIN_CLIENT - if( _busy_waiting ) + if( _busy_waiting && !messagesLeft() ) { QCoreApplication::processEvents( QEventLoop::AllEvents, 50 ); + continue; } - else - { - usleep( 10 ); - } -#else - usleep( 10 ); #endif + message m = receiveMessage(); + processMessage( m ); + if( m.id == _wm.id ) + { + return( m ); + } + else if( m.id == IdGeneralFailure ) + { + return( m ); + } } } @@ -513,9 +723,10 @@ remotePluginBase::message remotePluginBase::waitForMessage( #ifdef BUILD_REMOTE_PLUGIN_CLIENT -remotePluginClient::remotePluginClient( void ) : - remotePluginBase(), -#ifndef USE_NATIVE_SHM_API +remotePluginClient::remotePluginClient( key_t _shm_in, key_t _shm_out ) : + remotePluginBase( new shmFifo( _shm_in ), + new shmFifo( _shm_out ) ), +#ifndef USE_NATIVE_SHMEM m_shmObj(), #endif m_shm( NULL ), @@ -523,8 +734,6 @@ remotePluginClient::remotePluginClient( void ) : m_outputCount( DEFAULT_CHANNELS ), m_bufferSize( DEFAULT_BUFFER_SIZE ) { - setInFD( 0 ); - setOutFD( 1 ); sendMessage( IdSampleRateInformation ); sendMessage( IdBufferSizeInformation ); } @@ -536,7 +745,7 @@ remotePluginClient::~remotePluginClient() { sendMessage( IdClosePlugin ); -#ifdef USE_NATIVE_SHM_API +#ifdef USE_NATIVE_SHMEM shmdt( m_shm ); #endif } @@ -600,9 +809,9 @@ bool remotePluginClient::processMessage( const message & _m ) -void remotePluginClient::setShmKey( int _key, int _size ) +void remotePluginClient::setShmKey( key_t _key, int _size ) { -#ifdef USE_NATIVE_SHM_API +#ifdef USE_NATIVE_SHMEM if( m_shm != NULL ) { shmdt( m_shm ); @@ -618,7 +827,7 @@ void remotePluginClient::setShmKey( int _key, int _size ) int shm_id = shmget( _key, _size, 0 ); if( shm_id == -1 ) { - debugMessage( "failed getting shared memory" ); + fprintf( stderr, "failed getting shared memory\n" ); } else { @@ -632,7 +841,7 @@ void remotePluginClient::setShmKey( int _key, int _size ) } else { - debugMessage( "failed getting shared memory" ); + fprintf( stderr, "failed getting shared memory\n" ); } #endif } diff --git a/lmmsconfig.h.in b/lmmsconfig.h.in index 9b31ece75..fa66b5419 100644 --- a/lmmsconfig.h.in +++ b/lmmsconfig.h.in @@ -22,6 +22,7 @@ #cmakedefine LMMS_HAVE_UNISTD_H #cmakedefine LMMS_HAVE_SYS_TYPES_H #cmakedefine LMMS_HAVE_SYS_IPC_H +#cmakedefine LMMS_HAVE_SEMAPHORE_H #cmakedefine LMMS_HAVE_SYS_SHM_H #cmakedefine LMMS_HAVE_SYS_TIME_H #cmakedefine LMMS_HAVE_SYS_WAIT_H diff --git a/src/core/remote_plugin.cpp b/src/core/remote_plugin.cpp index c8eb4b612..52569ab95 100644 --- a/src/core/remote_plugin.cpp +++ b/src/core/remote_plugin.cpp @@ -31,43 +31,20 @@ #include "config_mgr.h" #include -#include #ifdef LMMS_HAVE_UNISTD_H #include #endif -#ifdef LMMS_HAVE_SIGNAL_H -#include -#endif - -#include - -#ifdef LMMS_HAVE_SYS_SELECT_H -#include -#endif - -#ifdef LMMS_HAVE_SYS_TIME_H -#include -#endif - -#ifdef LMMS_HAVE_SYS_WAIT_H -#include -#endif - - - - remotePlugin::remotePlugin( const QString & _plugin_executable ) : - remotePluginBase(), + remotePluginBase( new shmFifo(), new shmFifo() ), m_initialized( false ), m_failed( true ), - m_pluginPID( -1 ), m_commMutex( QMutex::Recursive ), -#ifdef USE_NATIVE_SHM_API +#ifdef USE_NATIVE_SHMEM m_shmID( 0 ), #else m_shmObj(), @@ -77,32 +54,18 @@ remotePlugin::remotePlugin( const QString & _plugin_executable ) : m_inputCount( DEFAULT_CHANNELS ), m_outputCount( DEFAULT_CHANNELS ) { - QString fe = configManager::inst()->pluginDir() + - QDir::separator() + _plugin_executable; - if( pipe( m_pipes[0] ) || pipe( m_pipes[1] ) ) - { - printf( "error while creating pipes!\n" ); - } - - if( ( m_pluginPID = fork() ) < 0 ) - { - printf( "fork() failed!\n" ); - return; - } - else if( m_pluginPID == 0 ) - { - dup2( m_pipes[0][0], 0 ); - dup2( m_pipes[1][1], 1 ); - execlp( fe.toAscii().constData(), fe.toAscii().constData(), - NULL ); - exit( 0 ); - } - setInFD( m_pipes[1][0] ); - setOutFD( m_pipes[0][1] ); - - resizeSharedMemory(); - lock(); + QString exec = configManager::inst()->pluginDir() + + QDir::separator() + _plugin_executable; + QStringList args; + // swap in and out for bidirectional communication + args << QString::number( out()->shmKey() ); + args << QString::number( in()->shmKey() ); + m_process.setProcessChannelMode( QProcess::MergedChannels ); + m_process.start( exec, args ); + + resizeSharedProcessingMemory(); + m_failed = waitForMessage( IdInitDone ).id != IdInitDone; unlock(); } @@ -116,35 +79,14 @@ remotePlugin::~remotePlugin() { sendMessage( IdClosePlugin ); - // wait for acknowledge - QTime t; - t.start(); - while( t.elapsed() < 1000 ) + m_process.waitForFinished( 1000 ); + if( m_process.state() != QProcess::NotRunning ) { - if( messagesLeft() && - receiveMessage().id == IdClosePlugin ) - { - //m_pluginPID = 0; - break; - } - usleep( 10 ); + m_process.terminate(); + m_process.kill(); } - // timeout? -/* if( m_pluginPID != 0 ) - {*/ - kill( m_pluginPID, SIGTERM ); - kill( m_pluginPID, SIGKILL ); - //} - // remove process from PCB - waitpid( m_pluginPID, NULL, 0 ); - // close all sides of our pipes - close( m_pipes[0][0] ); - close( m_pipes[0][1] ); - close( m_pipes[1][0] ); - close( m_pipes[1][1] ); - -#ifdef USE_NATIVE_SHM_API +#ifdef USE_NATIVE_SHMEM shmdt( m_shm ); shmctl( m_shmID, IPC_RMID, NULL ); #endif @@ -224,20 +166,25 @@ bool remotePlugin::waitForProcessingFinished( sampleFrame * _out_buf ) const fpp_t frames = engine::getMixer()->framesPerPeriod(); const ch_cnt_t outputs = tMax( m_outputCount, DEFAULT_CHANNELS ); + sampleFrame * o = (sampleFrame *) ( m_shm + m_inputCount*frames ); if( outputs != DEFAULT_CHANNELS ) { // clear buffer, if plugin didn't fill up both channels engine::getMixer()->clearAudioBuffer( _out_buf, frames ); - } - sampleFrame * o = (sampleFrame *) ( m_shm + m_inputCount*frames ); - for( ch_cnt_t ch = 0; ch < outputs; ++ch ) - { - for( fpp_t frame = 0; frame < frames; ++frame ) + for( ch_cnt_t ch = 0; ch < + qMin( DEFAULT_CHANNELS, outputs ); ++ch ) { - _out_buf[frame][ch] = o[frame][ch]; + for( fpp_t frame = 0; frame < frames; ++frame ) + { + _out_buf[frame][ch] = o[frame][ch]; + } } } + else + { + memcpy( _out_buf, o, frames * sizeof( sampleFrame ) ); + } return( TRUE ); } @@ -262,14 +209,14 @@ void remotePlugin::processMidiEvent( const midiEvent & _e, -void remotePlugin::resizeSharedMemory( void ) +void remotePlugin::resizeSharedProcessingMemory( void ) { const size_t s = ( m_inputCount+m_outputCount ) * engine::getMixer()->framesPerPeriod() * sizeof( float ); if( m_shm != NULL ) { -#ifdef USE_NATIVE_SHM_API +#ifdef USE_NATIVE_SHMEM shmdt( m_shm ); shmctl( m_shmID, IPC_RMID, NULL ); #else @@ -278,7 +225,7 @@ void remotePlugin::resizeSharedMemory( void ) } int shm_key = 0; -#ifdef USE_NATIVE_SHM_API +#ifdef USE_NATIVE_SHMEM while( ( m_shmID = shmget( ++shm_key, s, IPC_CREAT | IPC_EXCL | 0600 ) ) == -1 ) { @@ -317,11 +264,6 @@ bool remotePlugin::processMessage( const message & _m ) case IdGeneralFailure: return( false ); - case IdDebugMessage: - printf( "debug message from client: %s\n", - _m.data[0].c_str() ); - break; - case IdInitDone: reply = true; break; @@ -340,12 +282,12 @@ bool remotePlugin::processMessage( const message & _m ) case IdChangeInputCount: m_inputCount = _m.getInt( 0 ); - resizeSharedMemory(); + resizeSharedProcessingMemory(); break; case IdChangeOutputCount: m_outputCount = _m.getInt( 0 ); - resizeSharedMemory(); + resizeSharedProcessingMemory(); break; case IdProcessingDone: