29 void run()
override { owner.runThread(); }
37 : useMessageThread (callbacksOnMessageThread),
38 magicMessageHeader (magicMessageHeaderNumber)
45 callbackConnectionState =
false;
47 masterReference.clear();
53 int portNumber,
int timeOutMillisecs)
60 if (socket->connect (hostName, portNumber, timeOutMillisecs))
62 threadIsRunning =
true;
64 thread->startThread();
76 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
78 if (newPipe->openExisting (pipeName))
81 pipeReceiveMessageTimeout = timeoutMs;
82 initialiseWithPipe (newPipe.release());
93 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
95 if (newPipe->createNewPipe (pipeName, mustNotExist))
98 pipeReceiveMessageTimeout = timeoutMs;
99 initialiseWithPipe (newPipe.release());
108 thread->signalThreadShouldExit();
112 if (socket !=
nullptr) socket->close();
113 if (pipe !=
nullptr) pipe->close();
116 thread->stopThread (4000);
117 deletePipeAndSocket();
121 void InterprocessConnection::deletePipeAndSocket()
132 return ((socket !=
nullptr && socket->isConnected())
133 || (pipe !=
nullptr && pipe->isOpen()))
142 if (pipe ==
nullptr && socket ==
nullptr)
145 if (socket !=
nullptr && ! socket->isLocal())
146 return socket->getHostName();
149 return IPAddress::local().toString();
159 messageData.
copyFrom (messageHeader, 0,
sizeof (messageHeader));
162 return writeData (messageData.
getData(), (
int) messageData.
getSize()) == (int) messageData.
getSize();
165 int InterprocessConnection::writeData (
void* data,
int dataSize)
169 if (socket !=
nullptr)
170 return socket->write (data, dataSize);
173 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
179 void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
181 jassert (socket ==
nullptr && pipe ==
nullptr);
182 socket.reset (newSocket);
184 threadIsRunning =
true;
186 thread->startThread();
189 void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
191 jassert (socket ==
nullptr && pipe ==
nullptr);
192 pipe.reset (newPipe);
194 threadIsRunning =
true;
196 thread->startThread();
203 : owner (ipc), connectionMade (connected)
206 void messageCallback()
override
208 if (
auto* ipc = owner.get())
223 void InterprocessConnection::connectionMadeInt()
225 if (! callbackConnectionState)
227 callbackConnectionState =
true;
229 if (useMessageThread)
236 void InterprocessConnection::connectionLostInt()
238 if (callbackConnectionState)
240 callbackConnectionState =
false;
242 if (useMessageThread)
243 (
new ConnectionStateMessage (
this,
false))->post();
252 : owner (ipc), data (d)
255 void messageCallback()
override
257 if (
auto* ipc = owner.get())
265 void InterprocessConnection::deliverDataInt (
const MemoryBlock& data)
267 jassert (callbackConnectionState);
269 if (useMessageThread)
276 int InterprocessConnection::readData (
void* data,
int num)
278 if (socket !=
nullptr)
279 return socket->read (data, num,
true);
282 return pipe->read (data, num, pipeReceiveMessageTimeout);
288 bool InterprocessConnection::readNextMessage()
290 uint32 messageHeader[2];
291 auto bytes = readData (messageHeader,
sizeof (messageHeader));
293 if (bytes ==
sizeof (messageHeader)
298 if (bytesInMessage > 0)
300 MemoryBlock messageData ((
size_t) bytesInMessage,
true);
303 while (bytesInMessage > 0)
305 if (thread->threadShouldExit())
308 auto numThisTime = jmin (bytesInMessage, 65536);
309 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
314 bytesRead += bytesIn;
315 bytesInMessage -= bytesIn;
319 deliverDataInt (messageData);
327 if (socket !=
nullptr)
328 deletePipeAndSocket();
336 void InterprocessConnection::runThread()
338 while (! thread->threadShouldExit())
340 if (socket !=
nullptr)
342 auto ready = socket->waitUntilReady (
true, 100);
346 deletePipeAndSocket();
357 else if (pipe !=
nullptr)
359 if (! pipe->isOpen())
361 deletePipeAndSocket();
371 if (thread->threadShouldExit() || ! readNextMessage())
375 threadIsRunning =
false;