00001 #ifdef CCAFE_THREADS
00002
00003 #ifndef ServerMux_seen
00004 #define ServerMux_seen
00005
00024 class IOThread :public CCAFEThread {
00025 private:
00026 ConnectionManager* manager;
00027 boolean running;
00028 public:
00029 IOThread(ConnectionManager* manager) {this->manager = manager; running = TRUE; };
00030 void* run()
00031 {
00032 manager->notifyReads();
00033 return NULL;
00034 };
00035 };
00036
00037
00038
00039
00040
00045 class ConnectThread :public CCAFEThread {
00046 private:
00047 ConnectionManager* manager;
00048 boolean running;
00049 public:
00050 ConnectThread(ConnectionManager* manager) {this->manager = manager; running = TRUE; };
00051 void* run()
00052 {
00053 manager->notifyReconnect();
00054 return NULL;
00055 };
00056 };
00057
00058
00059
00060
00061
00065 class ServerMux :public virtual ClientOutputListener,
00066 public virtual ClientOutputRelay {
00067 private:
00068 Client* controllerClient;
00069 CCAFEThread* controllerThread;
00070 Client** clients;
00071 int numClients;
00072 CCAFEThreadPool threadpool;
00073 boolean isStarted;
00074 ConnectionManager* computationalClientManager;
00075 ClientOutputCollector* dataCollector;
00076 JCPN(Vector) listeners;
00077 IOThread ioThread;
00078 ConnectThread connThread;
00079 static char* outOfBandToken;
00080 DataCollectorFactory dataCollectorFactory;
00081 CCAFEReadWriteMutex dataCollectorMutex;
00082
00083 public:
00084 static const char* SERVER_SRC ;
00085 static const char* DATA_COLLECTOR_ACK ;
00086 static const char* DATA_COLLECTOR_ERR ;
00087 static const char* DATA_COLLECTOR_MSG ;
00088 static const char* SHUTDOWN_MSG ;
00089 static const char* REMOVE_CLIENT_MSG ;
00090 ServerMux(ConnectionManager* computeManager, Connection* controllerConnect);
00091 ~ServerMux();
00092 void shutdown();
00093 void join();
00094
00095
00096 CCAFEThread* getControllerClientThread();
00097
00098 void setExternalClientOutputListener(ClientOutputListener* xLsnr);
00099 void doClientIO();
00100 void broadcastToClients(const char* s);
00101 boolean isRunning() { return isStarted; };
00102
00103
00104 virtual void clientOutput(ClientOutputEvent* evt);
00105
00106
00107 virtual void relayMessageFromDataProducers(char* s);
00108 virtual void relayMessageFromController(char* s);
00109 virtual void setDataCollectorByName( char* className );
00110 virtual int getNumClients();
00111
00112
00113 void addOutOfBandListener(OutOfBandListener* l) {
00114 listeners.addElement(l);
00115 }
00116 void removeOutOfBandListener(OutOfBandListener* l) {
00117 listeners.removeElement(l);
00118 }
00129 static const char* getOutofBandToken();
00130
00131 private:
00133 void doOutOfBandCommands(char* line, Client* src);
00135 void fireOOBListeners(char* cmd, Client* client);
00136 };
00137 #endif // seen servermux
00138 #else
00139 extern int ccafe_no_servermux;
00140 #endif // CCAFE_THREADS