However, there are some things that muscled does not do, such as read from or write to the hard disk, authenticate users, or implement application-specific logic. But if your client-server application needs to do such things, don't worry: the muscled architecture can be seamlessly extended to include the functionality you require. This document will show you how to do that.
This document assumes that you have some basic knowledge of C++, and how MUSCLE works (at least from a client program's perspective). If you don't, you may want to review the Beginner's Guide to MUSCLE programming. You will want to have the MUSCLE header files (muscle/*/*.h) handy as you read along, as this document does not attempt to be comprehensive in its API descriptions.
Note that the server side of MUSCLE is 100% single-threaded by default. It uses multiplexed non-blocking I/O and message queues to achieve a multithreaded effect, but is single-threaded for purposes of stability and portability. Integrating multithreaded code into a MUSCLE server is possible, but usually not necessary. I suggest you stick with the single-thread model if you can; it's not as bad as you think ;^).
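To make the "multiplexed I/O in one thread" idea concrete, here is a generic sketch that uses POSIX pipe() descriptors and poll(). It is not MUSCLE's actual event loop: the function name ServeUntilClosed() is made up for this illustration, and pipes stand in for the TCP sockets a real server would watch.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <vector>

// Reads from every descriptor in (fds) using a single thread, until all of
// them have reported end-of-file.  Returns the bytes read from each one.
// (Hypothetical helper, written for this sketch only)
std::vector<std::string> ServeUntilClosed(const std::vector<int> & fds)
{
   std::vector<std::string> received(fds.size());
   std::vector<pollfd> pfds(fds.size());
   for (size_t i = 0; i < fds.size(); i++)
   {
      assert(fds[i] >= 0);
      pfds[i].fd      = fds[i];
      pfds[i].events  = POLLIN;
      pfds[i].revents = 0;
   }

   size_t numOpen = pfds.size();
   while (numOpen > 0)
   {
      // Sleep until at least one descriptor has something for us
      if (poll(pfds.data(), (nfds_t) pfds.size(), -1) < 0) break;

      for (size_t i = 0; i < pfds.size(); i++)
      {
         if ((pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) == 0) continue;

         char buf[256];
         const ssize_t n = read(pfds[i].fd, buf, sizeof(buf));
         if (n > 0) received[i].append(buf, (size_t) n);
         else
         {
            pfds[i].fd = -1;  // poll() ignores negative descriptors
            numOpen--;
         }
      }
   }
   return received;
}
```

Because read() is only called on descriptors that poll() has flagged, one slow peer never stalls the others, which is the effect the paragraph above describes.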
When the server receives a new TCP connection from a client, it uses the factory object passed in to its constructor to create a new AbstractReflectSession object. After the session object is created and initialized, its AttachedToServer() method is called. Here is where you can do any setup work that needs to be done for your new client, and return B_NO_ERROR if everything is okay. If something has gone horribly wrong, you can return B_ERROR, and the connection to the client will be closed, and this session object deleted. Be sure to call your superclass's AttachedToServer() method as the first thing in your implementation, otherwise your session won't get set up properly!
Called by the ReflectServer when it is about to remove this session from its list of active sessions. Any last-minute cleanup work you need to do can be done here. Be sure to call your superclass's AboutToDetachFromServer() as the last thing in your implementation, so it can clean up its stuff as well.
This method is called whenever your session's client computer has sent a Message to the MUSCLE server. You would generally implement this method to parse the Message and respond appropriately to your client's request. Be sure to call your superclass's implementation of this method for any messages that you can't parse yourself.
This method is called whenever another AbstractReflectSession has sent you a Message. It is via this method that clients can interact with each other, e.g. for client A to send a message through the MUSCLE server to client B, it would go something like this:
When implementing your own session logic, there are several different "pre-made" AbstractReflectSession subclasses to choose from. Depending on how much code you want to re-use, you can override AbstractReflectSession directly and handle everything yourself, or you can override one of the following, and take advantage of the logic already provided:
Called when the server starts up. You can do any initialization work here, and return B_NO_ERROR if you are ready to go. (B_ERROR will abort startup).
Must be called just before the ReflectServer object is deleted. Default implementation detaches all sessions from the server. You can override this method to do any additional tidying up if you like.
If you need actions to be performed at a specific time (as opposed to only doing things in response to incoming Messages), you can override these two methods. Note that this sort of behaviour is discouraged if you can avoid it, as it is easy to abuse. (Remember that polling is inefficient and evil!) GetPulseTime() can be overridden to specify when Pulse() should be called next. By default, Pulse() is never called (i.e. the default implementation of GetPulseTime() returns MUSCLE_NEVER_PULSE).
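The relationship between the two methods can be sketched with a toy model. Everything below is a stand-in written for this illustration: the class names, the simplified signatures (a bare timestamp argument), the TOY_NEVER_PULSE constant, and the simulated clock are all assumptions, so check the MUSCLE headers for the real declarations.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Stand-in for MUSCLE_NEVER_PULSE (hypothetical value, for this sketch only)
static const uint64_t TOY_NEVER_PULSE = std::numeric_limits<uint64_t>::max();

// Toy base class:  by default, Pulse() is never requested
class ToyPulseNode
{
public:
   virtual ~ToyPulseNode() {}
   virtual uint64_t GetPulseTime(uint64_t /*now*/) {return TOY_NEVER_PULSE;}
   virtual void Pulse(uint64_t /*now*/) {}
};

// Example subclass that asks to be pulsed at a fixed interval
class HeartbeatNode : public ToyPulseNode
{
public:
   explicit HeartbeatNode(uint64_t interval) : _interval(interval), _nextTime(interval)
   {
      assert(interval > 0);  // an interval of zero would pulse forever at the same instant
   }

   virtual uint64_t GetPulseTime(uint64_t /*now*/) {return _nextTime;}

   virtual void Pulse(uint64_t now)
   {
      pulseTimes.push_back(now);   // do the scheduled work here
      _nextTime = now + _interval; // and schedule the next call
   }

   std::vector<uint64_t> pulseTimes;  // records when Pulse() was called

private:
   uint64_t _interval;
   uint64_t _nextTime;
};

// Simulated server loop:  instead of polling, it asks the node when it wants
// to be woken up, and jumps a fake clock straight to that time.
void RunToyLoop(ToyPulseNode & node, uint64_t endTime)
{
   uint64_t now = 0;
   while (true)
   {
      const uint64_t when = node.GetPulseTime(now);
      if ((when == TOY_NEVER_PULSE) || (when > endTime)) break;
      if (when > now) now = when;
      node.Pulse(now);
   }
}
```

The point of the design is that the loop never has to poll: it can sleep until the earliest requested time, and a node that returns the "never" value costs nothing.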
// Factory class for my custom session type
class MySessionFactory : public ReflectSessionFactory
{
public:
   // Note that (clientAddress) and (serverAddress) are ignored; we don't care about them for now
   virtual AbstractReflectSessionRef CreateSession(const String & clientAddress, const IPAddressAndPort & serverAddress)
   {
      AbstractReflectSessionRef ret(newnothrow MyReflectSessionSubclass());
      if (ret() == NULL) WARN_OUT_OF_MEMORY;
      return ret;
   }
};

int main(int argc, char ** argv)
{
   CompleteSetupSystem css;

   ReflectServer myServer;  // instantiate regular ReflectServer or your own subclass

   // Whenever we get a connection on port 2960, use my factory to create a new session object
   myServer.PutAcceptFactory(2960, ReflectSessionFactoryRef(new MySessionFactory));

   // everything happens here!  Will not return for a long time (if ever!)
   status_t ret = myServer.ServerProcessLoop();
   if (ret == B_NO_ERROR) printf("Server terminated normally.\n");
                     else printf("Server exited with an error condition!\n");

   myServer.Cleanup();
   return 0;
}
The ServerProcessLoop() method is where all the serving gets done--it won't return until the server decides to quit--either due to a critical error, or because your code told it to shut down (by calling EndServer()).
Note that if you are creating a new, custom server application, you almost certainly WON'T need to make your own AbstractMessageIOGateway subclass. MUSCLE comes with a very nice MessageIOGateway class, which knows how to translate any Message into the standard MUSCLE byte-stream format. The MessageIOGateway class is used by default, and in most cases, it is all you need. An additional benefit to sticking with the standard MessageIOGateway class is that you will remain stream-compatible with other MUSCLE code that is out there--defining your own AbstractMessageIOGateway class means breaking protocol compatibility, which may cause headaches later on. Nonetheless, there are situations where a custom IO gateway is useful:
This method must return true if and only if you have bytes ready to send out over the TCP connection. It is used to determine whether or not the MUSCLE server should wake up and call DoOutputImplementation() when there is room to send more bytes.
This method is called when you have bytes to send (see above) and there is space in the TCP buffers to send them. You should attempt to write up to (maxBytes) bytes of data to your DataIO object (i.e. get the next message from the outgoing message queue, convert it to bytes, send the bytes, repeat), and return the number of bytes you were actually able to send. If you encounter an error, return -1 to indicate that. Returning -1 is taken to mean that the stream has been broken, and will usually result in the termination of the session. This method should never block.
This method is called whenever incoming bytes are available on your DataIO object's TCP stream. You should read up to (maxBytes) of data, convert as many bytes as possible into Message objects, and call target.CallMessageReceivedFromGateway(msg, NULL) for any Message that you create. Any bytes that represent a partial message (i.e. you don't have the entire Message's worth yet) should be held in a local buffer somehow, for next time. This method should also return the number of bytes that were actually read, or -1 if there was an error reading.
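The bookkeeping that these three methods imply (an outgoing queue with a partial-send offset, and an incoming buffer that holds partial messages) can be sketched without any MUSCLE classes. The following is a toy model only: ToySink, ToyGateway, the method names DoOutput() and DoInput(), and the 4-byte length-prefix framing are all assumptions made for this illustration. The framing is not MUSCLE's wire format, and the real methods have different signatures.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a non-blocking DataIO object.  Write() accepts at
// most (capacity) more bytes and returns the count accepted, or -1 if broken.
struct ToySink
{
   std::string written;
   size_t capacity;
   bool broken;

   int32_t Write(const char * buf, size_t numBytes)
   {
      if (broken) return -1;
      const size_t n = (numBytes < capacity) ? numBytes : capacity;
      written.append(buf, n);
      capacity -= n;
      return (int32_t) n;
   }
};

class ToyGateway
{
public:
   void AddOutgoingMessage(const std::string & body)
   {
      // Toy framing:  a 4-byte big-endian length header, then the body
      const uint32_t len = (uint32_t) body.size();
      std::string frame;
      for (int shift = 24; shift >= 0; shift -= 8) frame += (char) ((len >> shift) & 0xFF);
      _outgoing.push_back(frame + body);
   }

   bool HasBytesToOutput() const {return !_outgoing.empty();}

   // Sends up to (maxBytes) bytes; returns the number sent, or -1 on error.
   int32_t DoOutput(ToySink & sink, uint32_t maxBytes)
   {
      uint32_t totalSent = 0;
      while ((!_outgoing.empty()) && (totalSent < maxBytes))
      {
         const std::string & front = _outgoing.front();
         assert(_offset < front.size());  // every frame has at least a header
         const size_t remaining = front.size() - _offset;
         const size_t budget    = maxBytes - totalSent;
         const size_t tryBytes  = (remaining < budget) ? remaining : budget;
         const int32_t sent = sink.Write(front.data() + _offset, tryBytes);
         if (sent < 0) return -1;
         totalSent += (uint32_t) sent;
         _offset   += (size_t) sent;
         if (_offset == front.size()) {_outgoing.pop_front(); _offset = 0;}
         if ((size_t) sent < tryBytes) break;  // sink is full; try again later
      }
      return (int32_t) totalSent;
   }

   // Accepts bytes that just arrived; returns the bodies of all completed messages.
   std::vector<std::string> DoInput(const char * bytes, size_t numBytes)
   {
      _pending.append(bytes, numBytes);
      std::vector<std::string> messages;
      size_t pos = 0;
      while (_pending.size() - pos >= 4)
      {
         const unsigned char * h = reinterpret_cast<const unsigned char *>(_pending.data()) + pos;
         const uint32_t bodyLen = ((uint32_t) h[0] << 24) | ((uint32_t) h[1] << 16) | ((uint32_t) h[2] << 8) | (uint32_t) h[3];
         if (_pending.size() - pos - 4 < bodyLen) break;  // body incomplete; wait for more bytes
         messages.push_back(_pending.substr(pos + 4, bodyLen));
         pos += 4 + (size_t) bodyLen;
      }
      _pending.erase(0, pos);  // keep only the partial message (if any) for next time
      return messages;
   }

private:
   std::deque<std::string> _outgoing;  // framed messages waiting to be sent
   size_t _offset = 0;                 // bytes of _outgoing.front() already sent
   std::string _pending;               // received bytes not yet forming a whole message
};
```

Note how neither direction ever waits: DoOutput() stops as soon as the sink refuses bytes, and DoInput() simply keeps the leftover bytes until the rest of the message shows up.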
AbstractMessageIOGatewayRef MyReflectSession :: CreateGateway()
{
   AbstractMessageIOGatewayRef ret(newnothrow MyGateway);  // My custom I/O gateway!
   if (ret() == NULL) WARN_OUT_OF_MEMORY;
   return ret;
}
This method returns a reference to a Message that is shared by all sessions. Any session may access or alter this message, and other sessions can call GetCentralState() to see the changes. For example, StorageReflectSession uses this message to store the root of the node database. Please be gentle while altering the central state message, however, as other sessions may be relying on their data being there.
Returns a read-only reference to the current set of sessions attached to the server. Handy if you need "down and dirty" direct access to the other sessions. (Note that this type of direct access usually leads to dynamic casting, and the same functionality can often be implemented more cleanly by message passing... BroadcastToAllSessions() is helpful here)
Given a session ID (expressed either as a string or an integer), returns you a reference to that session... or a NULL reference if no such session exists.
Adds the given new session to the session list. (socket) should be the socket used by the new session (the server will watch this socket to determine when incoming data is available), or a NULL ConstSocketRef if the new session has no TCP socket of its own (i.e. a 'fake' session)
Also adds the given new session to the session list. But instead of immediately being active using a given socket, this method takes the IP address of a remote host that you would like your muscle server to connect out to. The connection will be done asynchronously, and (session)'s AsyncConnectCompleted() method will be called when the connection completes successfully. (If the connection fails, (session) will be detached from the server and discarded)
Sort of a session's hara-kiri method. Call this on a session and that session will be marked for removal and deletion by the server at the next opportunity. The session isn't deleted immediately (as it may be in the middle of an operation), but will be ASAP.
Causes the session to be terminated, and (newSession) to be put in its place. (newSession) will be given the same TCP socket and IOGateway as the old session had. This method is a quick way to swap out the server-side logic of a session while maintaining its connection to the client.
void MyReflectSession :: SetupMyStuff(const Message & msg1, const Message & msg2, const Message & msg3)
{
   // Add our three messages as nodes in our database
   MessageRef msg = GetMessageFromPool(PR_COMMAND_SETDATA);
   if (msg())
   {
      msg()->AddMessage("appnodes/node1", msg1);
      msg()->AddMessage("appnodes/node2", msg2);
      msg()->AddMessage("appnodes/node3", msg3);
      StorageReflectSession::MessageReceivedFromGateway(msg);
   }
   else WARN_OUT_OF_MEMORY;
}
The above technique will work for any message type that is supported for clients (i.e. PR_COMMAND_*). The semantics will be the same as they would be for the client, if it were to send the same message.
If you need more control than the client messaging API can provide, however, you can call the public and protected members of the StorageReflectSession class directly. For example, you can create or set nodes in your subtree of the database by calling SetDataNode():
void MyReflectSession :: SetupMyStuff2(const MessageRef & msg1, const MessageRef & msg2, const MessageRef & msg3)
{
   SetDataNode("appnodes/node1", msg1);
   SetDataNode("appnodes/node2", msg2);
   SetDataNode("appnodes/node3", msg3);
}
This method is easier to code and gives you some additional control.
Sometimes you'll want to do your own queries of the server-side database. One way to do this would be to create a PR_COMMAND_GETDATA message, pass it to StorageReflectSession::MessageReceivedFromGateway(), and then parse the resulting PR_RESULTS_DATAITEMS message that is given to your MessageReceivedFromSession() method. But that method is somewhat inefficient, and a little bit error-prone (how are you to know which PR_RESULTS_DATAITEMS message corresponds with which PR_COMMAND_GETDATA message?). A better way to do it is by setting up your own query callback. In the following example, we will execute a query for all connected clients' session nodes ("*/*"), and our callback will be executed once for every node that matches the query string.
void MyReflectSession :: FindAllSessionNodes()
{
   Queue<String> myList;  // collect results here

   WildPathMatcher matcher;
   matcher.PutPathString("*/*", QueryFilterRef());
   matcher.DoTraversal((PathMatchCallback)MyCallback, this, *_globalRoot, true, &myList);

   // (myList) now contains path names of all session nodes...
}

int MyReflectSession :: MyCallback(DataNode & node, void * userData)
{
   printf("MyCallback called for node: [%s]\n", node.GetNodePath());
   printf("Message for this node is: ");
   const Message * nodeData = node.GetData()->GetItemPointer();
   if (nodeData) nodeData->PrintToStream();
            else printf(" \n");

   Queue<String> * myList = (Queue<String> *) userData;  // as passed in to DoTraversal()
   myList->AddTail(node.GetNodePath());
   return node.GetDepth();  // continue traversal as usual
}
The above code also demonstrates the use of the (userData) field to carry additional information into the callback. Whatever value you pass in as the last argument of DoTraversal() is passed back to your callback method, to do with as you wish. Here it is used to pass in a pointer to a Queue to which the path names of the matching nodes are added.
The return value of your callback function is also important. It should specify the node-depth at which to continue the traversal. This value can be used to dynamically prune the search of the database tree, for efficiency. For a full traversal, you should always return node.GetDepth(). On the other hand, if you have found the data you wanted and wish to terminate the search immediately, you would return 0. Or, if you wanted the search to continue at the next session node, you could return 2 (which is the level of the session nodes in the tree). As a final example, if you want the search to continue, but not to recurse into the subtree below the current node, you would return node.GetDepth()-1.
A WildPathMatcher can do a query made up of several path strings at once. For example, if you wanted to do a query on all host nodes AND all session nodes, you could do this:
WildPathMatcher matcher;
matcher.PutPathString("*", QueryFilterRef());    // query on all host nodes (level 1)
matcher.PutPathString("*/*", QueryFilterRef());  // query on all session nodes (level 2)
matcher.DoTraversal((PathMatchCallback)MyCustomCallback, this, *_globalRoot, true, NULL);
A single traversal will not trigger the callback function for any given node more than once, even if that node is matched by more than one path string.
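The "at most once per node" rule can be illustrated with a small stand-alone model. This is not how WildPathMatcher is implemented: the helper names below are hypothetical, the database is reduced to a flat list of path strings, and only literal clauses and a bare "*" are supported (real MUSCLE path strings accept richer wildcard patterns).

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Splits "a/b/c" into its clauses {"a", "b", "c"}
static std::vector<std::string> SplitPath(const std::string & path)
{
   std::vector<std::string> clauses;
   std::stringstream ss(path);
   std::string clause;
   while (std::getline(ss, clause, '/')) clauses.push_back(clause);
   return clauses;
}

// True iff (nodePath) has exactly as many levels as (pattern), and at every
// level the pattern's clause is either "*" or equal to the node's name.
static bool ToyPathMatches(const std::string & pattern, const std::string & nodePath)
{
   const std::vector<std::string> p = SplitPath(pattern);
   const std::vector<std::string> n = SplitPath(nodePath);
   if (p.size() != n.size()) return false;
   for (size_t i = 0; i < p.size(); i++)
   {
      if ((p[i] != "*") && (p[i] != n[i])) return false;
   }
   return true;
}

// Returns the node paths that the callback would be called for.  A node that
// is matched by several patterns still appears only once.
std::vector<std::string> ToyTraversal(const std::vector<std::string> & patterns, const std::vector<std::string> & nodePaths)
{
   std::vector<std::string> hits;
   for (size_t i = 0; i < nodePaths.size(); i++)
   {
      for (size_t j = 0; j < patterns.size(); j++)
      {
         if (ToyPathMatches(patterns[j], nodePaths[i]))
         {
            hits.push_back(nodePaths[i]);
            break;  // don't report this node again for the remaining patterns
         }
      }
   }
   assert(hits.size() <= nodePaths.size());
   return hits;
}
```

With the patterns "*" and "*/*", this model reports the host-level and session-level paths and skips anything deeper; with two overlapping patterns such as "*/*" and "hostA/*", a session under hostA is still reported only once.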
The third argument to DoTraversal() (*_globalRoot in the examples above) is the node to start the search at. For searches of the entire database, *_globalRoot is the correct value to place here; however you may wish to limit your search to only a subtree of the database tree. For example, if you wish to make your search relative to the current session's node only (and thus search only nodes that your own session created), you could put *_sessionDir.GetItemPointer() here instead. Note that using a different starting point does change the semantics of the path strings... e.g. in that case "*" would mean all children underneath the session node, rather than all children beneath the root node.
Leading slashes in the path strings are NOT handled by the WildPathMatcher--all WildPathMatcher path strings are taken to be relative paths, and are relative to the node passed in as the third argument to DoTraversal(). If you want to be able to handle leading slashes and give a default prefix to relative path strings, you may find the method WildPathMatcher::AdjustStringPrefix() to be useful.
status_t MyReflectServer :: ReadyToRun()
{
   status_t ret = ReflectServer::ReadyToRun();
   if (ret != B_NO_ERROR) return ret;

   FakeSession * fakeSession = newnothrow FakeSession();
   if (fakeSession)
   {
      // Add with socket = ConstSocketRef(), meaning there is no client for this session.
      if (AddNewSession(AbstractReflectSessionRef(fakeSession), ConstSocketRef()) == B_NO_ERROR)
      {
         return B_NO_ERROR;  // success!
      }
   }
   else WARN_OUT_OF_MEMORY;

   return B_ERROR;
}
Once created and added, this session can handle messages from the other sessions, create database nodes, and do just about anything any other session can do (except send messages to its client, as it doesn't have one).
LogTime(MUSCLE_LOG_CRITICALERROR, "The sky is falling! You have %i minutes to live!\n", minutesToLive);
This call would cause a log entry like this to be generated:
[C 11/27 15:03:33] The sky is falling! You have 5 minutes to live!
The letter in the second column indicates the log level, followed by the date and time, and finally the log message. Note that the newline ("\n") should be included in your message's text. This is so that you can assemble a log message from several log calls if you wish. Log() is the same as LogTime(), except that the bracketed level/time/date header isn't printed.
You may, in some cases, wish to do other things with the log message besides printing it to stdout or to a log file. In that case, you can use AddLogCallback() to install your own LogCallback object. This object's Log() method will be called whenever a message is logged, and you can then handle it as you wish.
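As a rough illustration of both ideas (assembling one entry from several calls, and routing log text to your own object), here is a toy logger. None of these declarations come from MUSCLE: ToyLogger, ToyLogCallback and CollectingCallback are hypothetical, the header is reduced to just the level letter with no timestamp, and the real LogCallback interface should be looked up in the MUSCLE headers.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical callback interface for this sketch (not MUSCLE's LogCallback declaration)
class ToyLogCallback
{
public:
   virtual ~ToyLogCallback() {}
   virtual void Log(char levelCode, const std::string & text) = 0;
};

class ToyLogger
{
public:
   void AddLogCallback(ToyLogCallback * cb)
   {
      assert(cb != nullptr);
      _callbacks.push_back(cb);
   }

   // Like LogTime():  starts an entry with a bracketed header (the timestamp is omitted here)
   void LogTime(char levelCode, const std::string & text)
   {
      Emit(levelCode, std::string("[") + levelCode + "] " + text);
   }

   // Like Log():  no header, so it can continue an entry that hasn't ended with a newline yet
   void Log(char levelCode, const std::string & text) {Emit(levelCode, text);}

private:
   void Emit(char levelCode, const std::string & text)
   {
      for (size_t i = 0; i < _callbacks.size(); i++) _callbacks[i]->Log(levelCode, text);
   }

   std::vector<ToyLogCallback *> _callbacks;
};

// Example callback:  collects everything into a string instead of printing it
class CollectingCallback : public ToyLogCallback
{
public:
   virtual void Log(char /*levelCode*/, const std::string & text) {collected += text;}

   std::string collected;
};
```

A callback like CollectingCallback could just as easily forward the text to a GUI window, a database, or a remote monitoring session.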
MUSCLE code doesn't use C++ exceptions. This is because I'm not comfortable with the way exceptions integrate into C++; it's far too easy to get a memory leak or other unexpected result when an exception is thrown. Instead, MUSCLE uses the classic C-style method of handling errors--error codes are returned, and manually propagated up to the code that can handle them. (Yes, it's a bit tedious. Yes, I still like it better than C++ exceptions. No whining! ;^))
The standard C++ new operator throws a bad_alloc exception when it cannot allocate the requested amount of memory. To avoid this, MUSCLE code uses the new (nothrow) operator instead. To make things a little more flexible and easier to key in, MUSCLE #defines a synonym, "newnothrow", which means the same thing. The canonical MUSCLE style of dynamically allocating something is this:
Something * thing = newnothrow Something();
if (thing)
{
   DoStuffWith(thing);
   [...]
   delete thing;  // now that we're done with it
}
else WARN_OUT_OF_MEMORY;
The WARN_OUT_OF_MEMORY macro simply logs a message like this:
[C 11/27 15:03:33] ERROR--OUT OF MEMORY (MyCode.cpp:396)
So later on, when you are trying to figure out why your code didn't get executed, you will know there was a problem allocating memory.
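If you'd like to try the pattern outside of MUSCLE, the following stand-alone sketch uses only standard C++. The TOY_ macros are stand-ins written for this example (MUSCLE's own definitions may differ), and the class-level operator new with a simulateOutOfMemory switch is just a hypothetical way to force an allocation failure on demand.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <new>

// Stand-ins written for this sketch; MUSCLE's own macro definitions may differ.
#define TOY_NEWNOTHROW new (std::nothrow)
#define TOY_WARN_OUT_OF_MEMORY std::fprintf(stderr, "ERROR--OUT OF MEMORY (%s:%i)\n", __FILE__, __LINE__)

// A class whose allocations can be made to fail on demand, for testing
class Something
{
public:
   static bool simulateOutOfMemory;

   // Non-throwing allocator:  returns NULL instead of throwing bad_alloc
   static void * operator new(std::size_t size, const std::nothrow_t &) noexcept
   {
      return simulateOutOfMemory ? nullptr : std::malloc(size);
   }
   static void operator delete(void * p) noexcept {std::free(p);}

   // Matching placement delete; only used if the constructor were to throw
   static void operator delete(void * p, const std::nothrow_t &) noexcept {std::free(p);}

   int value = 42;
};
bool Something::simulateOutOfMemory = false;

// Returns true if the allocation succeeded, or false if it failed
bool UseSomething()
{
   Something * thing = TOY_NEWNOTHROW Something();
   if (thing)
   {
      assert(thing->value == 42);  // the object is fully constructed and usable
      delete thing;                // now that we're done with it
      return true;
   }
   else
   {
      TOY_WARN_OUT_OF_MEMORY;
      return false;
   }
}
```

Calling UseSomething() normally returns true; after setting Something::simulateOutOfMemory to true, it logs the warning to stderr and returns false, without any exception being thrown.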
Another feature that MUSCLE offers (via the MemoryAllocator classes) is automatic memory-usage tracking and out-of-memory callbacks. The memory-usage-tracking allows you to place an upper limit on dynamic memory allocation, either to guarantee that your server won't overtax the machine it is running on, or just to test your out-of-memory handling code. See the code at the beginning of main() in muscled.cpp for an example of how to set up memory tracking/limiting. (Note that malloc/free calls aren't tracked--but you should always use new/delete for all your allocations anyway).
Out-of-memory callbacks let you free up unnecessary memory in the event of a memory shortage. For example, MUSCLE uses ObjectPools to reduce the number of memory allocations and deletions that must be done on a regular basis. As such, MUSCLE may have several hundred unused objects lying around, awaiting reuse. When the new operator detects a shortage, however, it calls an out-of-memory callback that knows how to drain these pools, thus freeing up some extra memory and allowing the server a little more breathing room. You can use an AutoCleanupProxyMemoryAllocator and some OutOfMemoryCallback objects to configure this behaviour (again, see the beginning of muscled.cpp's main() function for an example).
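The pool-draining idea can be modelled in a few lines of standard C++. The classes below are hypothetical stand-ins written for this sketch (they are not MUSCLE's ObjectPool or MemoryAllocator classes), and the shortage is triggered by hand rather than by the new operator.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <new>
#include <vector>

// Hypothetical pool for this sketch:  keeps released objects around for reuse
template <class T> class ToyObjectPool
{
public:
   ~ToyObjectPool() {(void) Drain();}

   // Hands out a spare object if one is available, otherwise allocates a new one
   T * Obtain()
   {
      if (_spares.empty()) return new (std::nothrow) T();
      T * ret = _spares.back();
      _spares.pop_back();
      return ret;
   }

   // Keeps (obj) around for later reuse instead of deleting it
   void Release(T * obj)
   {
      assert(obj != nullptr);
      _spares.push_back(obj);
   }

   // Deletes all the spare objects; returns how many were freed
   size_t Drain()
   {
      const size_t count = _spares.size();
      for (size_t i = 0; i < count; i++) delete _spares[i];
      _spares.clear();
      return count;
   }

   size_t GetNumSpares() const {return _spares.size();}

private:
   std::vector<T *> _spares;
};

// Hypothetical allocator front-end that knows whom to call when memory runs short
class ToyMemoryAllocator
{
public:
   void AddOutOfMemoryCallback(const std::function<void()> & cb) {_callbacks.push_back(cb);}

   // In a real allocator this would be triggered by a failed allocation
   void NotifyOutOfMemory()
   {
      for (size_t i = 0; i < _callbacks.size(); i++) _callbacks[i]();
   }

private:
   std::vector<std::function<void()> > _callbacks;
};
```

Registering a callback that calls Drain() on the pool means the spare objects are given back to the heap only when memory actually gets tight; the rest of the time they stay cached for fast reuse.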
-Jeremy