"Fossies" - the Fresh Open Source Software Archive 
Member "muscle/iogateway/MessageIOGateway.h" (21 Nov 2020, 22734 Bytes) of package /linux/privat/muscle7.62.zip:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style:
standard) with prefixed line numbers and
code folding option.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "MessageIOGateway.h" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
7.61_vs_7.62.
1 /* This file is Copyright 2000-2013 Meyer Sound Laboratories Inc. See the included LICENSE.txt file for details. */
2
3 #ifndef MuscleMessageIOGateway_h
4 #define MuscleMessageIOGateway_h
5
6 #include "iogateway/AbstractMessageIOGateway.h"
7 #include "util/ByteBuffer.h"
8 #include "util/NestCount.h"
9
10 #ifdef MUSCLE_ENABLE_ZLIB_ENCODING
11 # include "zlib/ZLibCodec.h"
12 #endif
13
14 namespace muscle {
15
16 /**
17 * Encoding IDs identify how a Message object will be converted to and from a flattened byte-buffer. We currently support the vanilla MUSCLE_MESSAGE_ENCODING_DEFAULT plus 9 levels of zlib compression.
18 */
19 enum {
20 MUSCLE_MESSAGE_ENCODING_DEFAULT = 1164862256, /**< 'Enc0' -- just standard flattened-Message format, with no special encoding */
21 #ifdef MUSCLE_ENABLE_ZLIB_ENCODING
22 MUSCLE_MESSAGE_ENCODING_ZLIB_1, /**< lowest level of zlib compression (most CPU-efficient) */
23 MUSCLE_MESSAGE_ENCODING_ZLIB_2,
24 MUSCLE_MESSAGE_ENCODING_ZLIB_3,
25 MUSCLE_MESSAGE_ENCODING_ZLIB_4,
26 MUSCLE_MESSAGE_ENCODING_ZLIB_5,
27 MUSCLE_MESSAGE_ENCODING_ZLIB_6, /**< This is the recommended CPU-vs-space-savings-tradeoff for zlib */
28 MUSCLE_MESSAGE_ENCODING_ZLIB_7,
29 MUSCLE_MESSAGE_ENCODING_ZLIB_8,
30 MUSCLE_MESSAGE_ENCODING_ZLIB_9, /**< highest level of zlib compression (uses the least number of bytes) */
31 #endif
32 MUSCLE_MESSAGE_ENCODING_END_MARKER = MUSCLE_MESSAGE_ENCODING_DEFAULT+10 /**< guard value */
33 };
34
35 /** Callback function type for flatten/unflatten notification callbacks */
36 typedef status_t(*MessageFlattenedCallback)(const MessageRef & msgRef, void * userData);
37
38 /**
39 * A MessageIOGateway object knows how to send/receive Messages over a wire, via a provided DataIO object.
40 * May be subclassed to change the byte-level protocol, or used as-is if the default protocol is desired.
41 * If ZLib compression is desired, be sure to compile with -DMUSCLE_ENABLE_ZLIB_ENCODING
42 *
43 * The default protocol format used by this class is:
44 * -# 4 bytes (uint32) indicating the flattened size of the message
45 * -# 4 bytes (uint32) indicating the encoding type (should always be MUSCLE_MESSAGE_ENCODING_DEFAULT for now)
46 * -# n bytes of flattened Message (where n is the value specified in 1)
47 * -# goto 1 ...
48 *
49 * An example flattened Message byte structure is provided at the bottom of the
50 * MessageIOGateway.h header file.
51 */
52 class MessageIOGateway : public AbstractMessageIOGateway
53 {
54 public:
55 /**
56 * Constructor.
57 * @param outgoingEncoding The byte-stream format the message should be encoded into.
58 * Should be one of the MUSCLE_MESSAGE_ENCODING_* values.
59 * Default is MUSCLE_MESSAGE_ENCODING_DEFAULT, meaning that no
60 * compression will be done. Note that to use any of the
61 * MUSCLE_MESSAGE_ENCODING_ZLIB_* encodings, you MUST have
62 * defined the compiler symbol -DMUSCLE_ENABLE_ZLIB_ENCODING.
63 */
64 MessageIOGateway(int32 outgoingEncoding = MUSCLE_MESSAGE_ENCODING_DEFAULT);
65
66 /**
67 * Destructor.
68 * Deletes the held DataIO object.
69 */
70 virtual ~MessageIOGateway();
71
72 virtual bool HasBytesToOutput() const;
73 virtual void Reset();
74
75 /**
76 * Lets you specify a function that will be called every time an outgoing
77 * Message is about to be flattened by this gateway. You may alter the
78 * Message at this time, if you need to.
79 * @param cb Callback function to call.
80 * @param ud User data; set this to any value you like.
81 */
82 void SetAboutToFlattenMessageCallback(MessageFlattenedCallback cb, void * ud) {_aboutToFlattenCallback = cb; _aboutToFlattenCallbackData = ud;}
83
84 /**
85 * Lets you specify a function that will be called every time an outgoing
86 * Message has been flattened by this gateway.
87 * @param cb Callback function to call.
88 * @param ud User data; set this to any value you like.
89 */
90 void SetMessageFlattenedCallback(MessageFlattenedCallback cb, void * ud) {_flattenedCallback = cb; _flattenedCallbackData = ud;}
91
92 /**
93 * Lets you specify a function that will be called every time an incoming
94 * Message has been unflattened by this gateway.
95 * @param cb Callback function to call.
96 * @param ud User data; set this to any value you like.
97 */
98 void SetMessageUnflattenedCallback(MessageFlattenedCallback cb, void * ud) {_unflattenedCallback = cb; _unflattenedCallbackData = ud;}
99
100 /**
101 * Lets you specify the maximum allowable size for an incoming flattened Message.
102 * Doing so lets you limit the amount of memory a remote computer can cause your
103 * computer to attempt to allocate. Default max message size is MUSCLE_NO_LIMIT
104 * (or about 4 gigabytes)
105 * @param maxBytes New incoming message size limit, in bytes.
106 */
107 void SetMaxIncomingMessageSize(uint32 maxBytes) {_maxIncomingMessageSize = maxBytes;}
108
109 /** Returns the current maximum incoming message size, as was set above. */
110 uint32 GetMaxIncomingMessageSize() const {return _maxIncomingMessageSize;}
111
112 /** Returns our encoding method, as specified in the constructor or via SetOutgoingEncoding(). */
113 int32 GetOutgoingEncoding() const {return _outgoingEncoding;}
114
115 /** Call this to change the encoding this gateway applies to outgoing Messages.
116 * Note that the encoding change will take place starting with the next Message
117 * that is actually sent, so if any Messages are currently Queued up to be sent,
118 * they will be sent using the new encoding.
119 * Note that to use any of the MUSCLE_MESSAGE_ENCODING_ZLIB_* encodings,
120 * you MUST have defined the compiler symbol -DMUSCLE_ENABLE_ZLIB_ENCODING.
121 * @param ec Encoding type to use. Should be one of the MUSCLE_MESSAGE_ENCODING_* constants.
122 */
123 void SetOutgoingEncoding(int32 ec) {_outgoingEncoding = ec;}
124
125 /** Overwritten to augment AbstractMessageIOGateway::ExecuteSynchronousMessaging()
126 * with some additional logic that prepends a PR_COMMAND_PING to the outgoing Message queue
127 * and then makes sure that ExecuteSynchronousMessaging() doesn't return until the
128 * corresponding PR_COMMAND_PONG is received. That way we are guaranteed that
129 * the server's results are returned before this method returns.
130 * @param optReceiver optional object to call MessageReceivedFromGateway() on when a reply Message is received.
131 * @param timeoutPeriod Optional timeout period in microseconds, or MUSCLE_TIME_NEVER if no timeout is requested.
132 * @returns B_NO_ERROR on success, or an error code on failure (timeout or network error)
133 */
134 virtual status_t ExecuteSynchronousMessaging(AbstractGatewayMessageReceiver * optReceiver, uint64 timeoutPeriod = MUSCLE_TIME_NEVER);
135
136 /** Convenience method: Connects to the specified IPAddressAndPort via TCP, sends the specified Message, waits
137 * for a reply Message, and returns the reply Message. This is useful if you want a client/server transaction
138 * to act like a function call, although it is a bit inefficient since the TCP connection is re-established
139 * and then closed every time this function is called.
140 * @param requestMessage the request Message to send
141 * @param targetIAP Where to connect to (via TCP) to send (requestMessage)
142 * @param timeoutPeriod The maximum amount of time this function should wait for a reply before returning.
143 * Defaults to MUSCLE_TIME_NEVER, i.e. no timeout.
144 * @returns A reference to a reply Message, or a NULL MessageRef() if we were unable to connect to the specified
145 * address, or an empty Message if we connected and send our request okay, but received no reply Message.
146 */
147 MessageRef ExecuteSynchronousMessageRPCCall(const Message & requestMessage, const IPAddressAndPort & targetIAP, uint64 timeoutPeriod = MUSCLE_TIME_NEVER);
148
149 /** This method is similar to ExecuteSynchronousMessageRPCCall(), except that it doesn't wait for a reply Message.
150 * Instead, it sends the specified (requestMessage), and returns B_NO_ERROR if the Message successfully goes out
151 * over the TCP socket, or an error code otherwise.
152 * @param requestMessage the request Message to send
153 * @param targetIAP Where to connect to (via TCP) to send (requestMessage)
154 * @param timeoutPeriod The maximum amount of time this function should wait for TCP to connect, before returning.
155 * @returns B_NO_ERROR if the Message was sent, or an error code if we couldn't connect (or if we connected but couldn't send the data).
156 * Note that there is no way to know what (if anything) the receiving client did with the Message.
157 */
158 status_t ExecuteSynchronousMessageSend(const Message & requestMessage, const IPAddressAndPort & targetIAP, uint64 timeoutPeriod = MUSCLE_TIME_NEVER);
159
160 /** Calls through to our protected FlattenHeaderAndMessage() method. Provided for special-case classes that want to
161 * to access that functionality directly rather than going through the gateway's usual DoOutput() interface.
162 * @param msgRef Reference to the Message object to flatten.
163 * @returns a Reference to a ByteBuffer containing the flattened bytes of both the MessageIOGateway header
164 * and the passed-in Message object.
165 * @note see UnflattenHeaderAndMessage() for details.
166 */
167 ByteBufferRef CallFlattenHeaderAndMessage(const MessageRef & msgRef) const {return FlattenHeaderAndMessage(msgRef);}
168
169 /** Calls through to our protected UnflattenHeaderAndMessage() method. Provided for special-case classes that want to
170 * to access that functionality directly rather than going through the gateway's usual DoInput() interface.
171 * @param bufRef Reference to a ByteBuffer object that contains the appropriate header
172 * bytes, followed by some flattened Message bytes.
173 * @returns a Reference to a Message object containing the Message that was encoded in
174 * the ByteBuffer on success, or a NULL reference on failure.
175 * @note see UnflattenHeaderAndMessage() for details.
176 */
177 MessageRef CallUnflattenHeaderAndMessage(const ConstByteBufferRef & bufRef) const {return UnflattenHeaderAndMessage(bufRef);}
178
179 protected:
180 virtual int32 DoOutputImplementation(uint32 maxBytes = MUSCLE_NO_LIMIT);
181 virtual int32 DoInputImplementation(AbstractGatewayMessageReceiver & receiver, uint32 maxBytes = MUSCLE_NO_LIMIT);
182
183 /**
184 * Should flatten the specified Message object into a newly allocated ByteBuffer
185 * object and return the ByteBufferRef. The returned ByteBufferRef's contents
186 * should consist of (GetHeaderSize()) bytes of header, followed by the flattened
187 * Message data.
188 * @param msgRef Reference to a Message to flatten into a byte array.
189 * @return A reference to a ByteBuffer object (containing the appropriate header
190 * bytes, followed flattened Message data) on success, or a NULL reference on failure.
191 * The default implementation uses msg.Flatten() and then (optionally) ZLib compression to produce
192 * the flattened bytes.
193 */
194 virtual ByteBufferRef FlattenHeaderAndMessage(const MessageRef & msgRef) const;
195
196 /**
197 * Unflattens a specified ByteBuffer object back into a MessageRef object.
198 * @param bufRef Reference to a ByteBuffer object that contains the appropriate header
199 * bytes, followed by some flattened Message bytes.
200 * @returns a Reference to a Message object containing the Message that was encoded in
201 * the ByteBuffer on success, or a NULL reference on failure.
202 * The default implementation uses (optional) ZLib decompression (depending on the header bytes)
203 * and then msg.Unflatten() to produce the Message.
204 */
205 virtual MessageRef UnflattenHeaderAndMessage(const ConstByteBufferRef & bufRef) const;
206
207 /**
208 * Returns the size of the pre-flattened-message header section, in bytes.
209 * The default Message protocol uses an 8-byte header (4 bytes for encoding ID, 4 bytes for message size),
210 * so the default implementation of this method always returns 8.
211 */
212 virtual uint32 GetHeaderSize() const;
213
214 /**
215 * Must Extract and returns the buffer body size from the given header.
216 * Note that the returned size should NOT count the header bytes themselves!
217 * @param header Points to the header of the message. The header is GetHeaderSize() bytes long.
218 * @return The number of bytes in the body of the message associated with (header), on success,
219 * or a negative value to indicate an error (invalid header, etc).
220 */
221 virtual int32 GetBodySize(const uint8 * header) const;
222
223 /** Overridden to return true until our PONG Message is received back */
224 virtual bool IsStillAwaitingSynchronousMessagingReply() const {return _noRPCReply.IsInBatch() ? HasBytesToOutput() : (_pendingSyncPingCounter >= 0);}
225
226 /** Overridden to filter out our PONG Message and pass everything else on to (r).
227 * @param msg the Message that was received
228 * @param userData the user-data pointer, as was passed to ExecuteSynchronousMessaging()
229 * @param r the receiver object that we will call MessageReceivedFromGateway() on
230 */
231 virtual void SynchronousMessageReceivedFromGateway(const MessageRef & msg, void * userData, AbstractGatewayMessageReceiver & r);
232
233 /** Allocates and returns a Message to send as a Ping Message for its synchronization.
234 * Default implementation calls GetMessageFromPool(PR_COMMAND_PING) and adds the tag value as an int32 field.
235 * @param syncPingCounter the value to add as a tag.
236 */
237 virtual MessageRef CreateSynchronousPingMessage(uint32 syncPingCounter) const;
238
239 /**
240 * Returns true iff (msg) is a pong-Message corresponding to a ping-Message
241 * that was created by CreateSynchronousPingMessage(syncPingCounter).
242 * @param msg a Message received from the remote peer
243 * @param syncPingCounter The value of the ping-counter that we are interested in checking against.
244 */
245 virtual bool IsSynchronousPongMessage(const MessageRef & msg, uint32 syncPingCounter) const;
246
247 /**
248 * Removes the next MessageRef from our outgoing Message queue and returns it in (retMsg).
249 * @param retMsg on success, the next MessageRef to send will be written into this MessageRef.
250 * @returns B_NO_ERROR on success, or B_DATA_NOT_FOUND on failure (queue was empty)
251 */
252 virtual status_t PopNextOutgoingMessage(MessageRef & retMsg);
253
254 /**
255 * Should return true iff we need to make sure that any outgoing Messages that we've deflated
256 * are inflatable independently of each other. The default method always returns false, since it
257 * allowed better compression ratios if we can assume the receiver will be re-inflating the
258 * Messages is FIFO order. However, in some cases it's not possible to make that assumption.
259 * In those cases, reimplementing this method to return true will cause each outgoing Message
260 * to be deflated independently of its predecessors, giving more flexibility at the expense of
261 * less compression.
262 */
263 virtual bool AreOutgoingMessagesIndependent() const {return false;}
264
265 private:
266 ByteBufferRef FlattenHeaderAndMessageAux(const MessageRef & msgRef) const;
267
268 #ifdef MUSCLE_ENABLE_ZLIB_ENCODING
269 ZLibCodec * GetCodec(int32 newEncoding, ZLibCodec * & setCodec) const;
270 #endif
271
272 #ifndef DOXYGEN_SHOULD_IGNORE_THIS // this is here so doxygen-coverage won't complain that I haven't documented this class -- but it's a private class so I don't need to
273 class TransferBuffer
274 {
275 public:
276 TransferBuffer() : _offset(0) {/* empty */}
277
278 void Reset()
279 {
280 _buffer.Reset();
281 _offset = 0;
282 }
283
284 ByteBufferRef _buffer;
285 uint32 _offset;
286 };
287 #endif
288
289 status_t SendMoreData(int32 & sentBytes, uint32 & maxBytes);
290 status_t ReceiveMoreData(int32 & readBytes, uint32 & maxBytes, uint32 maxArraySize);
291
292 ByteBufferRef GetScratchReceiveBuffer();
293 void ForgetScratchReceiveBufferIfSubclassIsStillUsingIt();
294
295 TransferBuffer _sendBuffer;
296 TransferBuffer _recvBuffer;
297
298 IPAddressAndPort _nextPacketDest;
299 ByteBufferRef _scratchRecvBuffer; // used to efficiently receive small Messages in the normal case
300
301 uint32 _maxIncomingMessageSize;
302 int32 _outgoingEncoding;
303
304 MessageFlattenedCallback _aboutToFlattenCallback;
305 void * _aboutToFlattenCallbackData;
306
307 MessageFlattenedCallback _flattenedCallback;
308 void * _flattenedCallbackData;
309
310 MessageFlattenedCallback _unflattenedCallback;
311 void * _unflattenedCallbackData;
312
313 Message _scratchPacketMessage;
314
315 #ifdef MUSCLE_ENABLE_ZLIB_ENCODING
316 mutable ZLibCodec * _sendCodec;
317 mutable ZLibCodec * _recvCodec;
318 #endif
319
320 NestCount _noRPCReply;
321 int32 _syncPingCounter;
322 int32 _pendingSyncPingCounter;
323
324 DECLARE_COUNTED_OBJECT(MessageIOGateway);
325 };
326 DECLARE_REFTYPES(MessageIOGateway);
327
328 /** This class is similar to MessageIOGateway, but it also keep a running tally
329 * of the total number of bytes of data currently in its outgoing-Messages queue.
330 * Message sizes are calculated via FlattenedSize(); zlib compression is not
331 * taken into account.
332 */
333 class CountedMessageIOGateway : public MessageIOGateway
334 {
335 public:
336 /**
337 * Constructor.
338 * @param outgoingEncoding See MessageIOGateway constructor for details.
339 */
340 CountedMessageIOGateway(int32 outgoingEncoding = MUSCLE_MESSAGE_ENCODING_DEFAULT);
341
342 virtual status_t AddOutgoingMessage(const MessageRef & messageRef);
343
344 virtual void Reset();
345
346 /** Returns the number of bytes of data currently in our outgoing-messages-queue
347 * Calculated by calling FlattenedSize() on the Messages as they are added to
348 * or removed from the queue)
349 */
350 uint32 GetNumOutgoingDataBytes() const {return _outgoingByteCount;}
351
352 protected:
353 virtual status_t PopNextOutgoingMessage(MessageRef & ret);
354
355 private:
356 uint32 _outgoingByteCount;
357 };
358 DECLARE_REFTYPES(CountedMessageIOGateway);
359
360 /** This function can be used to reduce memory usage when sending the same large
361 * Message to multiple MessageIOGateways.
362 *
363 * If you call this function on your large Message object just before you pass it off
364 * to one or more session objects for output, the Message object will be tagged with
365 * a rendezvous-point object such that only one of the gateways will have to allocate
366 * a serialized buffer and Flatten() the Message into it; the latter gateways will
367 * reuse the buffer created by the first gateway. This can be more memory-efficient
368 * than the default behavior (where each MessageIOGateway has to allocate its
369 * own separate ByteBuffer and separately flatten the Message into it).
370 *
371 * @param msg a reference the Message you are about to pass to multiple gateways.
372 */
373 status_t OptimizeMessageForTransmissionToMultipleGateways(const MessageRef & msg);
374
375 /** Returns true iff the given Message has had
376 * OptimizeMessageForTransmissionToMultipleGateways() called on it already.
377 * @param msg the Message to check for the presence of an optimization-tag.
378 */
379 bool IsMessageOptimizedForTransmissionToMultipleGateways(const MessageRef & msg);
380
381 //////////////////////////////////////////////////////////////////////////////////
382 //
383 // Here is a commented example of a flattened Message's byte structure, using
384 // the MUSCLE_MESSAGE_ENCODING_DEFAULT encoding.
385 //
386 // When one uses a MessageIOGateway with the default encoding to send Messages,
387 // it will send out series of bytes that looks like this.
388 //
389 // Note that this information is only helpful if you are trying to implement
390 // your own MessageIOGateway-compatible serialization/deserialization code.
391 // C, C#, C++, Java, and Python programmers will have a much easier time if
392 // they use the MessageIOGateway functionality provided in the MUSCLE archive,
393 // rather than coding at the byte-stream level.
394 //
395 // The Message used in this example has a 'what' code value of 2 and the
396 // following name/value pairs placed in it:
397 //
398 // String field, name="!SnKy" value="/*/*/beshare"
399 // String field, name="session" value="123"
400 // String field, name="text" value="Hi!"
401 //
402 // Bytes in single quotes represent ASCII characters, bytes without quotes
403 // means literal decimal byte values. (E.g. '2' means 50 decimal, 2 means 2 decimal)
404 //
405 // All occurrences of '0' here indicate the ASCII digit zero (decimal 48), not the letter O.
406 //
407 // The bytes shown here should be sent across the TCP socket in
408 // 'normal reading order': left to right, top to bottom.
409 //
410 // 88 0 0 0 (int32, indicates that total message body size is 88 bytes) (***)
411 // '0' 'c' 'n' 'E' ('Enc0' == MUSCLE_MESSAGE_ENCODING_DEFAULT) (***)
412 //
413 // '0' '0' 'M' 'P' ('PM00' == CURRENT_PROTOCOL_VERSION)
414 // 2 0 0 0 (2 == NET_CLIENT_NEW_CHAT_TEXT, the message's 'what' code)
415 // 3 0 0 0 (3 == Number of name/value pairs in this message)
416 // 6 0 0 0 (6 == Length of first name, "!SnKy", include NUL byte)
417 // '!' 'S' 'n' 'K' (Field name ASCII bytes.... "!SnKy")
418 // 'y' 0 (last field name ASCII byte and the NUL terminator byte)
419 // 'R' 'T' 'S' 'C' ('CSTR' == B_STRING_TYPE; i.e. this value is a string)
420 // 13 0 0 0 (13 == Length of value string including NUL byte)
421 // '/' '*' '/' '*' (Field value ASCII bytes.... "/*/*/beshare")
422 // '/' 'b' 'e' 's' (....)
423 // 'h' 'a' 'r' 'e' (....)
424 // 0 (NUL terminator byte for the ASCII value)
425 // 8 0 0 0 (8 == Length of second name, "session", including NUL)
426 // 's' 'e' 's' 's' (Field name ASCII Bytes.... "session")
427 // 'i' 'o' 'n' 0 (rest of field name ASCII bytes and NUL terminator)
428 // 'R' 'T' 'S' 'C' ('CSTR' == B_STRING_TYPE; i.e. this value is a string)
429 // 4 0 0 0 (4 == Length of value string including NUL byte)
430 // '1' '2' '3' 0 (Field value ASCII bytes... "123" plus NUL byte)
431 // 5 0 0 0 (5 == Length of third name, "text", including NUL)
432 // 't' 'e' 'x' 't' (Field name ASCII bytes... "text")
433 // 0 (NUL byte terminator for field name)
434 // 'R' 'T' 'S' 'C' ('CSTR' == B_STRING_TYPE; i.e. this value is a string)
435 // 4 0 0 0 (3 == Length of value string including NUL byte)
436 // 'H' 'i' '!' 0 (Field value ASCII Bytes.... "Hi!" plus NUL byte)
437 //
438 // [that's the complete byte sequence; to transmit another message,
439 // you would start again at the top, with the next message's
440 // message-body-length-count]
441 //
442 // (***) The bytes in this field should not be included when tallying the message body size!
443 //
444 //////////////////////////////////////////////////////////////////////////////////
445
446 } // end namespace muscle
447
448 #endif