9 #ifndef SERIALIZATION_ZMQ_H
10 #define SERIALIZATION_ZMQ_H
18 namespace serialization
// Serializes an object into a memory buffer and sends it over a ZMQ socket,
// optionally split into several message parts of at most `max_packet_len`
// bytes each.
// NOTE(review): this listing is incomplete (the function name, part of the
// signature, the braces and several statements are not visible here). The name
// "mrpt_send_to_zmq" is taken from the error strings below.
// Throws std::runtime_error on an empty serialization result or on any ZMQ
// call failure checked below.
41 template <
typename ZMQ_SOCKET_TYPE>
// Defaults to 0; judging by the `max_packet_len != 0` guard further down, 0
// appears to mean "do not split into packets" -- TODO confirm.
44 const size_t max_packet_len = 0)
// `buf` is declared on a line that is not visible in this listing.
47 if (!buf)
throw std::bad_alloc();
// Serialize `obj` into the buffer.
49 buf->WriteObject(&
obj);
// Raised for a zero-byte serialization result (the guarding condition is on a
// line that is not visible here).
52 throw std::runtime_error(
53 "[mrpt_send_to_zmq] Serialized object has 0 bytes, which probably "
54 "means something went wrong...");
// Tail of a conditional expression: packet count = ceil(nBytes /
// max_packet_len). Presumably this initializes `nPkts`, with the other branch
// covering max_packet_len == 0 -- TODO confirm against the full file.
58 :
static_cast<unsigned int>(ceil(
double(nBytes) / max_packet_len));
// One iteration per outgoing packet.
59 for (
unsigned int iPkt = 0; iPkt < nPkts; ++iPkt)
// Per-packet bookkeeping object; its `buf` and `do_free` members are read
// below but are assigned on lines that are not visible here.
62 mrpt::utils::internal::TFreeFnDataForZMQ* fd =
63 new mrpt::utils::internal::TFreeFnDataForZMQ();
// NOTE(review): a plain `new` expression reports failure by throwing
// std::bad_alloc instead of returning null, so this check looks unreachable.
64 if (!fd)
throw std::bad_alloc();
// Start of this packet: raw buffer base plus an offset of
// max_packet_len * iPkt bytes.
69 void* pkt_data =
reinterpret_cast<char*
>(fd->buf->getRawBufferData()) +
70 max_packet_len * iPkt;
// Bytes from this packet's offset to the end of the buffer, clamped to
// max_packet_len when a limit is set.
71 size_t nBytesThisPkt = nBytes - max_packet_len * iPkt;
72 if (max_packet_len != 0 && nBytesThisPkt > max_packet_len)
73 nBytesThisPkt = max_packet_len;
// Build the ZMQ message over the packet bytes; the remaining arguments of this
// call are on a line that is not visible here.
76 if (0 != zmq_msg_init_data(
77 &message, pkt_data, nBytesThisPkt,
79 throw std::runtime_error(
80 "[mrpt_send_to_zmq] Error in zmq_msg_init_data()");
// Send with flags 0 when fd->do_free is set, otherwise with ZMQ_SNDMORE. The
// result is presumably what `sent_size` holds below -- the declaration is not
// visible here.
83 zmq_msg_send(&message, zmq_socket, fd->do_free ? 0 : ZMQ_SNDMORE);
84 if (0 != zmq_msg_close(&message))
85 throw std::runtime_error(
86 "[mrpt_send_to_zmq] Error in zmq_msg_close()");
// The number of bytes reported as sent must equal the packet size.
// NOTE(review): the size_t -> int cast narrows for very large packets.
87 if (sent_size !=
static_cast<int>(nBytesThisPkt))
88 throw std::runtime_error(
89 "[mrpt_send_to_zmq] Error in zmq_msg_send()");
// Receives message parts from a ZMQ socket, collecting the received zmq_msg_t
// pointers in `out_lst_msgs` and, for multi-part messages, copying the parts
// into `target_buf`. Returns false on the failures checked below.
// NOTE(review): this listing is incomplete (function name, part of the
// signature, the braces and any loop construct are not visible here).
97 template <
typename ZMQ_SOCKET_TYPE,
typename VECTOR_MSG_T>
99 ZMQ_SOCKET_TYPE zmq_socket, VECTOR_MSG_T& out_lst_msgs,
// Optional output; may be null (every use below is null-checked).
101 size_t* rx_obj_length_in_bytes)
// Reset the outputs before receiving anything.
103 if (rx_obj_length_in_bytes) *rx_obj_length_in_bytes = 0;
104 out_lst_msgs.clear();
// Size argument for the ZMQ_RCVMORE query below; `more` is declared on a line
// that is not visible here.
107 size_t more_size =
sizeof(more);
// Allocate and initialize one message part.
// NOTE(review): in the visible lines, a zmq_msg_init() failure returns before
// `msg` is stored in out_lst_msgs, so this allocation is never released.
111 zmq_msg_t* msg =
new zmq_msg_t();
112 if (0 != zmq_msg_init(msg))
return false;
113 out_lst_msgs.push_back(msg);
// Receive one part, passing ZMQ_DONTWAIT when `dont_wait` is set; -1 is
// treated as failure.
115 int rc = zmq_msg_recv(msg, zmq_socket, dont_wait ? ZMQ_DONTWAIT : 0);
116 if (rc == -1)
return false;
// Ask the socket whether more parts of the same message follow.
118 rc = zmq_getsockopt(zmq_socket, ZMQ_RCVMORE, &more, &more_size);
119 if (rc != 0)
return false;
// Single-part message: report the size of that one part. Anything else done
// in this branch is not visible in this listing.
121 if (out_lst_msgs.size() == 1 && !more)
124 if (rx_obj_length_in_bytes)
125 *rx_obj_length_in_bytes = zmq_msg_size(msg);
// Multi-part message: append the bytes of every part, in list order, to
// target_buf.
129 if (out_lst_msgs.size() > 1)
131 for (
size_t i = 0; i < out_lst_msgs.size(); i++)
133 target_buf.WriteBuffer(
134 zmq_msg_data(out_lst_msgs[i]), zmq_msg_size(out_lst_msgs[i]));
// The statement guarded by this check is not visible in this listing.
136 if (rx_obj_length_in_bytes)
// Walks a list of ZMQ messages and calls zmq_msg_close() on each element.
// NOTE(review): the function name, its signature and any further per-element
// cleanup are on lines that are not visible in this listing.
145 template <
typename VECTOR_MSG_T>
148 for (
size_t i = 0; i < lst_msgs.size(); ++i)
// The return value of zmq_msg_close() is not checked here.
150 zmq_msg_close(lst_msgs[i]);
// Receives from a ZMQ socket and reads an object from `target_buf` into `obj`.
// NOTE(review): the function name, the first parameter(s), the callee of the
// call below and the return statements are not visible in this listing.
176 template <
typename ZMQ_SOCKET_TYPE>
// `dont_wait` defaults to false and `rx_obj_length_in_bytes` defaults to
// nullptr; both are forwarded unchanged to the call below.
178 ZMQ_SOCKET_TYPE zmq_socket,
bool dont_wait =
false,
179 size_t* rx_obj_length_in_bytes =
nullptr)
// Receives the zmq_msg_t pointers produced by the call below; judging by the
// variable name they are meant to be closed afterwards -- TODO confirm where.
183 std::vector<zmq_msg_t*> lst_msgs_to_close;
// Arguments of a call whose callee is on a line that is not visible here.
185 zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
186 rx_obj_length_in_bytes))
// Read the object out of the buffer.
189 obj = target_buf.ReadObject();
// Receives from a ZMQ socket and deserializes the buffer contents into an
// existing object (`target_object`).
// NOTE(review): the function name, some parameters (including the declarations
// of `target_object` and `dont_wait`), the callee of the call below and the
// return statements are not visible in this listing.
204 template <
typename ZMQ_SOCKET_TYPE>
206 ZMQ_SOCKET_TYPE zmq_socket,
// Optional output; defaults to nullptr and is forwarded to the call below.
208 size_t* rx_obj_length_in_bytes =
nullptr)
// Receives the zmq_msg_t pointers produced by the call below; judging by the
// variable name they are meant to be closed afterwards -- TODO confirm where.
211 std::vector<zmq_msg_t*> lst_msgs_to_close;
// Arguments of a call whose callee is on a line that is not visible here.
213 zmq_socket, lst_msgs_to_close, target_buf, dont_wait,
214 rx_obj_length_in_bytes))
// Deserialize from the buffer into the caller-supplied object.
217 target_buf.ReadObject(&target_object);