MRPT  2.0.5
zmq_serialization.h
Go to the documentation of this file.
1 /* +------------------------------------------------------------------------+
2  | Mobile Robot Programming Toolkit (MRPT) |
3  | https://www.mrpt.org/ |
4  | |
5  | Copyright (c) 2005-2020, Individual contributors, see AUTHORS file |
6  | See: https://www.mrpt.org/Authors - All rights reserved. |
7  | Released under BSD License. See: https://www.mrpt.org/License |
8  +------------------------------------------------------------------------+ */
9 #pragma once
10 
11 #include <mrpt/io/CMemoryStream.h>
13 #include <cmath> // ceil()
14 
15 namespace mrpt
16 {
17 namespace serialization
18 {
19 // clang-format off
20 /** \addtogroup noncstream_serialization_zmq Serialization functions for ZMQ (v3 or above) (in #include <mrpt/serialization/serialization_zmq.h>) \ingroup
21  * noncstream_serialization
22  * @{ */
23 //clang-format on
24 
25 /** Send an MRPT object to a ZMQ socket.
26  * \param[in] obj The object to be serialized and sent to the socket.
27  * \param[in] zmq_socket The zmq socket object.
28  * \param[in] max_packet_len The object will be split into a series of ZMQ
29  * "message parts" of this maximum length (in bytes). Default=0, which means do
30  * not split in parts.
31  * \note Including `<mrpt/serialization/serialization_zmq.h>` requires libzmq to
32  * be
33  * available in your system and linked
34  * to your user code. This function can be used even if MRPT was built without
35  * ZMQ support, thanks to the use of templates.
36  * \exception std::exception If the object finds any critical error during
37  * serialization or on ZMQ errors.
38  * \note See examples of usage in
39  * https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
40  */
41 template <typename ZMQ_SOCKET_TYPE>
43  ZMQ_SOCKET_TYPE zmq_socket, const mrpt::serialization::CSerializable& obj,
44  const size_t max_packet_len = 0)
45 {
47  if (!buf) throw std::bad_alloc();
48 
49  buf->WriteObject(&obj);
50  const size_t nBytes = buf->getTotalBytesCount();
51  if (!nBytes)
52  throw std::runtime_error(
53  "[mrpt_send_to_zmq] Serialized object has 0 bytes, which probably "
54  "means something went wrong...");
55  unsigned int nPkts =
56  (!max_packet_len)
57  ? 1U
58  : static_cast<unsigned int>(ceil(double(nBytes) / max_packet_len));
59  for (unsigned int iPkt = 0; iPkt < nPkts; ++iPkt)
60  {
61  // Prepare a msg part:
62  mrpt::serialization::internal::TFreeFnDataForZMQ* fd =
63  new mrpt::serialization::internal::TFreeFnDataForZMQ();
64  if (!fd) throw std::bad_alloc();
65  fd->buf = buf;
66  fd->do_free =
67  iPkt ==
68  (nPkts - 1); // Free buffer only after the last part is disposed.
69  void* pkt_data = reinterpret_cast<char*>(fd->buf->getRawBufferData()) +
70  max_packet_len * iPkt;
71  size_t nBytesThisPkt = nBytes - max_packet_len * iPkt;
72  if (max_packet_len != 0 && nBytesThisPkt > max_packet_len)
73  nBytesThisPkt = max_packet_len;
74  // Build ZMQ msg:
75  zmq_msg_t message;
76  if (0 != zmq_msg_init_data(
77  &message, pkt_data, nBytesThisPkt,
79  throw std::runtime_error(
80  "[mrpt_send_to_zmq] Error in zmq_msg_init_data()");
81  // Send:
82  const int sent_size =
83  zmq_msg_send(&message, zmq_socket, fd->do_free ? 0 : ZMQ_SNDMORE);
84  if (0 != zmq_msg_close(&message))
85  throw std::runtime_error(
86  "[mrpt_send_to_zmq] Error in zmq_msg_close()");
87  if (sent_size != static_cast<int>(nBytesThisPkt))
88  throw std::runtime_error(
89  "[mrpt_send_to_zmq] Error in zmq_msg_send()");
90  }
91 }
92 
93 /** Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().
94  * This function just stores the received data into a memory buffer without
95  * parsing it into an MRPT object.
96  * \return false on any error */
97 template <typename ZMQ_SOCKET_TYPE, typename VECTOR_MSG_T>
99  ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T& out_lst_msgs,
100  mrpt::io::CMemoryStream& target_buf, bool dont_wait,
101  size_t* rx_obj_length_in_bytes)
102 {
103  if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = 0;
104  out_lst_msgs.clear();
105  target_buf.clear();
106  int64_t more;
107  size_t more_size = sizeof(more);
108  do
109  {
110  // Init rx msg:
111  zmq_msg_t* msg = new zmq_msg_t();
112  if (0 != zmq_msg_init(msg)) return false;
113  out_lst_msgs.push_back(msg);
114  // Recv:
115  int rc = zmq_msg_recv(msg, zmq_socket, dont_wait ? ZMQ_DONTWAIT : 0);
116  if (rc == -1) return false;
117  // Determine if more message parts are to follow
118  rc = zmq_getsockopt(zmq_socket, ZMQ_RCVMORE, &more, &more_size);
119  if (rc != 0) return false;
120  // Only one part?
121  if (out_lst_msgs.size() == 1 && !more)
122  {
123  target_buf.assignMemoryNotOwn(zmq_msg_data(msg), zmq_msg_size(msg));
124  if (rx_obj_length_in_bytes)
125  *rx_obj_length_in_bytes = zmq_msg_size(msg);
126  }
127  } while (more);
128  // More than 1 part?
129  if (out_lst_msgs.size() > 1)
130  {
131  for (size_t i = 0; i < out_lst_msgs.size(); i++)
132  {
133  target_buf.WriteBuffer(
134  zmq_msg_data(out_lst_msgs[i]), zmq_msg_size(out_lst_msgs[i]));
135  }
136  if (rx_obj_length_in_bytes)
137  *rx_obj_length_in_bytes = target_buf.getTotalBytesCount();
138  target_buf.Seek(0);
139  }
140  return true;
141 }
142 
143 namespace internal
144 {
145 template <typename VECTOR_MSG_T>
146 void free_zmq_msg_lst(VECTOR_MSG_T& lst_msgs)
147 {
148  for (size_t i = 0; i < lst_msgs.size(); ++i)
149  {
150  zmq_msg_close(lst_msgs[i]);
151  delete lst_msgs[i];
152  }
153 }
154 } // namespace internal
155 
156 /** Receives an MRPT object from a ZMQ socket, determining the type of the
157  * object on-the-fly.
158  * \param[in] zmq_socket The zmq socket object.
159  * \param[in] dont_wait If true, will fail if there is no data ready to
160  * be read. If false (default) this function will block until data arrives.
161  * \param[out] rx_obj_length_in_bytes If non-nullptr, the object length will be
162  * stored here.
163  * \return An empty smart pointer if there was any error. The received
164  * object if all went OK.
165  * \note Including `<mrpt/serialization/serialization_zmq.h>` requires libzmq to
166  * be
167  * available in your system and linked to your user code. This function
168  * can be used even if MRPT was built without ZMQ support, thanks to the
169  * use of templates.
170  * \exception std::exception If the object finds any critical error during
171  * de-serialization.
172  * \sa mrpt_recv_from_zmq_into
173  * \note See examples of usage in
174  * https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
175  */
176 template <typename ZMQ_SOCKET_TYPE>
178  ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait = false,
179  size_t* rx_obj_length_in_bytes = nullptr)
180 {
181  CMemoryStream target_buf;
183  std::vector<zmq_msg_t*> lst_msgs_to_close;
185  zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
186  rx_obj_length_in_bytes))
187  return obj;
188  // De-serialize:
189  obj = target_buf.ReadObject();
190  internal::free_zmq_msg_lst(lst_msgs_to_close); // Free msgs mem
191  return obj;
192 }
193 /** Like mrpt_recv_from_zmq() but without dynamically allocating the received
194  * object,
195  * more efficient to use if the type of the received object is known in
196  * advance.
197  * \param[in] target_object The received object will be stored here. An
198  * exception will be raised upon type mismatch.
199  * \return true if all was OK, false on any ZMQ error.
200  * \sa mrpt_recv_from_zmq() for details on the rest of parameters.
201  * \note See examples of usage in
202  * https://github.com/MRPT/mrpt/tree/master/doc/mrpt-zeromq-example
203  */
204 template <typename ZMQ_SOCKET_TYPE>
206  ZMQ_SOCKET_TYPE zmq_socket,
207  mrpt::serialization::CSerializable& target_object, bool dont_wait = false,
208  size_t* rx_obj_length_in_bytes = nullptr)
209 {
210  CMemoryStream target_buf;
211  std::vector<zmq_msg_t*> lst_msgs_to_close;
213  zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
214  rx_obj_length_in_bytes))
215  return false;
216  // De-serialize:
217  target_buf.ReadObject(&target_object);
218  internal::free_zmq_msg_lst(lst_msgs_to_close); // Free msgs mem
219  return true;
220 }
221 
222 /** @} */
223 } // namespace serialization
224 } // namespace mrpt
void free_fn_for_zmq(void *data, void *hint)
Used in mrpt_send_to_zmq().
mrpt::serialization::CSerializable::Ptr mrpt_recv_from_zmq(ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait=false, size_t *rx_obj_length_in_bytes=nullptr)
Receives an MRPT object from a ZMQ socket, determining the type of the object on-the-fly.
mrpt::io::CMemoryStream CMemoryStream
void mrpt_send_to_zmq(ZMQ_SOCKET_TYPE zmq_socket, const mrpt::serialization::CSerializable &obj, const size_t max_packet_len=0)
Send an MRPT object to a ZMQ socket.
uint64_t getTotalBytesCount() const override
Returns the total size of the internal buffer.
This CStream derived class allow using a memory buffer as a CStream.
void assignMemoryNotOwn(const void *data, const uint64_t nBytesInData)
Initilize the data in the stream from a block of memory which is NEITHER OWNED NOR COPIED by the obje...
uint64_t Seek(int64_t Offset, CStream::TSeekOrigin Origin=sFromBeginning) override
Introduces a pure virtual method for moving to a specified position in the streamed resource...
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
The virtual base class which provides a unified interface for all persistent objects in MRPT...
Definition: CSerializable.h:30
void free_zmq_msg_lst(VECTOR_MSG_T &lst_msgs)
bool mrpt_recv_from_zmq_buf(ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T &out_lst_msgs, mrpt::io::CMemoryStream &target_buf, bool dont_wait, size_t *rx_obj_length_in_bytes)
Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().
void clear()
Clears the memory buffer.
bool mrpt_recv_from_zmq_into(ZMQ_SOCKET_TYPE zmq_socket, mrpt::serialization::CSerializable &target_object, bool dont_wait=false, size_t *rx_obj_length_in_bytes=nullptr)
Like mrpt_recv_from_zmq() but without dynamically allocating the received object, more efficient to u...



Page generated by Doxygen 1.8.14 for MRPT 2.0.5 Git: eda5ade6c Tue Aug 4 12:48:50 2020 +0200 at mar ago 4 13:00:11 CEST 2020