MRPT  1.9.9
CNTRIPClient.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | https://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2019, Individual contributors, see AUTHORS file |
6  | See: https://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See: https://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
13 #include <mrpt/comms/net_utils.h>
14 #include <mrpt/core/bits_math.h>
15 #include <mrpt/core/format.h>
17 #include <mrpt/math/wrap2pi.h>
19 #include <iostream>
20 
21 using namespace mrpt;
22 using namespace mrpt::comms;
23 using namespace mrpt::system;
24 using namespace mrpt::hwdrivers;
25 using namespace mrpt::math;
26 using namespace std;
27 
28 /* --------------------------------------------------------
29  CNTRIPClient
30  -------------------------------------------------------- */
31 CNTRIPClient::CNTRIPClient() : m_thread(), m_args()
32 {
33  m_thread = std::thread(&CNTRIPClient::private_ntrip_thread, this);
34 }
35 
36 /* --------------------------------------------------------
37  ~CNTRIPClient
38  -------------------------------------------------------- */
40 {
41  this->close();
42  if (m_thread.joinable())
43  {
44  m_thread_exit = true;
45  m_thread.join();
46  }
47 }
48 
49 /* --------------------------------------------------------
50  close
51  -------------------------------------------------------- */
53 {
55  if (!m_thread_do_process) return;
56  m_thread_do_process = false;
57  m_sem_sock_closed.get_future().wait_for(500ms);
58 }
59 
60 /* --------------------------------------------------------
61  open
62  -------------------------------------------------------- */
63 bool CNTRIPClient::open(const NTRIPArgs& params, string& out_errmsg)
64 {
65  this->close();
66 
67  if (params.mountpoint.empty())
68  {
69  out_errmsg = "MOUNTPOINT cannot be empty.";
70  return false;
71  }
72  if (params.server.empty())
73  {
74  out_errmsg = "Server address cannot be empty.";
75  return false;
76  }
77 
78  // Try to open it:
81  out_errmsg.clear();
82 
83  m_args = params;
84  m_thread_do_process = true;
85 
86  // Wait until the thread tell us the initial result...
87  if (m_sem_first_connect_done.get_future().wait_for(6s) ==
88  std::future_status::timeout)
89  {
90  out_errmsg = "Timeout waiting thread response";
91  return false;
92  }
93 
94  switch (m_answer_connection)
95  {
96  case connOk:
97  return true;
98  case connError:
99  out_errmsg = format(
100  "Error trying to connect to server '%s'",
101  params.server.c_str());
102  return false;
103  case connUnauthorized:
104  out_errmsg = format(
105  "Authentication failed for server '%s'", params.server.c_str());
106  return false;
107 
108  default:
109  out_errmsg = "UNKNOWN m_answer_connection!!";
110  return false;
111  }
112 }
113 
114 /* --------------------------------------------------------
115  THE WORKING THREAD
116  -------------------------------------------------------- */
118 {
119  try
120  {
121  CClientTCPSocket my_sock;
122 
123  bool last_thread_do_process = m_thread_do_process;
124 
125  while (!m_thread_exit)
126  {
127  if (!m_thread_do_process)
128  {
129  if (my_sock.isConnected())
130  {
131  // Close connection:
132  try
133  {
134  my_sock.close();
135  }
136  catch (...)
137  {
138  }
139  }
140  else
141  {
142  // Nothing to be done... just wait
143  }
144 
145  if (last_thread_do_process) // Let the waiting caller continue
146  // now.
147  m_sem_sock_closed.set_value();
148 
149  last_thread_do_process = m_thread_do_process;
150  std::this_thread::sleep_for(100ms);
151  continue;
152  }
153 
154  last_thread_do_process = m_thread_do_process;
155 
156  // We have a mission to do here... is the channel already open??
157 
158  if (!my_sock.isConnected())
159  {
160  TConnResult connect_res = connError;
161 
162  std::vector<uint8_t> buf;
163  try
164  {
165  // Nope, it's the first time: get params and try open the
166  // connection:
167  stream_data.clear();
168 
169  cout << format(
170  "[CNTRIPClient] Trying to connect to %s:%i\n",
171  m_args.server.c_str(), m_args.port);
172 
173  my_sock.connect(m_args.server, m_args.port);
174  if (m_thread_exit) break;
175 
176  // Prepare HTTP request:
177  // -------------------------------------------
178  string req = format(
179  "GET /%s HTTP/1.0\r\n", m_args.mountpoint.c_str());
180 
181  if (isalpha(m_args.server[0]))
182  req += format("Host: %s\r\n", m_args.server.c_str());
183 
184  req += "User-Agent: NTRIP MRPT Library\r\n";
185  req += "Accept: */*\r\n";
186  req += "Connection: close\r\n";
187 
188  // Implement HTTP Basic authentication:
189  // See:
190  // http://en.wikipedia.org/wiki/Basic_access_authentication
191  if (!m_args.user.empty())
192  {
193  string auth_str =
194  m_args.user + string(":") + m_args.password;
195  std::vector<uint8_t> v(auth_str.size());
196  ::memcpy(&v[0], &auth_str[0], auth_str.size());
197 
198  string encoded_str;
199  mrpt::system::encodeBase64(v, encoded_str);
200 
201  req += "Authorization: Basic ";
202  req += encoded_str;
203  req += "\r\n";
204  }
205 
206  // End:
207  req += "\r\n";
208  // cout << req;
209 
210  // Send:
211  my_sock.sendString(req);
212 
213  // Try to read the header of the response:
214  size_t to_read_now = 30;
215  buf.resize(to_read_now);
216  size_t len =
217  my_sock.readAsync(&buf[0], to_read_now, 4000, 200);
218 
219  buf.resize(len);
220 
221  if ((len != 0) && my_sock.isConnected())
222  connect_res = connOk;
223  }
224  catch (std::exception&)
225  {
226  // cout << e.what() << endl;
227  connect_res = connError;
228  }
229 
230  // We are not disconnected yet, it's a good thing... anyway,
231  // check the answer code:
232  if (!buf.empty())
233  {
234  string resp;
235  resp.resize(buf.size());
236  ::memcpy(&resp[0], &buf[0], buf.size());
237 
238  if (resp.find(" 200 ") == string::npos)
239  {
240  // It's NOT a good response...
241  connect_res = connError;
242 
243  // 401?
244  if (resp.find(" 401 ") != string::npos)
245  connect_res = connUnauthorized;
246  }
247  }
248 
249  // Signal my caller that the connection is established:
250  // ---------------------------------------------------------------
252  {
254 
255  m_answer_connection = connect_res;
256  m_sem_first_connect_done.set_value();
257  }
258 
259  if (connect_res != connOk) my_sock.close();
260  }
261 
262  // Retry if it was a failed connection.
263  if (!my_sock.isConnected())
264  {
265  std::this_thread::sleep_for(500ms);
266  continue;
267  }
268 
269  // Read data from the stream and accumulate it in a buffer:
270  // ----------------------------------------------------------------------
271  std::vector<uint8_t> buf;
272  size_t to_read_now = 1000;
273  buf.resize(to_read_now);
274  size_t len = my_sock.readAsync(&buf[0], to_read_now, 10, 5);
275 
276  buf.resize(len);
277 
278  if (my_sock.isConnected())
279  {
280  // Send data to main buffer:
281  if (stream_data.size() > 1024 * 8)
282  stream_data.clear(); // It seems nobody's reading it...
283 
284  stream_data.appendData(buf);
285  buf.clear();
286  }
287 
288  // Send back data to the server, if so requested:
289  // ------------------------------------------
290  std::vector<uint8_t> upload_data;
291  m_upload_data.readAndClear(upload_data);
292  if (!upload_data.empty())
293  {
294  const size_t N = upload_data.size();
295  const size_t nWritten =
296  my_sock.writeAsync(&upload_data[0], N, 1000);
297  if (nWritten != N)
298  cerr << "*ERROR*: Couldn't write back " << N
299  << " bytes to NTRIP server!.\n";
300  }
301 
302  std::this_thread::sleep_for(10ms);
303  } // end while
304 
305  } // end try
306  catch (exception& e)
307  {
308  cerr << "[CNTRIPClient] Exception in working thread: " << endl
309  << e.what() << endl;
310  }
311  catch (...)
312  {
313  cerr << "[CNTRIPClient] Runtime exception in working thread." << endl;
314  }
315 
316 } // end working thread
317 
318 /* --------------------------------------------------------
319  retrieveListOfMountpoints
320  -------------------------------------------------------- */
322  TListMountPoints& out_list, string& out_errmsg, const string& server,
323  int port, const string& auth_user, const string& auth_pass)
324 {
325  string content;
326  int http_code;
327  TParameters<string> my_headers;
328 
329  out_list.clear();
330 
332  string("http://") + server, content, out_errmsg, port, auth_user,
333  auth_pass, &http_code, &my_headers, nullptr, 6000);
334 
335  // Parse contents:
336  if (ret != net::erOk) return false;
337 
338  std::stringstream ss(content);
339  string lin;
340  while (std::getline(ss, lin, '\n'))
341  {
342  if (lin.size() < 5) continue;
343  if (0 != ::strncmp("STR;", lin.c_str(), 4)) continue;
344 
345  // ok, it's a stream:
346  deque<string> fields;
347  mrpt::system::tokenize(lin, ";", fields);
348 
349  if (fields.size() < 13) continue;
350 
351  TMountPoint mnt;
352 
353  mnt.mountpoint_name = fields[1];
354  mnt.id = fields[2];
355  mnt.format = fields[3];
356  mnt.format_details = fields[4];
357  mnt.carrier = atoi(fields[5].c_str());
358  mnt.nav_system = fields[6];
359  mnt.network = fields[7];
360  mnt.country_code = fields[8];
361  mnt.latitude = atof(fields[9].c_str());
362  mnt.longitude = atof(fields[10].c_str());
363 
364  // Longitude in range: -180,180
366 
367  mnt.needs_nmea = atoi(fields[11].c_str()) != 0;
368  mnt.net_ref_stations = atoi(fields[12].c_str()) != 0;
369 
370  if (fields.size() >= 19) mnt.extra_info = fields[18];
371 
372  out_list.push_back(mnt);
373  }
374 
375  return true;
376 }
377 
378 /** Enqueues a string to be sent back to the NTRIP server (e.g. GGA frames) */
380 {
381  if (data.empty()) return;
382 
383  std::vector<uint8_t> d(data.size());
384  ::memcpy(&d[0], &data[0], data.size());
386 }
std::promise< void > m_sem_sock_closed
Definition: CNTRIPClient.h:96
bool open(const NTRIPArgs &params, std::string &out_errmsg)
Tries to open a given NTRIP stream and, if successful, launches a thread for continuously reading fro...
double RAD2DEG(const double x)
Radians to degrees.
void appendData(const std::vector< uint8_t > &d)
Append new data to the stream.
Definition: MT_buffer.h:52
void connect(const std::string &remotePartAddress, unsigned short remotePartTCPPort, unsigned int timeout_ms=0)
Establishes a connection with a remote part.
bool isConnected()
Returns true if this objects represents a successfully connected socket.
double DEG2RAD(const double x)
Degrees to radians.
void private_ntrip_thread()
The working thread.
static bool retrieveListOfMountpoints(TListMountPoints &out_list, std::string &out_errmsg, const std::string &server, int port=2101, const std::string &auth_user=std::string(), const std::string &auth_pass=std::string())
Connect to a given NTRIP caster and get the list of all available mountpoints and their parameters...
Contains classes for various device interfaces.
STL namespace.
void close()
Closes the connection.
GLdouble s
Definition: glext.h:3682
GLenum GLsizei len
Definition: glext.h:4756
A descriptor of one stream in an NTRIP Caster - See CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:42
ERRORCODE_HTTP http_get(const string &url, std::vector< uint8_t > &out_content, string &out_errormsg, int port=80, const string &auth_user=string(), const string &auth_pass=string(), int *out_http_responsecode=nullptr, mrpt::system::TParameters< string > *extra_headers=nullptr, mrpt::system::TParameters< string > *out_headers=nullptr, int timeout_ms=1000)
Perform an HTTP GET operation (version for retrieving the data as a std::vector<uint8_t>) ...
Definition: net_utils.cpp:385
virtual ~CNTRIPClient()
Default destructor.
void tokenize(const std::string &inString, const std::string &inDelimiters, OUT_CONTAINER &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
This base provides a set of functions for maths stuff.
std::string format
RTCM 2.3, RTCM 3, CMR+, etc...
Definition: CNTRIPClient.h:48
ERRORCODE_HTTP
Possible returns from a HTTP request.
Definition: net_utils.h:31
NTRIPArgs m_args
All the parameters for the NTRIP connection.
Definition: CNTRIPClient.h:113
std::string country_code
ITA, ESP, DEU,...
Definition: CNTRIPClient.h:57
int carrier
0: No carrier phase, 1: L1, 2: L1+L2
Definition: CNTRIPClient.h:51
bool m_thread_do_process
Will be "true" between "open" and "close".
Definition: CNTRIPClient.h:101
GLsizei const GLchar ** string
Definition: glext.h:4116
T wrapToPi(T a)
Modifies the given angle to translate it into the ]-pi,pi] range.
Definition: wrap2pi.h:50
size_t writeAsync(const void *Buffer, const size_t Count, const int timeout_ms=-1)
A method for writing to the socket with optional timeouts.
mrpt::containers::MT_buffer m_upload_data
Buffer for data to be sent back to the server.
Definition: CNTRIPClient.h:116
const GLdouble * v
Definition: glext.h:3684
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
std::promise< void > m_sem_first_connect_done
Definition: CNTRIPClient.h:97
void encodeBase64(const std::vector< uint8_t > &inputData, std::string &outString)
Encode a sequence of bytes as a string in base-64.
Definition: base64.cpp:29
std::string format(const char *fmt,...) MRPT_printf_format_check(1
A std::string version of C sprintf.
Definition: format.cpp:16
A TCP socket that can be connected to a TCP server, implementing MRPT&#39;s CStream interface for passing...
void close()
Closes the connection.
void readAndClear(std::vector< uint8_t > &d)
Read the whole buffer and empty it.
Definition: MT_buffer.h:60
void clear()
Empty the buffer.
Definition: MT_buffer.h:34
Serial and networking devices and utilities.
size_t readAsync(void *Buffer, const size_t Count, const int timeoutStart_ms=-1, const int timeoutBetween_ms=-1)
A method for reading from the socket with an optional timeout.
std::list< TMountPoint > TListMountPoints
Used in CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:74
For usage when passing a dynamic number of (numeric) arguments to a function, by name.
Definition: TParameters.h:54
GLsizei GLsizei GLenum GLenum const GLvoid * data
Definition: glext.h:3550
GLenum const GLfloat * params
Definition: glext.h:3538
The arguments for connecting to a NTRIP stream, used in CNTRIPClient::open.
Definition: CNTRIPClient.h:79
mrpt::containers::MT_buffer stream_data
The buffer with all the bytes so-far read from the NTRIP server stream.
Definition: CNTRIPClient.h:143
void sendString(const std::string &str)
Writes a string to the socket.
void memcpy(void *dest, size_t destSize, const void *src, size_t copyCount) noexcept
An OS and compiler independent version of "memcpy".
Definition: os.cpp:358
size_t size()
Return the number of available bytes at this moment.
Definition: MT_buffer.h:42
void sendBackToServer(const std::string &data)
Enqueues a string to be sent back to the NTRIP server (e.g.



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: 8fe78517f Sun Jul 14 19:43:28 2019 +0200 at lun oct 28 02:10:00 CET 2019