Eclipse SUMO - Simulation of Urban MObility
Connection.cpp
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2012-2020 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
21 // C++ TraCI client API implementation
22 /****************************************************************************/
23 #include <thread>
24 #include <chrono>
25 #include <array>
26 #include <libsumo/TraCIDefs.h>
27 #include "Connection.h"
28 
29 
30 namespace libtraci {
31 // ===========================================================================
32 // static member initializations
33 // ===========================================================================
// Starts out null: no connection has been selected as the active one at static-initialisation time.
34 Connection* Connection::myActive = nullptr;
// Registry of Connection pointers keyed by a string (presumably the connection label - TODO confirm against the code that fills it).
35 std::map<const std::string, Connection*> Connection::myConnections;
36 
37 
38 // ===========================================================================
39 // member method definitions
40 // ===========================================================================
41 #ifdef _MSC_VER
42 /* Disable "decorated name length exceeded, name was truncated" warnings for the whole file. */
43 #pragma warning(disable: 4503)
44 #endif
45 Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
46  myLabel(label), myProcessPipe(pipe), mySocket(host, port) {
47  for (int i = 0; i <= numRetries; i++) {
48  try {
49  mySocket.connect();
50  break;
51  } catch (tcpip::SocketException&) {
52  if (i == numRetries) {
53  throw;
54  }
55  std::this_thread::sleep_for(std::chrono::seconds(1));
56  }
57  }
58 }
59 
60 
// Connection::close(): sends the close command, waits for its status response, closes the
// socket and, if a process pipe was supplied, relays everything readable from that pipe
// before closing it.
// NOTE(review): this listing has lost original lines 62-63 (the function signature and the
// opening of the guard whose body is the early return below) and line 70 (the write that
// belongs to the "command id" comment). Restore them from the upstream file before compiling.
61 void
64  return;
65  }
66  tcpip::Storage outMsg;
67  // command length
68  outMsg.writeUnsignedByte(1 + 1);
69  // command id
71  mySocket.sendExact(outMsg);
72 
73  tcpip::Storage inMsg;
74  std::string acknowledgement;
    // Throws libsumo::TraCIException unless the status response belongs to CMD_CLOSE and reports success.
75  check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
76  mySocket.close();
77  if (myProcessPipe != nullptr) {
    // Read the pipe until fgets reports end-of-file/error, collecting everything in `result` ...
78  std::array<char, 128> buffer;
79  std::stringstream result;
80  while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
81  result << buffer.data();
82  }
    // ... then echo it line by line: lines starting with "Error:" go to stderr, all others to stdout.
83  std::string line;
84  while (std::getline(result, line)) {
85  if (line.compare(0, 6, "Error:") == 0) {
86  std::cerr << line << std::endl;
87  } else {
88  std::cout << line << std::endl;
89  }
90  }
    // The pipe is closed with the platform's pclose variant; its return value is ignored.
91 #ifdef WIN32
92  _pclose(myProcessPipe);
93 #else
94  pclose(myProcessPipe);
95 #endif
96  }
97 }
98 
99 
// Connection::simulationStep(double time): sends a simulation step command carrying `time`
// and reads the subscription results contained in the answer.
// NOTE(review): this listing has lost original lines 101 (signature), 106 (the write that
// belongs to the "command id" comment), 112 and 114 (statements between creating inMsg and
// reading numSubs) and 118-119 (the condition that chooses between the two read calls below).
// Restore them from the upstream file before compiling.
100 void
102  tcpip::Storage outMsg;
103  // command length
    // presumably: length byte + command id byte + 8 bytes for the double written below - TODO confirm
104  outMsg.writeUnsignedByte(1 + 1 + 8);
105  // command id
107  outMsg.writeDouble(time);
108  // send request message
109  mySocket.sendExact(outMsg);
110 
111  tcpip::Storage inMsg;
    // Results of the previous step are discarded before the new ones are read.
113  mySubscriptionResults.clear();
    // The answer announces how many subscription responses follow.
115  int numSubs = inMsg.readInt();
116  while (numSubs > 0) {
    // Command id checking is disabled (ignoreCommandId = true); the returned id is the response's command id.
117  const int responseID = check_commandGetResult(inMsg, 0, -1, true);
120  readVariableSubscription(responseID, inMsg);
121  } else {
122  readContextSubscription(responseID, inMsg);
123  }
124  numSubs--;
125  }
126 }
127 
128 
// Connection::send_commandSetOrder(int order): sends a set-order command with the client index `order`.
// Only sends; no response is read in this function.
// NOTE(review): this listing has lost original lines 130 (signature) and 135 (the write that
// belongs to the "command id" comment). Restore them from the upstream file before compiling.
129 void
131  tcpip::Storage outMsg;
132  // command length
    // presumably: length byte + command id byte + 4 bytes for the int written below - TODO confirm
133  outMsg.writeUnsignedByte(1 + 1 + 4);
134  // command id
136  // client index
137  outMsg.writeInt(order);
138  mySocket.sendExact(outMsg);
139 }
140 
141 
142 void
143 Connection::createCommand(int cmdID, int varID, const std::string& objID, tcpip::Storage* add) const {
144  myOutput.reset();
145  // command length
146  int length = 1 + 1;
147  if (varID >= 0) {
148  length += 1 + 4 + (int) objID.length();
149  }
150  if (add != nullptr) {
151  length += (int)add->size();
152  }
153  if (length <= 255) {
154  myOutput.writeUnsignedByte(length);
155  } else {
157  myOutput.writeInt(length + 4);
158  }
160  if (varID >= 0) {
162  myOutput.writeString(objID);
163  }
164  // additional values
165  if (add != nullptr) {
166  myOutput.writeStorage(*add);
167  }
168 }
169 
170 
171 void
172 Connection::createFilterCommand(int cmdID, int varID, tcpip::Storage* add) const {
173  myOutput.reset();
174  // command length
175  int length = 1 + 1 + 1;
176  if (add != nullptr) {
177  length += (int)add->size();
178  }
179  if (length <= 255) {
180  myOutput.writeUnsignedByte(length);
181  } else {
183  myOutput.writeInt(length + 4);
184  }
187  // additional values
188  if (add != nullptr) {
189  myOutput.writeStorage(*add);
190  }
191 }
192 
193 
// Sends a variable subscription request for object `objID` in domain command `domID`,
// valid from beginTime to endTime, and reads the server's answer.
// `vars` lists the variable ids to subscribe to; the single entry -1 selects a default set.
// `params` may hold, per variable id, an extra value that is appended after that id.
// Throws tcpip::SocketException("Socket is not initialised") from the guard at the top, and
// whatever check_resultState / check_commandGetResult throw on a bad answer.
// NOTE(review): this listing has lost original lines 197 (opening of the guard around the
// first throw), 214 (the condition opening the inner if of the default branch), 217-218 and
// 222-223 (statements following the two count bytes in the default branch).
// Restore them from the upstream file before compiling.
194 void
195 Connection::subscribeObjectVariable(int domID, const std::string& objID, double beginTime, double endTime,
196  const std::vector<int>& vars, const libsumo::TraCIResults& params) {
198  throw tcpip::SocketException("Socket is not initialised");
199  }
200  tcpip::Storage outMsg;
201  // command length (domID, objID, beginTime, endTime, length, vars)
202  const int numVars = (int) vars.size();
    // A leading 0 byte followed by an int is the extended length form (cf. check_commandGetResult).
203  outMsg.writeUnsignedByte(0);
204  outMsg.writeInt(5 + 1 + 8 + 8 + 4 + (int) objID.length() + 1 + numVars);
205  // command id
206  outMsg.writeUnsignedByte(domID);
207  // time
208  outMsg.writeDouble(beginTime);
209  outMsg.writeDouble(endTime);
210  // object id
211  outMsg.writeString(objID);
212  // command id
    // (what is written from here on is the number of variables followed by the variable ids)
213  if (numVars == 1 && vars.front() == -1) {
215  // default for vehicles is edge id and lane position
216  outMsg.writeUnsignedByte(2);
219  } else {
220  // default for detectors is vehicle number, for all others (and contexts) id list
221  outMsg.writeUnsignedByte(1);
224  }
225  } else {
226  outMsg.writeUnsignedByte(numVars);
227  for (int i = 0; i < numVars; ++i) {
228  outMsg.writeUnsignedByte(vars[i]);
229  const auto& paramEntry = params.find(vars[i]);
230  if (paramEntry != params.end()) {
    // NOTE(review): the length written above counts one byte per variable only, so these bytes are not included in it.
231  // TODO implement toPacket and adapt the message length above
232  outMsg.writePacket(paramEntry->second->toPacket());
233  }
234  }
235  }
236  // send message
237  mySocket.sendExact(outMsg);
238 
239  tcpip::Storage inMsg;
240  check_resultState(inMsg, domID);
    // With an empty `vars` only the status is checked; no subscription result is read.
241  if (numVars > 0) {
242  const int responseID = check_commandGetResult(inMsg, domID);
243  readVariableSubscription(responseID, inMsg);
244  }
245 }
246 
247 
// Sends a context subscription request: for object `objID` (domain command `domID`), from
// beginTime to endTime, subscribe to the variables `vars` of objects of `domain` within `range`.
// `params` may hold, per variable id, an extra value that is appended after that id.
// Throws tcpip::SocketException("Socket is not initialised") from the guard at the top, and
// whatever check_resultState / check_commandGetResult throw on a bad answer.
// NOTE(review): this listing has lost original line 251 (opening of the guard around the
// first throw). Restore it from the upstream file before compiling.
248 void
249 Connection::subscribeObjectContext(int domID, const std::string& objID, double beginTime, double endTime,
250  int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
252  throw tcpip::SocketException("Socket is not initialised");
253  }
254  tcpip::Storage outMsg;
255  // command length (domID, objID, beginTime, endTime, length, vars)
256  int varNo = (int) vars.size();
    // A leading 0 byte followed by an int is the extended length form (cf. check_commandGetResult).
257  outMsg.writeUnsignedByte(0);
258  outMsg.writeInt(5 + 1 + 8 + 8 + 4 + (int) objID.length() + 1 + 8 + 1 + varNo);
259  // command id
260  outMsg.writeUnsignedByte(domID);
261  // time
262  outMsg.writeDouble(beginTime);
263  outMsg.writeDouble(endTime);
264  // object id
265  outMsg.writeString(objID);
266  // domain and range
267  outMsg.writeUnsignedByte(domain);
268  outMsg.writeDouble(range);
269  // command id
    // (what is written from here on is the number of variables followed by the variable ids)
270  outMsg.writeUnsignedByte((int)vars.size());
271  for (int i = 0; i < varNo; ++i) {
272  outMsg.writeUnsignedByte(vars[i]);
273  const auto& paramEntry = params.find(vars[i]);
274  if (paramEntry != params.end()) {
    // NOTE(review): the length written above counts one byte per variable only, so these bytes are not included in it.
275  // TODO implement toPacket and adapt the message length above
276  outMsg.writePacket(paramEntry->second->toPacket());
277  }
278  }
279  // send message
280  mySocket.sendExact(outMsg);
281 
282  tcpip::Storage inMsg;
283  check_resultState(inMsg, domID);
284  check_commandGetResult(inMsg, domID);
    // NOTE(review): results are stored under domID here, whereas subscribeObjectVariable and
    // simulationStep use the id returned by check_commandGetResult - confirm the key is intended.
285  readContextSubscription(domID, inMsg);
286 }
287 
288 
289 void
290 Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
291  mySocket.receiveExact(inMsg);
292  int cmdLength;
293  int cmdId;
294  int resultType;
295  int cmdStart;
296  std::string msg;
297  try {
298  cmdStart = inMsg.position();
299  cmdLength = inMsg.readUnsignedByte();
300  cmdId = inMsg.readUnsignedByte();
301  if (command != cmdId && !ignoreCommandId) {
302  throw libsumo::TraCIException("#Error: received status response to command: " + toString(cmdId) + " but expected: " + toString(command));
303  }
304  resultType = inMsg.readUnsignedByte();
305  msg = inMsg.readString();
306  } catch (std::invalid_argument&) {
307  throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
308  }
309  switch (resultType) {
310  case libsumo::RTYPE_ERR:
311  throw libsumo::TraCIException(".. Answered with error to command (" + toString(command) + "), [description: " + msg + "]");
313  throw libsumo::TraCIException(".. Sent command is not implemented (" + toString(command) + "), [description: " + msg + "]");
314  case libsumo::RTYPE_OK:
315  if (acknowledgement != nullptr) {
316  (*acknowledgement) = ".. Command acknowledged (" + toString(command) + "), [description: " + msg + "]";
317  }
318  break;
319  default:
320  throw libsumo::TraCIException(".. Answered with unknown result code(" + toString(resultType) + ") to command(" + toString(command) + "), [description: " + msg + "]");
321  }
322  if ((cmdStart + cmdLength) != (int) inMsg.position()) {
323  throw libsumo::TraCIException("#Error: command at position " + toString(cmdStart) + " has wrong length");
324  }
325 }
326 
327 
328 int
329 Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
330  inMsg.position(); // respStart
331  int length = inMsg.readUnsignedByte();
332  if (length == 0) {
333  length = inMsg.readInt();
334  }
335  int cmdId = inMsg.readUnsignedByte();
336  if (!ignoreCommandId && cmdId != (command + 0x10)) {
337  throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
338  }
339  if (expectedType >= 0) {
340  // not called from the TraCITestClient but from within the Connection
341  inMsg.readUnsignedByte(); // variableID
342  inMsg.readString(); // objectID
343  int valueDataType = inMsg.readUnsignedByte();
344  if (valueDataType != expectedType) {
345  throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
346  }
347  }
348  return cmdId;
349 }
350 
351 
// Validates the answer to a previously assembled command: resets myInput, checks the status
// response and then the response header (including the value type when expectedType >= 0).
// Returns true after a successful validation and false when the guarded branch is not taken;
// check_resultState / check_commandGetResult throw libsumo::TraCIException on a bad answer.
// NOTE(review): this listing has lost original lines 354-355 (the opening of the if whose
// closing brace is visible below, and the statement before myInput.reset()).
// Restore them from the upstream file before compiling.
352 bool
353 Connection::processGet(int command, int expectedType, bool ignoreCommandId) {
356  myInput.reset();
357  check_resultState(myInput, command, ignoreCommandId);
358  check_commandGetResult(myInput, command, expectedType, ignoreCommandId);
359  return true;
360  }
361  return false;
362 }
363 
364 
// Assembles a command via createCommand and, inside the guarded branch, resets myInput and
// validates the status response. Returns a reference to the reusable input storage myInput.
// NOTE(review): this listing has lost original line 365 (the return type line) and lines
// 368-369 (the opening of the if whose closing brace is visible below, and the statement
// before myInput.reset()). Restore them from the upstream file before compiling.
366 Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add) {
367  createCommand(command, var, id, add);
370  myInput.reset();
371  check_resultState(myInput, command);
372  }
    // When the guarded branch is skipped, myInput is returned as it was left by earlier calls.
373  return myInput;
374 }
375 
376 
// Reads `variableCount` variable results for one object from inMsg and stores each one in
// into[objectID][variableID] as a shared pointer to the matching libsumo result type.
// Each entry consists of variable id, status and type byte followed by the value.
// Throws libsumo::TraCIException for a status other than RTYPE_OK and for a type that has
// no case in the switch.
// NOTE(review): this listing has lost original lines 387, 390, 418 and 421 - the case labels
// in front of the double, string, int and string-list branches (the last one also opens the
// brace that is closed at line 428). Restore them from the upstream file before compiling.
377 void
378 Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
379  while (variableCount > 0) {
380 
381  const int variableID = inMsg.readUnsignedByte();
382  const int status = inMsg.readUnsignedByte();
383  const int type = inMsg.readUnsignedByte();
384 
385  if (status == libsumo::RTYPE_OK) {
386  switch (type) {
388  into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
389  break;
391  into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
392  break;
393  case libsumo::POSITION_2D: {
    // two coordinates are transmitted; z is set to 0
394  auto p = std::make_shared<libsumo::TraCIPosition>();
395  p->x = inMsg.readDouble();
396  p->y = inMsg.readDouble();
397  p->z = 0.;
398  into[objectID][variableID] = p;
399  break;
400  }
401  case libsumo::POSITION_3D: {
402  auto p = std::make_shared<libsumo::TraCIPosition>();
403  p->x = inMsg.readDouble();
404  p->y = inMsg.readDouble();
405  p->z = inMsg.readDouble();
406  into[objectID][variableID] = p;
407  break;
408  }
409  case libsumo::TYPE_COLOR: {
    // four bytes, read in the order r, g, b, a
410  auto c = std::make_shared<libsumo::TraCIColor>();
411  c->r = (unsigned char)inMsg.readUnsignedByte();
412  c->g = (unsigned char)inMsg.readUnsignedByte();
413  c->b = (unsigned char)inMsg.readUnsignedByte();
414  c->a = (unsigned char)inMsg.readUnsignedByte();
415  into[objectID][variableID] = c;
416  break;
417  }
419  into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
420  break;
    // string list: an int count followed by that many strings
422  auto sl = std::make_shared<libsumo::TraCIStringList>();
423  int n = inMsg.readInt();
424  for (int i = 0; i < n; ++i) {
425  sl->value.push_back(inMsg.readString());
426  }
427  into[objectID][variableID] = sl;
428  }
429  break;
430 
431  // TODO Other data types
432 
433  default:
434  throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
435  }
436  } else {
437  throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
438  }
439 
440  variableCount--;
441  }
442 }
443 
444 
// Reads one variable subscription response from inMsg: the object id, the number of
// variables, then the variables themselves, stored in mySubscriptionResults[responseID].
// NOTE(review): this listing has lost original line 446 (the function signature, which
// supplies responseID and inMsg). Restore it from the upstream file before compiling.
445 void
447  const std::string objectID = inMsg.readString();
448  const int variableCount = inMsg.readUnsignedByte();
449  readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
450 }
451 
452 
// Reads one context subscription response from inMsg: the id of the context object, the
// context domain (read and discarded), the number of variables per object and the number of
// objects; then, per object, its id and its variables. Results are stored in
// myContextSubscriptionResults[responseID][contextID].
// NOTE(review): this listing has lost original line 454 (the function signature, which
// supplies responseID and inMsg). Restore it from the upstream file before compiling.
453 void
455  const std::string contextID = inMsg.readString();
456  inMsg.readUnsignedByte(); // context domain
457  const int variableCount = inMsg.readUnsignedByte();
458  int numObjects = inMsg.readInt();
459 
460  while (numObjects > 0) {
461  std::string objectID = inMsg.readString();
462  readVariables(inMsg, objectID, variableCount, myContextSubscriptionResults[responseID][contextID]);
463  numObjects--;
464  }
465 }
466 
467 
468 }
469 
470 
471 /****************************************************************************/
void simulationStep(double time)
Sends a SimulationStep command.
Definition: Connection.cpp:101
void subscribeObjectVariable(int domID, const std::string &objID, double beginTime, double endTime, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeVariable request.
Definition: Connection.cpp:195
Connection(const std::string &host, int port, int numRetries, const std::string &label, FILE *const pipe)
Constructor, connects to the specified SUMO server.
Definition: Connection.cpp:45
void close()
ends the simulation and closes the connection
Definition: Connection.cpp:62
int check_commandGetResult(tcpip::Storage &inMsg, int command, int expectedType=-1, bool ignoreCommandId=false) const
Validates a command response: checks the response command id and, if an expected type is given, the type of the returned value.
Definition: Connection.cpp:329
bool processGet(int command, int expectedType, bool ignoreCommandId=false)
Definition: Connection.cpp:353
void readVariableSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:446
tcpip::Socket mySocket
The socket.
Definition: Connection.h:354
std::map< int, libsumo::SubscriptionResults > mySubscriptionResults
Definition: Connection.h:360
void check_resultState(tcpip::Storage &inMsg, int command, bool ignoreCommandId=false, std::string *acknowledgement=0)
Validates the result state of a command.
Definition: Connection.cpp:290
tcpip::Storage myInput
The reusable input storage.
Definition: Connection.h:358
FILE *const myProcessPipe
Definition: Connection.h:352
void createFilterCommand(int cmdID, int varID, tcpip::Storage *add=nullptr) const
Definition: Connection.cpp:172
void readVariables(tcpip::Storage &inMsg, const std::string &objectID, int variableCount, libsumo::SubscriptionResults &into)
Definition: Connection.cpp:378
std::map< int, libsumo::ContextSubscriptionResults > myContextSubscriptionResults
Definition: Connection.h:361
tcpip::Storage myOutput
The reusable output storage.
Definition: Connection.h:356
void send_commandSetOrder(int order)
Sends a SetOrder command.
Definition: Connection.cpp:130
tcpip::Storage & doCommand(int command, int var, const std::string &id, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:366
static std::map< const std::string, Connection * > myConnections
Definition: Connection.h:364
void createCommand(int cmdID, int varID, const std::string &objID, tcpip::Storage *add=nullptr) const
Sends a GetVariable / SetVariable request if mySocket is connected. Otherwise writes to myOutput only...
Definition: Connection.cpp:143
void readContextSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:454
static Connection * myActive
Definition: Connection.h:363
static std::string toString(const T &t, std::streamsize accuracy=PRECISION)
Definition: Connection.h:335
void subscribeObjectContext(int domID, const std::string &objID, double beginTime, double endTime, int domain, double range, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeContext request.
Definition: Connection.cpp:249
bool receiveExact(Storage &)
Receive a complete TraCI message from Socket::socket_.
Definition: socket.cpp:527
void sendExact(const Storage &)
Definition: socket.cpp:430
bool has_client_connection() const
Definition: socket.cpp:559
void connect()
Connects to host_:port_.
Definition: socket.cpp:358
void close()
Definition: socket.cpp:382
virtual void writePacket(unsigned char *packet, int length)
Definition: storage.cpp:367
virtual std::string readString()
Definition: storage.cpp:175
virtual void writeString(const std::string &s)
Definition: storage.cpp:192
virtual unsigned int position() const
Definition: storage.cpp:76
virtual void writeInt(int)
Definition: storage.cpp:316
virtual void writeDouble(double)
Definition: storage.cpp:349
virtual int readUnsignedByte()
Definition: storage.cpp:150
void reset()
Definition: storage.cpp:85
virtual void writeUnsignedByte(int)
Definition: storage.cpp:160
StorageType::size_type size() const
Definition: storage.h:118
virtual void writeStorage(tcpip::Storage &store)
Definition: storage.cpp:383
virtual double readDouble()
Definition: storage.cpp:357
virtual int readInt()
Definition: storage.cpp:306
TRACI_CONST int TYPE_COLOR
TRACI_CONST int LAST_STEP_VEHICLE_NUMBER
TRACI_CONST int POSITION_3D
TRACI_CONST int RTYPE_NOTIMPLEMENTED
TRACI_CONST int TRACI_ID_LIST
std::map< int, std::shared_ptr< TraCIResult > > TraCIResults
{variable->value}
Definition: TraCIDefs.h:248
TRACI_CONST int VAR_ROAD_ID
TRACI_CONST int RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE
TRACI_CONST int RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int POSITION_2D
TRACI_CONST int RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE
TRACI_CONST int CMD_CLOSE
TRACI_CONST int CMD_SETORDER
TRACI_CONST int TYPE_STRINGLIST
TRACI_CONST int TYPE_INTEGER
TRACI_CONST int RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE
std::map< std::string, TraCIResults > SubscriptionResults
{object->{variable->value}}
Definition: TraCIDefs.h:250
TRACI_CONST int VAR_LANEPOSITION
TRACI_CONST int CMD_SUBSCRIBE_VEHICLE_VARIABLE
TRACI_CONST int TYPE_DOUBLE
TRACI_CONST int CMD_SUBSCRIBE_LANEAREA_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
TRACI_CONST int RTYPE_ERR
TRACI_CONST int CMD_SIMSTEP
TRACI_CONST int RTYPE_OK
TRACI_CONST int TYPE_STRING