MRPT  1.9.9
CNTRIPClient.cpp
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | https://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2019, Individual contributors, see AUTHORS file |
6  | See: https://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See: https://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 
10 #include "hwdrivers-precomp.h" // Precompiled headers
11 
13 #include <mrpt/comms/net_utils.h>
14 #include <mrpt/core/bits_math.h>
15 #include <mrpt/core/format.h>
17 #include <mrpt/math/wrap2pi.h>
19 #include <cstring>
20 #include <iostream>
21 
22 using namespace mrpt;
23 using namespace mrpt::comms;
24 using namespace mrpt::system;
25 using namespace mrpt::hwdrivers;
26 using namespace mrpt::math;
27 using namespace std;
28 
29 /* --------------------------------------------------------
30  CNTRIPClient
31  -------------------------------------------------------- */
32 CNTRIPClient::CNTRIPClient() : m_thread(), m_args()
33 {
34  m_thread = std::thread(&CNTRIPClient::private_ntrip_thread, this);
35 }
36 
37 /* --------------------------------------------------------
38  ~CNTRIPClient
39  -------------------------------------------------------- */
41 {
42  this->close();
43  if (m_thread.joinable())
44  {
45  m_thread_exit = true;
46  m_thread.join();
47  }
48 }
49 
50 /* --------------------------------------------------------
51  close
52  -------------------------------------------------------- */
54 {
56  if (!m_thread_do_process) return;
57  m_thread_do_process = false;
58  m_sem_sock_closed.get_future().wait_for(500ms);
59 }
60 
61 /* --------------------------------------------------------
62  open
63  -------------------------------------------------------- */
64 bool CNTRIPClient::open(const NTRIPArgs& params, string& out_errmsg)
65 {
66  this->close();
67 
68  if (params.mountpoint.empty())
69  {
70  out_errmsg = "MOUNTPOINT cannot be empty.";
71  return false;
72  }
73  if (params.server.empty())
74  {
75  out_errmsg = "Server address cannot be empty.";
76  return false;
77  }
78 
79  // Try to open it:
82  out_errmsg.clear();
83 
84  m_args = params;
85  m_thread_do_process = true;
86 
87  // Wait until the thread tell us the initial result...
88  if (m_sem_first_connect_done.get_future().wait_for(6s) ==
89  std::future_status::timeout)
90  {
91  out_errmsg = "Timeout waiting thread response";
92  return false;
93  }
94 
95  switch (m_answer_connection)
96  {
97  case connOk:
98  return true;
99  case connError:
100  out_errmsg = format(
101  "Error trying to connect to server '%s'",
102  params.server.c_str());
103  return false;
104  case connUnauthorized:
105  out_errmsg = format(
106  "Authentication failed for server '%s'", params.server.c_str());
107  return false;
108 
109  default:
110  out_errmsg = "UNKNOWN m_answer_connection!!";
111  return false;
112  }
113 }
114 
115 /* --------------------------------------------------------
116  THE WORKING THREAD
117  -------------------------------------------------------- */
119 {
120  try
121  {
122  CClientTCPSocket my_sock;
123 
124  bool last_thread_do_process = m_thread_do_process;
125 
126  while (!m_thread_exit)
127  {
128  if (!m_thread_do_process)
129  {
130  if (my_sock.isConnected())
131  {
132  // Close connection:
133  try
134  {
135  my_sock.close();
136  }
137  catch (...)
138  {
139  }
140  }
141  else
142  {
143  // Nothing to be done... just wait
144  }
145 
146  if (last_thread_do_process) // Let the waiting caller continue
147  // now.
148  m_sem_sock_closed.set_value();
149 
150  last_thread_do_process = m_thread_do_process;
151  std::this_thread::sleep_for(100ms);
152  continue;
153  }
154 
155  last_thread_do_process = m_thread_do_process;
156 
157  // We have a mission to do here... is the channel already open??
158 
159  if (!my_sock.isConnected())
160  {
161  TConnResult connect_res = connError;
162 
163  std::vector<uint8_t> buf;
164  try
165  {
166  // Nope, it's the first time: get params and try open the
167  // connection:
168  stream_data.clear();
169 
170  cout << format(
171  "[CNTRIPClient] Trying to connect to %s:%i\n",
172  m_args.server.c_str(), m_args.port);
173 
174  my_sock.connect(m_args.server, m_args.port);
175  if (m_thread_exit) break;
176 
177  // Prepare HTTP request:
178  // -------------------------------------------
179  string req = format(
180  "GET /%s HTTP/1.0\r\n", m_args.mountpoint.c_str());
181 
182  if (isalpha(m_args.server[0]))
183  req += format("Host: %s\r\n", m_args.server.c_str());
184 
185  req += "User-Agent: NTRIP MRPT Library\r\n";
186  req += "Accept: */*\r\n";
187  req += "Connection: close\r\n";
188 
189  // Implement HTTP Basic authentication:
190  // See:
191  // http://en.wikipedia.org/wiki/Basic_access_authentication
192  if (!m_args.user.empty())
193  {
194  string auth_str =
195  m_args.user + string(":") + m_args.password;
196  std::vector<uint8_t> v(auth_str.size());
197  std::memcpy(&v[0], &auth_str[0], auth_str.size());
198 
199  string encoded_str;
200  mrpt::system::encodeBase64(v, encoded_str);
201 
202  req += "Authorization: Basic ";
203  req += encoded_str;
204  req += "\r\n";
205  }
206 
207  // End:
208  req += "\r\n";
209  // cout << req;
210 
211  // Send:
212  my_sock.sendString(req);
213 
214  // Try to read the header of the response:
215  size_t to_read_now = 30;
216  buf.resize(to_read_now);
217  size_t len =
218  my_sock.readAsync(&buf[0], to_read_now, 4000, 200);
219 
220  buf.resize(len);
221 
222  if ((len != 0) && my_sock.isConnected())
223  connect_res = connOk;
224  }
225  catch (std::exception&)
226  {
227  // cout << e.what() << endl;
228  connect_res = connError;
229  }
230 
231  // We are not disconnected yet, it's a good thing... anyway,
232  // check the answer code:
233  if (!buf.empty())
234  {
235  string resp;
236  resp.resize(buf.size());
237  std::memcpy(&resp[0], &buf[0], buf.size());
238 
239  if (resp.find(" 200 ") == string::npos)
240  {
241  // It's NOT a good response...
242  connect_res = connError;
243 
244  // 401?
245  if (resp.find(" 401 ") != string::npos)
246  connect_res = connUnauthorized;
247  }
248  }
249 
250  // Signal my caller that the connection is established:
251  // ---------------------------------------------------------------
253  {
255 
256  m_answer_connection = connect_res;
257  m_sem_first_connect_done.set_value();
258  }
259 
260  if (connect_res != connOk) my_sock.close();
261  }
262 
263  // Retry if it was a failed connection.
264  if (!my_sock.isConnected())
265  {
266  std::this_thread::sleep_for(500ms);
267  continue;
268  }
269 
270  // Read data from the stream and accumulate it in a buffer:
271  // ----------------------------------------------------------------------
272  std::vector<uint8_t> buf;
273  size_t to_read_now = 1000;
274  buf.resize(to_read_now);
275  size_t len = my_sock.readAsync(&buf[0], to_read_now, 10, 5);
276 
277  buf.resize(len);
278 
279  if (my_sock.isConnected())
280  {
281  // Send data to main buffer:
282  if (stream_data.size() > 1024 * 8)
283  stream_data.clear(); // It seems nobody's reading it...
284 
285  stream_data.appendData(buf);
286  buf.clear();
287  }
288 
289  // Send back data to the server, if so requested:
290  // ------------------------------------------
291  std::vector<uint8_t> upload_data;
292  m_upload_data.readAndClear(upload_data);
293  if (!upload_data.empty())
294  {
295  const size_t N = upload_data.size();
296  const size_t nWritten =
297  my_sock.writeAsync(&upload_data[0], N, 1000);
298  if (nWritten != N)
299  cerr << "*ERROR*: Couldn't write back " << N
300  << " bytes to NTRIP server!.\n";
301  }
302 
303  std::this_thread::sleep_for(10ms);
304  } // end while
305 
306  } // end try
307  catch (exception& e)
308  {
309  cerr << "[CNTRIPClient] Exception in working thread: " << endl
310  << e.what() << endl;
311  }
312  catch (...)
313  {
314  cerr << "[CNTRIPClient] Runtime exception in working thread." << endl;
315  }
316 
317 } // end working thread
318 
319 /* --------------------------------------------------------
320  retrieveListOfMountpoints
321  -------------------------------------------------------- */
323  TListMountPoints& out_list, string& out_errmsg, const string& server,
324  int port, const string& auth_user, const string& auth_pass)
325 {
326  string content;
327  int http_code;
328  TParameters<string> my_headers;
329 
330  out_list.clear();
331 
333  string("http://") + server, content, out_errmsg, port, auth_user,
334  auth_pass, &http_code, &my_headers, nullptr, 6000);
335 
336  // Parse contents:
337  if (ret != net::erOk) return false;
338 
339  std::stringstream ss(content);
340  string lin;
341  while (std::getline(ss, lin, '\n'))
342  {
343  if (lin.size() < 5) continue;
344  if (0 != ::strncmp("STR;", lin.c_str(), 4)) continue;
345 
346  // ok, it's a stream:
347  deque<string> fields;
348  mrpt::system::tokenize(lin, ";", fields);
349 
350  if (fields.size() < 13) continue;
351 
352  TMountPoint mnt;
353 
354  mnt.mountpoint_name = fields[1];
355  mnt.id = fields[2];
356  mnt.format = fields[3];
357  mnt.format_details = fields[4];
358  mnt.carrier = atoi(fields[5].c_str());
359  mnt.nav_system = fields[6];
360  mnt.network = fields[7];
361  mnt.country_code = fields[8];
362  mnt.latitude = atof(fields[9].c_str());
363  mnt.longitude = atof(fields[10].c_str());
364 
365  // Longitude in range: -180,180
367 
368  mnt.needs_nmea = atoi(fields[11].c_str()) != 0;
369  mnt.net_ref_stations = atoi(fields[12].c_str()) != 0;
370 
371  if (fields.size() >= 19) mnt.extra_info = fields[18];
372 
373  out_list.push_back(mnt);
374  }
375 
376  return true;
377 }
378 
379 /** Enqueues a string to be sent back to the NTRIP server (e.g. GGA frames) */
381 {
382  if (data.empty()) return;
383 
384  std::vector<uint8_t> d(data.size());
385  std::memcpy(&d[0], &data[0], data.size());
387 }
std::promise< void > m_sem_sock_closed
Definition: CNTRIPClient.h:96
bool open(const NTRIPArgs &params, std::string &out_errmsg)
Tries to open a given NTRIP stream and, if successful, launches a thread for continuously reading fro...
double RAD2DEG(const double x)
Radians to degrees.
void appendData(const std::vector< uint8_t > &d)
Append new data to the stream.
Definition: MT_buffer.h:52
void connect(const std::string &remotePartAddress, unsigned short remotePartTCPPort, unsigned int timeout_ms=0)
Establishes a connection with a remote part.
std::string std::string format(std::string_view fmt, ARGS &&... args)
Definition: format.h:26
bool isConnected()
Returns true if this object represents a successfully connected socket.
double DEG2RAD(const double x)
Degrees to radians.
void private_ntrip_thread()
The working thread.
static bool retrieveListOfMountpoints(TListMountPoints &out_list, std::string &out_errmsg, const std::string &server, int port=2101, const std::string &auth_user=std::string(), const std::string &auth_pass=std::string())
Connect to a given NTRIP caster and get the list of all available mountpoints and their parameters...
Contains classes for various device interfaces.
STL namespace.
void close()
Closes the connection.
GLdouble s
Definition: glext.h:3682
GLenum GLsizei len
Definition: glext.h:4756
A descriptor of one stream in an NTRIP Caster - See CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:42
ERRORCODE_HTTP http_get(const string &url, std::vector< uint8_t > &out_content, string &out_errormsg, int port=80, const string &auth_user=string(), const string &auth_pass=string(), int *out_http_responsecode=nullptr, mrpt::system::TParameters< string > *extra_headers=nullptr, mrpt::system::TParameters< string > *out_headers=nullptr, int timeout_ms=1000)
Perform an HTTP GET operation (version for retrieving the data as a std::vector<uint8_t>) ...
Definition: net_utils.cpp:386
virtual ~CNTRIPClient()
Default destructor.
void tokenize(const std::string &inString, const std::string &inDelimiters, OUT_CONTAINER &outTokens, bool skipBlankTokens=true) noexcept
Tokenizes a string according to a set of delimiting characters.
This base provides a set of functions for maths stuff.
std::string format
RTCM 2.3, RTCM 3, CMR+, etc...
Definition: CNTRIPClient.h:48
ERRORCODE_HTTP
Possible returns from a HTTP request.
Definition: net_utils.h:31
NTRIPArgs m_args
All the parameters for the NTRIP connection.
Definition: CNTRIPClient.h:113
std::string country_code
ITA, ESP, DEU,...
Definition: CNTRIPClient.h:57
int carrier
0: No carrier phase, 1: L1, 2: L1+L2
Definition: CNTRIPClient.h:51
bool m_thread_do_process
Will be "true" between "open" and "close".
Definition: CNTRIPClient.h:101
GLsizei const GLchar ** string
Definition: glext.h:4116
T wrapToPi(T a)
Modifies the given angle to translate it into the ]-pi,pi] range.
Definition: wrap2pi.h:50
size_t writeAsync(const void *Buffer, const size_t Count, const int timeout_ms=-1)
A method for writing to the socket with optional timeouts.
mrpt::containers::MT_buffer m_upload_data
Buffer for data to be sent back to the server.
Definition: CNTRIPClient.h:116
const GLdouble * v
Definition: glext.h:3684
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
std::promise< void > m_sem_first_connect_done
Definition: CNTRIPClient.h:97
void encodeBase64(const std::vector< uint8_t > &inputData, std::string &outString)
Encode a sequence of bytes as a string in base-64.
Definition: base64.cpp:29
A TCP socket that can be connected to a TCP server, implementing MRPT's CStream interface for passing...
void close()
Closes the connection.
void readAndClear(std::vector< uint8_t > &d)
Read the whole buffer and empty it.
Definition: MT_buffer.h:60
void clear()
Empty the buffer.
Definition: MT_buffer.h:34
Serial and networking devices and utilities.
size_t readAsync(void *Buffer, const size_t Count, const int timeoutStart_ms=-1, const int timeoutBetween_ms=-1)
A method for reading from the socket with an optional timeout.
std::list< TMountPoint > TListMountPoints
Used in CNTRIPClient::retrieveListOfMountpoints.
Definition: CNTRIPClient.h:74
For usage when passing a dynamic number of (numeric) arguments to a function, by name.
Definition: TParameters.h:54
GLsizei GLsizei GLenum GLenum const GLvoid * data
Definition: glext.h:3550
GLenum const GLfloat * params
Definition: glext.h:3538
The arguments for connecting to a NTRIP stream, used in CNTRIPClient::open.
Definition: CNTRIPClient.h:79
mrpt::containers::MT_buffer stream_data
The buffer with all the bytes so-far read from the NTRIP server stream.
Definition: CNTRIPClient.h:143
void sendString(const std::string &str)
Writes a string to the socket.
void memcpy(void *dest, size_t destSize, const void *src, size_t copyCount) noexcept
An OS and compiler independent version of "memcpy".
Definition: os.cpp:358
size_t size()
Return the number of available bytes at this moment.
Definition: MT_buffer.h:42
void sendBackToServer(const std::string &data)
Enqueues a string to be sent back to the NTRIP server (e.g.



Page generated by Doxygen 1.8.14 for MRPT 1.9.9 Git: abb8b1a1e Fri Oct 18 14:19:12 2019 +0200 at vie oct 18 14:20:13 CEST 2019