Merge pull request #2739 from jasp00/sync-socket

Synchronize with remote plugins using local sockets
This commit is contained in:
Javier Serrano Polo
2016-06-04 00:34:30 +00:00
9 changed files with 409 additions and 91 deletions

View File

@@ -37,10 +37,8 @@
#include <cassert>
#if defined(LMMS_HAVE_SYS_IPC_H) && defined(LMMS_HAVE_SEMAPHORE_H)
#include <sys/ipc.h>
#include <semaphore.h>
#else
#if !(defined(LMMS_HAVE_SYS_IPC_H) && defined(LMMS_HAVE_SEMAPHORE_H))
#define SYNC_WITH_SHM_FIFO
#define USE_QT_SEMAPHORES
#ifdef LMMS_HAVE_PROCESS_H
@@ -74,17 +72,33 @@ typedef int32_t key_t;
#include <locale.h>
#endif
#ifdef LMMS_HAVE_PTHREAD_H
#include <pthread.h>
#endif
#ifdef BUILD_REMOTE_PLUGIN_CLIENT
#undef EXPORT
#define EXPORT
#define COMPILE_REMOTE_PLUGIN_BASE
#ifndef SYNC_WITH_SHM_FIFO
#include <sys/socket.h>
#include <sys/un.h>
#endif
#else
#include <QtCore/QMutex>
#include <QtCore/QProcess>
#include <QtCore/QThread>
#ifndef SYNC_WITH_SHM_FIFO
#include <poll.h>
#endif
#endif
#ifdef SYNC_WITH_SHM_FIFO
// sometimes we need to exchange bigger messages (e.g. for VST parameter dumps)
// so set a usable value here
const int SHM_FIFO_SIZE = 512*1024;
@@ -97,9 +111,6 @@ class shmFifo
// and 64 bit platforms
union sem32_t
{
#ifndef USE_QT_SEMAPHORES
sem_t sem;
#endif
int semKey;
char fill[32];
} ;
@@ -125,13 +136,8 @@ public:
m_shmID( -1 ),
#endif
m_data( NULL ),
#ifdef USE_QT_SEMAPHORES
m_dataSem( QString::null ),
m_messageSem( QString::null ),
#else
m_dataSem( NULL ),
m_messageSem( NULL ),
#endif
m_lockDepth( 0 )
{
#ifdef USE_QT_SHMEM
@@ -151,7 +157,6 @@ public:
#endif
assert( m_data != NULL );
m_data->startPtr = m_data->endPtr = 0;
#ifdef USE_QT_SEMAPHORES
static int k = 0;
m_data->dataSem.semKey = ( getpid()<<10 ) + ++k;
m_data->messageSem.semKey = ( getpid()<<10 ) + ++k;
@@ -160,20 +165,6 @@ public:
m_messageSem.setKey( QString::number(
m_data->messageSem.semKey ),
0, QSystemSemaphore::Create );
#else
m_dataSem = &m_data->dataSem.sem;
m_messageSem = &m_data->messageSem.sem;
if( sem_init( m_dataSem, 1, 1 ) )
{
fprintf( stderr, "could not initialize m_dataSem\n" );
}
if( sem_init( m_messageSem, 1, 0 ) )
{
fprintf( stderr, "could not initialize "
"m_messageSem\n" );
}
#endif
}
// constructor for remote-/client-side - use _shm_key for making up
@@ -188,13 +179,8 @@ public:
m_shmID( shmget( _shm_key, 0, 0 ) ),
#endif
m_data( NULL ),
#ifdef USE_QT_SEMAPHORES
m_dataSem( QString::null ),
m_messageSem( QString::null ),
#else
m_dataSem( NULL ),
m_messageSem( NULL ),
#endif
m_lockDepth( 0 )
{
#ifdef USE_QT_SHMEM
@@ -209,14 +195,9 @@ public:
}
#endif
assert( m_data != NULL );
#ifdef USE_QT_SEMAPHORES
m_dataSem.setKey( QString::number( m_data->dataSem.semKey ) );
m_messageSem.setKey( QString::number(
m_data->messageSem.semKey ) );
#else
m_dataSem = &m_data->dataSem.sem;
m_messageSem = &m_data->messageSem.sem;
#endif
}
~shmFifo()
@@ -226,10 +207,6 @@ public:
{
#ifndef USE_QT_SHMEM
shmctl( m_shmID, IPC_RMID, NULL );
#endif
#ifndef USE_QT_SEMAPHORES
sem_destroy( m_dataSem );
sem_destroy( m_messageSem );
#endif
}
#ifndef USE_QT_SHMEM
@@ -258,11 +235,7 @@ public:
{
if( !isInvalid() && __sync_add_and_fetch( &m_lockDepth, 1 ) == 1 )
{
#ifdef USE_QT_SEMAPHORES
m_dataSem.acquire();
#else
sem_wait( m_dataSem );
#endif
}
}
@@ -271,11 +244,7 @@ public:
{
if( __sync_sub_and_fetch( &m_lockDepth, 1) <= 0 )
{
#ifdef USE_QT_SEMAPHORES
m_dataSem.release();
#else
sem_post( m_dataSem );
#endif
}
}
@@ -284,22 +253,14 @@ public:
{
if( !isInvalid() )
{
#ifdef USE_QT_SEMAPHORES
m_messageSem.acquire();
#else
sem_wait( m_messageSem );
#endif
}
}
// increase message-semaphore
inline void messageSent()
{
#ifdef USE_QT_SEMAPHORES
m_messageSem.release();
#else
sem_post( m_messageSem );
#endif
}
@@ -345,16 +306,10 @@ public:
{
return false;
}
#ifdef USE_QT_SEMAPHORES
lock();
const bool empty = ( m_data->startPtr == m_data->endPtr );
unlock();
return !empty;
#else
int v;
sem_getvalue( m_messageSem, &v );
return v > 0;
#endif
}
@@ -446,16 +401,12 @@ private:
int m_shmID;
#endif
shmData * m_data;
#ifdef USE_QT_SEMAPHORES
QSystemSemaphore m_dataSem;
QSystemSemaphore m_messageSem;
#else
sem_t * m_dataSem;
sem_t * m_messageSem;
#endif
volatile int m_lockDepth;
} ;
#endif
@@ -567,9 +518,14 @@ public:
} ;
#ifdef SYNC_WITH_SHM_FIFO
RemotePluginBase( shmFifo * _in, shmFifo * _out );
#else
RemotePluginBase();
#endif
virtual ~RemotePluginBase();
#ifdef SYNC_WITH_SHM_FIFO
void reset( shmFifo *in, shmFifo *out )
{
delete m_in;
@@ -577,21 +533,20 @@ public:
m_in = in;
m_out = out;
}
#endif
int sendMessage( const message & _m );
message receiveMessage();
inline bool isInvalid() const
{
#ifdef SYNC_WITH_SHM_FIFO
return m_in->isInvalid() || m_out->isInvalid();
#else
return m_invalid;
#endif
}
inline bool messagesLeft()
{
return m_in->messagesLeft();
}
message waitForMessage( const message & _m,
bool _busy_waiting = false );
@@ -602,6 +557,61 @@ public:
return m;
}
#ifndef SYNC_WITH_SHM_FIFO
inline int32_t readInt()
{
int32_t i;
read( &i, sizeof( i ) );
return i;
}
inline void writeInt( const int32_t & _i )
{
write( &_i, sizeof( _i ) );
}
inline std::string readString()
{
const int len = readInt();
if( len )
{
char * sc = new char[len + 1];
read( sc, len );
sc[len] = 0;
std::string s( sc );
delete[] sc;
return s;
}
return std::string();
}
inline void writeString( const std::string & _s )
{
const int len = _s.size();
writeInt( len );
write( _s.c_str(), len );
}
#endif
#ifndef BUILD_REMOTE_PLUGIN_CLIENT
inline bool messagesLeft()
{
#ifdef SYNC_WITH_SHM_FIFO
return m_in->messagesLeft();
#else
struct pollfd pollin;
pollin.fd = m_socket;
pollin.events = POLLIN;
if ( poll( &pollin, 1, 0 ) == -1 )
{
qWarning( "Unexpected poll error." );
}
return pollin.revents & POLLIN;
#endif
}
inline void fetchAndProcessAllMessages()
{
while( messagesLeft() )
@@ -609,11 +619,13 @@ public:
fetchAndProcessNextMessage();
}
}
#endif
virtual bool processMessage( const message & _m ) = 0;
protected:
#ifdef SYNC_WITH_SHM_FIFO
inline const shmFifo * in() const
{
return m_in;
@@ -623,18 +635,88 @@ protected:
{
return m_out;
}
#endif
inline void invalidate()
{
#ifdef SYNC_WITH_SHM_FIFO
m_in->invalidate();
m_out->invalidate();
m_in->messageSent();
#else
m_invalid = true;
#endif
}
#ifndef SYNC_WITH_SHM_FIFO
int m_socket;
#endif
private:
#ifdef SYNC_WITH_SHM_FIFO
shmFifo * m_in;
shmFifo * m_out;
#else
void read( void * _buf, int _len )
{
if( isInvalid() )
{
memset( _buf, 0, _len );
return;
}
char * buf = (char *) _buf;
int remaining = _len;
while ( remaining )
{
ssize_t nread = ::read( m_socket, buf, remaining );
switch ( nread )
{
case -1:
fprintf( stderr,
"Error while reading.\n" );
case 0:
invalidate();
memset( _buf, 0, _len );
return;
}
buf += nread;
remaining -= nread;
}
}
void write( const void * _buf, int _len )
{
if( isInvalid() )
{
return;
}
const char * buf = (const char *) _buf;
int remaining = _len;
while ( remaining )
{
ssize_t nwritten = ::write( m_socket, buf, remaining );
switch ( nwritten )
{
case -1:
fprintf( stderr,
"Error while writing.\n" );
case 0:
invalidate();
return;
}
buf += nwritten;
remaining -= nwritten;
}
}
bool m_invalid;
pthread_mutex_t m_receiveMutex;
pthread_mutex_t m_sendMutex;
#endif
} ;
@@ -668,8 +750,9 @@ private:
} ;
class EXPORT RemotePlugin : public RemotePluginBase
class EXPORT RemotePlugin : public QObject, public RemotePluginBase
{
Q_OBJECT
public:
RemotePlugin();
virtual ~RemotePlugin();
@@ -724,18 +807,12 @@ public:
inline void lock()
{
if( !isInvalid() )
{
m_commMutex.lock();
}
m_commMutex.lock();
}
inline void unlock()
{
if( !isInvalid() )
{
m_commMutex.unlock();
}
m_commMutex.unlock();
}
@@ -768,7 +845,16 @@ private:
int m_inputCount;
int m_outputCount;
#ifndef SYNC_WITH_SHM_FIFO
int m_server;
QString m_socketFile;
#endif
friend class ProcessWatcher;
private slots:
void processFinished( int exitCode, QProcess::ExitStatus exitStatus );
} ;
#endif
@@ -779,7 +865,11 @@ private:
class RemotePluginClient : public RemotePluginBase
{
public:
#ifdef SYNC_WITH_SHM_FIFO
RemotePluginClient( key_t _shm_in, key_t _shm_out );
#else
RemotePluginClient( const char * socketPath );
#endif
virtual ~RemotePluginClient();
#ifdef USE_QT_SHMEM
VstSyncData * getQtVSTshm();
@@ -876,15 +966,25 @@ private:
#endif
#ifdef SYNC_WITH_SHM_FIFO
RemotePluginBase::RemotePluginBase( shmFifo * _in, shmFifo * _out ) :
m_in( _in ),
m_out( _out )
#else
RemotePluginBase::RemotePluginBase() :
m_socket( -1 ),
m_invalid( false )
#endif
{
#ifdef LMMS_HAVE_LOCALE_H
// make sure, we're using common ways to print/scan
// floats to/from strings (',' vs. '.' for decimal point etc.)
setlocale( LC_NUMERIC, "C" );
#endif
#ifndef SYNC_WITH_SHM_FIFO
pthread_mutex_init( &m_receiveMutex, NULL );
pthread_mutex_init( &m_sendMutex, NULL );
#endif
}
@@ -892,8 +992,13 @@ RemotePluginBase::RemotePluginBase( shmFifo * _in, shmFifo * _out ) :
RemotePluginBase::~RemotePluginBase()
{
#ifdef SYNC_WITH_SHM_FIFO
delete m_in;
delete m_out;
#else
pthread_mutex_destroy( &m_receiveMutex );
pthread_mutex_destroy( &m_sendMutex );
#endif
}
@@ -901,6 +1006,7 @@ RemotePluginBase::~RemotePluginBase()
int RemotePluginBase::sendMessage( const message & _m )
{
#ifdef SYNC_WITH_SHM_FIFO
m_out->lock();
m_out->writeInt( _m.id );
m_out->writeInt( _m.data.size() );
@@ -912,6 +1018,18 @@ int RemotePluginBase::sendMessage( const message & _m )
}
m_out->unlock();
m_out->messageSent();
#else
pthread_mutex_lock( &m_sendMutex );
writeInt( _m.id );
writeInt( _m.data.size() );
int j = 8;
for( unsigned int i = 0; i < _m.data.size(); ++i )
{
writeString( _m.data[i] );
j += 4 + _m.data[i].size();
}
pthread_mutex_unlock( &m_sendMutex );
#endif
return j;
}
@@ -921,6 +1039,7 @@ int RemotePluginBase::sendMessage( const message & _m )
RemotePluginBase::message RemotePluginBase::receiveMessage()
{
#ifdef SYNC_WITH_SHM_FIFO
m_in->waitForMessage();
m_in->lock();
message m;
@@ -931,6 +1050,17 @@ RemotePluginBase::message RemotePluginBase::receiveMessage()
m.data.push_back( m_in->readString() );
}
m_in->unlock();
#else
pthread_mutex_lock( &m_receiveMutex );
message m;
m.id = readInt();
const int s = readInt();
for( int i = 0; i < s; ++i )
{
m.data.push_back( readString() );
}
pthread_mutex_unlock( &m_receiveMutex );
#endif
return m;
}
@@ -976,8 +1106,13 @@ RemotePluginBase::message RemotePluginBase::waitForMessage(
#ifdef BUILD_REMOTE_PLUGIN_CLIENT
#ifdef SYNC_WITH_SHM_FIFO
RemotePluginClient::RemotePluginClient( key_t _shm_in, key_t _shm_out ) :
RemotePluginBase( new shmFifo( _shm_in ), new shmFifo( _shm_out ) ),
#else
RemotePluginClient::RemotePluginClient( const char * socketPath ) :
RemotePluginBase(),
#endif
#ifdef USE_QT_SHMEM
m_shmObj(),
m_shmQtID( "/usr/bin/lmms" ),
@@ -989,6 +1124,30 @@ RemotePluginClient::RemotePluginClient( key_t _shm_in, key_t _shm_out ) :
m_sampleRate( 44100 ),
m_bufferSize( 0 )
{
#ifndef SYNC_WITH_SHM_FIFO
struct sockaddr_un sa;
sa.sun_family = AF_LOCAL;
size_t length = strlen( socketPath );
if ( length >= sizeof sa.sun_path )
{
length = sizeof sa.sun_path - 1;
fprintf( stderr, "Socket path too long.\n" );
}
memcpy( sa.sun_path, socketPath, length );
sa.sun_path[length] = '\0';
m_socket = socket( PF_LOCAL, SOCK_STREAM, 0 );
if ( m_socket == -1 )
{
fprintf( stderr, "Could not connect to local server.\n" );
}
if ( ::connect( m_socket, (struct sockaddr *) &sa, sizeof sa ) == -1 )
{
fprintf( stderr, "Could not connect to local server.\n" );
}
#endif
#ifdef USE_QT_SHMEM
if( m_shmQtID.attach( QSharedMemory::ReadOnly ) )
{
@@ -1033,6 +1192,7 @@ RemotePluginClient::RemotePluginClient( key_t _shm_in, key_t _shm_out ) :
}
}
#endif
// if attaching shared memory fails
sendMessage( IdSampleRateInformation );
sendMessage( IdBufferSizeInformation );
@@ -1051,6 +1211,13 @@ RemotePluginClient::~RemotePluginClient()
#ifndef USE_QT_SHMEM
shmdt( m_shm );
#endif
#ifndef SYNC_WITH_SHM_FIFO
if ( close( m_socket ) == -1)
{
fprintf( stderr, "Error freeing resources.\n" );
}
#endif
}