9 #ifndef SERIALIZATION_ZMQ_H 10 #define SERIALIZATION_ZMQ_H 40 template <
typename ZMQ_SOCKET_TYPE>
43 const size_t max_packet_len = 0)
46 if (!buf)
throw std::bad_alloc();
51 throw std::runtime_error(
52 "[mrpt_send_to_zmq] Serialized object has 0 bytes, which probably " 53 "means something went wrong...");
57 : static_cast<unsigned int>(ceil(
double(nBytes) / max_packet_len));
58 for (
unsigned int iPkt = 0; iPkt < nPkts; ++iPkt)
63 if (!fd)
throw std::bad_alloc();
69 max_packet_len * iPkt;
70 size_t nBytesThisPkt = nBytes - max_packet_len * iPkt;
71 if (max_packet_len != 0 && nBytesThisPkt > max_packet_len)
72 nBytesThisPkt = max_packet_len;
75 if (0 != zmq_msg_init_data(
76 &message, pkt_data, nBytesThisPkt,
78 throw std::runtime_error(
79 "[mrpt_send_to_zmq] Error in zmq_msg_init_data()");
82 zmq_msg_send(&message, zmq_socket, fd->
do_free ? 0 : ZMQ_SNDMORE);
83 if (0 != zmq_msg_close(&message))
84 throw std::runtime_error(
85 "[mrpt_send_to_zmq] Error in zmq_msg_close()");
86 if (sent_size != static_cast<int>(nBytesThisPkt))
87 throw std::runtime_error(
88 "[mrpt_send_to_zmq] Error in zmq_msg_send()");
96 template <
typename ZMQ_SOCKET_TYPE,
typename VECTOR_MSG_T>
98 ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T& out_lst_msgs,
100 size_t* rx_obj_length_in_bytes)
102 if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = 0;
103 out_lst_msgs.clear();
106 size_t more_size =
sizeof(more);
110 zmq_msg_t* msg =
new zmq_msg_t();
111 if (0 != zmq_msg_init(msg))
return false;
112 out_lst_msgs.push_back(msg);
114 int rc = zmq_msg_recv(msg, zmq_socket, dont_wait ? ZMQ_DONTWAIT : 0);
115 if (rc == -1)
return false;
117 rc = zmq_getsockopt(zmq_socket, ZMQ_RCVMORE, &more, &more_size);
118 if (rc != 0)
return false;
120 if (out_lst_msgs.size() == 1 && !more)
123 if (rx_obj_length_in_bytes)
124 *rx_obj_length_in_bytes = zmq_msg_size(msg);
128 if (out_lst_msgs.size() > 1)
130 for (
size_t i = 0; i < out_lst_msgs.size(); i++)
133 zmq_msg_data(out_lst_msgs[i]), zmq_msg_size(out_lst_msgs[i]));
135 if (rx_obj_length_in_bytes)
144 template <
typename VECTOR_MSG_T>
147 for (
size_t i = 0; i < lst_msgs.size(); ++i)
149 zmq_msg_close(lst_msgs[i]);
174 template <
typename ZMQ_SOCKET_TYPE>
176 ZMQ_SOCKET_TYPE zmq_socket,
bool dont_wait =
false,
177 size_t* rx_obj_length_in_bytes =
nullptr)
181 std::vector<zmq_msg_t*> lst_msgs_to_close;
183 zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
184 rx_obj_length_in_bytes))
202 template <
typename ZMQ_SOCKET_TYPE>
205 bool dont_wait =
false,
size_t* rx_obj_length_in_bytes =
nullptr)
208 std::vector<zmq_msg_t*> lst_msgs_to_close;
210 zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
211 rx_obj_length_in_bytes))
mrpt::utils::CSerializable::Ptr mrpt_recv_from_zmq(ZMQ_SOCKET_TYPE zmq_socket, bool dont_wait=false, size_t *rx_obj_length_in_bytes=nullptr)
Receives an MRPT object from a ZMQ socket, determining the type of the object on-the-fly.
The virtual base class which provides a unified interface for all persistent objects in MRPT...
uint64_t Seek(uint64_t Offset, CStream::TSeekOrigin Origin=sFromBeginning) override
Introduces a pure virtual method for moving to a specified position in the streamed resource...
void WriteBuffer(const void *Buffer, size_t Count)
Writes a block of bytes to the stream from Buffer.
GLsizei GLsizei GLuint * obj
CSerializable::Ptr ReadObject()
Reads an object from stream, its class determined at runtime, and returns a smart pointer to the obje...
void Clear()
Clears the memory buffer.
This CStream-derived class allows using a memory buffer as a CStream.
void WriteObject(const CSerializable *o)
Writes an object to the stream.
void free_fn_for_zmq(void *data, void *hint)
Used in mrpt_send_to_zmq().
std::shared_ptr< CSerializable > Ptr
void * getRawBufferData()
Method for getting a pointer to the raw stored data.
This is the global namespace for all Mobile Robot Programming Toolkit (MRPT) libraries.
void assignMemoryNotOwn(const void *data, const uint64_t nBytesInData)
Initialize the data in the stream from a block of memory which is NEITHER OWNED NOR COPIED by the obje...
void mrpt_send_to_zmq(ZMQ_SOCKET_TYPE zmq_socket, const mrpt::utils::CSerializable &obj, const size_t max_packet_len=0)
Send an MRPT object to a ZMQ socket.
void free_zmq_msg_lst(VECTOR_MSG_T &lst_msgs)
bool mrpt_recv_from_zmq_into(ZMQ_SOCKET_TYPE zmq_socket, mrpt::utils::CSerializable &target_object, bool dont_wait=false, size_t *rx_obj_length_in_bytes=nullptr)
Like mrpt_recv_from_zmq() but without dynamically allocating the received object, more efficient to u...
uint64_t getTotalBytesCount() override
Returns the total size of the internal buffer.
bool mrpt_recv_from_zmq_buf(ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T &out_lst_msgs, mrpt::utils::CMemoryStream &target_buf, bool dont_wait, size_t *rx_obj_length_in_bytes)
Users may normally call mrpt_recv_from_zmq() and mrpt_recv_from_zmq_into().