Main MRPT website > C++ reference for MRPT 1.9.9
CNTRIPClient.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | http://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2018, Individual contributors, see AUTHORS file |
6  | See: http://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See details in http://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
14 #include <mrpt/comms/net_utils.h>
15 #include <mrpt/math/wrap2pi.h>
16 #include <mrpt/core/format.h>
18 #include <mrpt/core/bits_math.h>
19 #include <iostream>
20 
21 using namespace mrpt;
22 using namespace mrpt::comms;
23 using namespace mrpt::system;
24 using namespace mrpt::hwdrivers;
25 using namespace mrpt::math;
26 using namespace std;
27 
28 /* --------------------------------------------------------
29  CNTRIPClient
30  -------------------------------------------------------- */
31 CNTRIPClient::CNTRIPClient()
32  : m_thread(),
33  m_thread_exit(false),
34  m_thread_do_process(false),
35  m_waiting_answer_connection(false),
36  m_answer_connection(connError),
37  m_args()
38 {
39  m_thread = std::thread(&CNTRIPClient::private_ntrip_thread, this);
40 }
41 
42 /* --------------------------------------------------------
43  ~CNTRIPClient
44  -------------------------------------------------------- */
46 {
47  this->close();
48  if (m_thread.joinable())
49  {
50  m_thread_exit = true;
51  m_thread.join();
52  }
53 }
54 
55 /* --------------------------------------------------------
56  close
57  -------------------------------------------------------- */
59 {
61  if (!m_thread_do_process) return;
62  m_thread_do_process = false;
63  m_sem_sock_closed.get_future().wait_for(500ms);
64 }
65 
66 /* --------------------------------------------------------
67  open
68  -------------------------------------------------------- */
69 bool CNTRIPClient::open(const NTRIPArgs& params, string& out_errmsg)
70 {
71  this->close();
72 
73  if (params.mountpoint.empty())
74  {
75  out_errmsg = "MOUNTPOINT cannot be empty.";
76  return false;
77  }
78  if (params.server.empty())
79  {
80  out_errmsg = "Server address cannot be empty.";
81  return false;
82  }
83 
84  // Try to open it:
87  out_errmsg.clear();
88 
89  m_args = params;
90  m_thread_do_process = true;
91 
92  // Wait until the thread tell us the initial result...
93  if (m_sem_first_connect_done.get_future().wait_for(6s) ==
94  std::future_status::timeout)
95  {
96  out_errmsg = "Timeout waiting thread response";
97  return false;
98  }
99 
100  switch (m_answer_connection)
101  {
102  case connOk:
103  return true;
104  case connError:
105  out_errmsg = format(
106  "Error trying to connect to server '%s'",
107  params.server.c_str());
108  return false;
109  case connUnauthorized:
110  out_errmsg = format(
111  "Authentication failed for server '%s'", params.server.c_str());
112  return false;
113 
114  default:
115  out_errmsg = "UNKNOWN m_answer_connection!!";
116  return false;
117  }
118 }
119 
120 /* --------------------------------------------------------
121  THE WORKING THREAD
122  -------------------------------------------------------- */
124 {
125  try
126  {
127  CClientTCPSocket my_sock;
128 
129  bool last_thread_do_process = m_thread_do_process;
130 
131  while (!m_thread_exit)
132  {
133  if (!m_thread_do_process)
134  {
135  if (my_sock.isConnected())
136  {
137  // Close connection:
138  try
139  {
140  my_sock.close();
141  }
142  catch (...)
143  {
144  }
145  }
146  else
147  {
148  // Nothing to be done... just wait
149  }
150 
151  if (last_thread_do_process) // Let the waiting caller continue
152  // now.
153  m_sem_sock_closed.set_value();
154 
155  last_thread_do_process = m_thread_do_process;
156  std::this_thread::sleep_for(100ms);
157  continue;
158  }
159 
160  last_thread_do_process = m_thread_do_process;
161 
162  // We have a mission to do here... is the channel already open??
163 
164  if (!my_sock.isConnected())
165  {
166  TConnResult connect_res = connError;
167 
168  std::vector<uint8_t> buf;
169  try
170  {
171  // Nope, it's the first time: get params and try open the
172  // connection:
173  stream_data.clear();
174 
175  cout << format(
176  "[CNTRIPClient] Trying to connect to %s:%i\n",
177  m_args.server.c_str(), m_args.port);
178 
179  my_sock.connect(m_args.server, m_args.port);
180  if (m_thread_exit) break;
181 
182  // Prepare HTTP request:
183  // -------------------------------------------
184  string req = format(
185  "GET /%s HTTP/1.0\r\n", m_args.mountpoint.c_str());
186 
187  if (isalpha(m_args.server[0]))
188  req += format("Host: %s\r\n", m_args.server.c_str());
189 
190  req += "User-Agent: NTRIP MRPT Library\r\n";
191  req += "Accept: */*\r\n";
192  req += "Connection: close\r\n";
193 
194  // Implement HTTP Basic authentication:
195  // See:
196  // http://en.wikipedia.org/wiki/Basic_access_authentication
197  if (!m_args.user.empty())
198  {
199  string auth_str =
200  m_args.user + string(":") + m_args.password;
201  std::vector<uint8_t> v(auth_str.size());
202  ::memcpy(&v[0], &auth_str[0], auth_str.size());
203 
204  string encoded_str;
205  mrpt::system::encodeBase64(v, encoded_str);
206 
207  req += "Authorization: Basic ";
208  req += encoded_str;
209  req += "\r\n";
210  }
211 
212  // End:
213  req += "\r\n";
214  // cout << req;
215 
216  // Send:
217  my_sock.sendString(req);
218 
219  // Try to read the header of the response:
220  size_t to_read_now = 30;
221  buf.resize(to_read_now);
222  size_t len =
223  my_sock.readAsync(&buf[0], to_read_now, 4000, 200);
224 
225  buf.resize(len);
226 
227  if ((len != 0) && my_sock.isConnected())
228  connect_res = connOk;
229  }
230  catch (std::exception&)
231  {
232  // cout << e.what() << endl;
233  connect_res = connError;
234  }
235 
236  // We are not disconnected yet, it's a good thing... anyway,
237  // check the answer code:
238  if (!buf.empty())
239  {
240  string resp;
241  resp.resize(buf.size());
242  ::memcpy(&resp[0], &buf[0], buf.size());
243 
244  if (resp.find(" 200 ") == string::npos)
245  {
246  // It's NOT a good response...
247  connect_res = connError;
248 
249  // 401?
250  if (resp.find(" 401 ") != string::npos)
251  connect_res = connUnauthorized;
252  }
253  }
254 
255  // Signal my caller that the connection is established:
256  // ---------------------------------------------------------------
258  {
260 
261  m_answer_connection = connect_res;
262  m_sem_first_connect_done.set_value();
263  }
264 
265  if (connect_res != connOk) my_sock.close();
266  }
267 
268  // Retry if it was a failed connection.
269  if (!my_sock.isConnected())
270  {
271  std::this_thread::sleep_for(500ms);
272  continue;
273  }
274 
275  // Read data from the stream and accumulate it in a buffer:
276  // ----------------------------------------------------------------------
277  std::vector<uint8_t> buf;
278  size_t to_read_now = 1000;
279  buf.resize(to_read_now);
280  size_t len = my_sock.readAsync(&buf[0], to_read_now, 10, 5);
281 
282  buf.resize(len);
283 
284  if (my_sock.isConnected())
285  {
286  // Send data to main buffer:
287  if (stream_data.size() > 1024 * 8)
288  stream_data.clear(); // It seems nobody's reading it...
289 
290  stream_data.appendData(buf);
291  buf.clear();
292  }
293 
294  // Send back data to the server, if so requested:
295  // ------------------------------------------
296  std::vector<uint8_t> upload_data;
297  m_upload_data.readAndClear(upload_data);
298  if (!upload_data.empty())
299  {
300  const size_t N = upload_data.size();
301  const size_t nWritten =
302  my_sock.writeAsync(&upload_data[0], N, 1000);
303  if (nWritten != N)
304  cerr << "*ERROR*: Couldn't write back " << N
305  << " bytes to NTRIP server!.\n";
306  }
307 
308  std::this_thread::sleep_for(10ms);
309  } // end while
310 
311  } // end try
312  catch (exception& e)
313  {
314  cerr << "[CNTRIPClient] Exception in working thread: " << endl
315  << e.what() << endl;
316  }
317  catch (...)
318  {
319  cerr << "[CNTRIPClient] Runtime exception in working thread." << endl;
320  }
321 
322 } // end working thread
323 
324 /* --------------------------------------------------------
325  retrieveListOfMountpoints
326  -------------------------------------------------------- */
328  TListMountPoints& out_list, string& out_errmsg, const string& server,
329  int port, const string& auth_user, const string& auth_pass)
330 {
331  string content;
332  int http_code;
333  TParameters<string> my_headers;
334 
335  out_list.clear();
336 
338  string("http://") + server, content, out_errmsg, port, auth_user,
339  auth_pass, &http_code, &my_headers, nullptr, 6000);
340 
341  // Parse contents:
342  if (ret != net::erOk) return false;
343 
344  std::stringstream ss(content);
345  string lin;
346  while (std::getline(ss, lin, '\n'))
347  {
348  if (lin.size() < 5) continue;
349  if (0 != ::strncmp("STR;", lin.c_str(), 4)) continue;
350 
351  // ok, it's a stream:
352  deque<string> fields;
353  mrpt::system::tokenize(lin, ";", fields);
354 
355  if (fields.size() < 13) continue;
356 
357  TMountPoint mnt;
358 
359  mnt.mountpoint_name = fields[1];
360  mnt.id = fields[2];
361  mnt.format = fields[3];
362  mnt.format_details = fields[4];
363  mnt.carrier = atoi(fields[5].c_str());
364  mnt.nav_system = fields[6];
365  mnt.network = fields[7];
366  mnt.country_code = fields[8];
367  mnt.latitude = atof(fields[9].c_str());
368  mnt.longitude = atof(fields[10].c_str());
369 
370  // Longitude in range: -180,180
372 
373  mnt.needs_nmea = atoi(fields[11].c_str()) != 0;
374  mnt.net_ref_stations = atoi(fields[12].c_str()) != 0;
375 
376  if (fields.size() >= 19) mnt.extra_info = fields[18];
377 
378  out_list.push_back(mnt);
379  }
380 
381  return true;
382 }
383 
384 /** Enqueues a string to be sent back to the NTRIP server (e.g. GGA frames) */
386 {
387  if (data.empty()) return;
388 
389  std::vector<uint8_t> d(data.size());
390  ::memcpy(&d[0], &data[0], data.size());
392 }
std::promise< void > m_sem_sock_closed
Definition: CNTRIPClient.h:116
bool open(const NTRIPArgs &params, std::string &out_errmsg)
Tries to open a given NTRIP stream and, if successful, launches a thread for continuously reading fro...
double RAD2DEG(const double x)
Radians to degrees.
void appendData(const std::vector< uint8_t > &d)
Append new data to the stream.
Definition: MT_buffer.h:54
void connect(const std::string &remotePartAddress, unsigned short remotePartTCPPort, unsigned int timeout_ms=0)
Establishes a connection with a remote part.
This namespace provides a OS-independent interface to many useful functions: filenames manipulation...
Definition: math_frwds.h:25
bool isConnected()
Returns true if this objects represents a successfully connected socket.
double DEG2RAD(const double x)
Degrees to radians.
void private_ntrip_thread()
The working thread.
static bool retrieveListOfMountpoints(TListMountPoints &out_list, std::string &out_errmsg, const std::string &server, int port=2101, const std::string &auth_user=std::string(), const std::string &auth_pass=std::string())
Connect to a given NTRIP caster and get the list of all available mountpoints and their parameters...
Contains classes for various device interfaces.
STL namespace.
void close()
Closes the connection.
GLdouble s
Definition: glext.h:3676
GLenum GLsizei len
Definition: glext.h:4712
A descriptor of one stream in an NTRIP Caster - See CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:45
ERRORCODE_HTTP http_get(const string &url, std::vector< uint8_t > &out_content, string &out_errormsg, int port=80, const string &auth_user=string(), const string &auth_pass=string(), int *out_http_responsecode=nullptr, mrpt::system::TParameters< string > *extra_headers=nullptr, mrpt::system::TParameters< string > *out_headers=nullptr, int timeout_ms=1000)
Perform an HTTP GET operation (version for retrieving the data as a std::vector<uint8_t>) ...
Definition: net_utils.cpp:386
virtual ~CNTRIPClient()
Default destructor.
void tokenize(const std::string &inString, const std::string &inDelimiters, OUT_CONTAINER &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
This base provides a set of functions for maths stuff.
std::string format
RTCM 2.3, RTCM 3, CMR+, etc...
Definition: CNTRIPClient.h:51
ERRORCODE_HTTP
Possible returns from a HTTP request.
Definition: net_utils.h:31
NTRIPArgs m_args
All the parameters for the NTRIP connection.
Definition: CNTRIPClient.h:133
std::string country_code
ITA, ESP, DEU,...
Definition: CNTRIPClient.h:60
std::string format(const char *fmt,...) MRPT_printf_format_check(1
A std::string version of C sprintf.
Definition: format.cpp:16
int carrier
0: No carrier phase, 1: L1, 2: L1+L2
Definition: CNTRIPClient.h:54
bool m_thread_do_process
Will be "true" between "open" and "close".
Definition: CNTRIPClient.h:121
GLsizei const GLchar ** string
Definition: glext.h:4101
T wrapToPi(T a)
Modifies the given angle to translate it into the ]-pi,pi] range.
Definition: wrap2pi.h:53
size_t writeAsync(const void *Buffer, const size_t Count, const int timeout_ms=-1)
A method for writing to the socket with optional timeouts.
mrpt::containers::MT_buffer m_upload_data
Buffer for data to be sent back to the server.
Definition: CNTRIPClient.h:136
const GLdouble * v
Definition: glext.h:3678
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
std::promise< void > m_sem_first_connect_done
Definition: CNTRIPClient.h:117
void encodeBase64(const std::vector< uint8_t > &inputData, std::string &outString)
Encode a sequence of bytes as a string in base-64.
Definition: base64.cpp:29
A TCP socket that can be connected to a TCP server, implementing MRPT&#39;s CStream interface for passing...
void close()
Closes the connection.
void readAndClear(std::vector< uint8_t > &d)
Read the whole buffer and empty it.
Definition: MT_buffer.h:62
void clear()
Empty the buffer.
Definition: MT_buffer.h:36
Serial and networking devices and utilities.
size_t readAsync(void *Buffer, const size_t Count, const int timeoutStart_ms=-1, const int timeoutBetween_ms=-1)
A method for reading from the socket with an optional timeout.
std::list< TMountPoint > TListMountPoints
Used in CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:87
For usage when passing a dynamic number of (numeric) arguments to a function, by name.
Definition: TParameters.h:56
GLsizei GLsizei GLenum GLenum const GLvoid * data
Definition: glext.h:3546
GLenum const GLfloat * params
Definition: glext.h:3534
The arguments for connecting to a NTRIP stream, used in CNTRIPClient::open.
Definition: CNTRIPClient.h:92
mrpt::containers::MT_buffer stream_data
The buffer with all the bytes so-far read from the NTRIP server stream.
Definition: CNTRIPClient.h:163
void sendString(const std::string &str)
Writes a string to the socket.
void memcpy(void *dest, size_t destSize, const void *src, size_t copyCount) noexcept
An OS and compiler independent version of "memcpy".
Definition: os.cpp:356
size_t size()
Return the number of available bytes at this moment.
Definition: MT_buffer.h:44
void sendBackToServer(const std::string &data)
Enqueues a string to be sent back to the NTRIP server (e.g.



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: ad3a9d8ae Tue May 1 23:10:22 2018 -0700 at lun oct 28 00:14:14 CET 2019