"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "reflector/StorageReflectSession.cpp" between
muscle8.20.zip and muscle8.30.zip

About: MUSCLE (Multi User Server Client Linking Environment) is a messaging server and networking API. The included server program ("muscled") lets its clients message each other, and/or store information in its serverside hierarchical database.

StorageReflectSession.cpp  (muscle8.20):StorageReflectSession.cpp  (muscle8.30)
skipping to change at line 193 skipping to change at line 193
AboutToDetachFromServer() AboutToDetachFromServer()
{ {
Cleanup(); Cleanup();
DumbReflectSession::AboutToDetachFromServer(); DumbReflectSession::AboutToDetachFromServer();
} }
/** Used to pass arguments to the SubscribeRefCallback method */ /** Used to pass arguments to the SubscribeRefCallback method */
class SubscribeRefCallbackArgs class SubscribeRefCallbackArgs
{ {
public: public:
SubscribeRefCallbackArgs(int32 refCountDelta) : _refCountDelta(refCountDelta) {/* empty */} explicit SubscribeRefCallbackArgs(int32 refCountDelta) : _refCountDelta(refCo untDelta) {/* empty */}
int32 GetRefCountDelta() const {return _refCountDelta;} int32 GetRefCountDelta() const {return _refCountDelta;}
private: private:
const int32 _refCountDelta; const int32 _refCountDelta;
}; };
void void
StorageReflectSession :: StorageReflectSession ::
Cleanup() Cleanup()
skipping to change at line 239 skipping to change at line 239
_sharedData->_root.Reset(); // do this first! _sharedData->_root.Reset(); // do this first!
delete _sharedData; delete _sharedData;
} }
else else
{ {
// Remove all of our subscription-marks from neighbor's nodes // Remove all of our subscription-marks from neighbor's nodes
SubscribeRefCallbackArgs srcArgs(-2147483647); // remove all of our su bscriptions no matter how many ref-counts we have SubscribeRefCallbackArgs srcArgs(-2147483647); // remove all of our su bscriptions no matter how many ref-counts we have
(void) _subscriptions.DoTraversal((PathMatchCallback)DoSubscribeRefCall backFunc, this, GetGlobalRoot(), false, &srcArgs); (void) _subscriptions.DoTraversal((PathMatchCallback)DoSubscribeRefCall backFunc, this, GetGlobalRoot(), false, &srcArgs);
// Remove any cached tables that reference our session-ID String, as we know they can no longer be useful to anyone. // Remove any cached tables that reference our session-ID String, as we know they can no longer be useful to anyone.
for (HashtableIterator<uint32, DataNodeSubscribersTableRef> iter(_share _sharedData->_cachedSubscribersTables.DropAllCacheEntriesContainingKey(
dData->_cachedSubscribersTables); iter.HasData(); iter++) GetSessionIDString());
{
if (iter.GetValue()()->GetSubscribers().ContainsKey(GetSessionIDStri
ng())) (void) _sharedData->_cachedSubscribersTables.Remove(iter.GetKey());
}
} }
_sharedData = NULL; _sharedData = NULL;
} }
_nextSubscriptionMessage.Reset(); _nextSubscriptionMessage.Reset();
_nextIndexSubscriptionMessage.Reset(); _nextIndexSubscriptionMessage.Reset();
TCHECKPOINT; TCHECKPOINT;
} }
void void
skipping to change at line 298 skipping to change at line 295
for (HashtableIterator<const String *, AbstractReflectSessionRef> iter(GetSes sions()); iter.HasData(); iter++) for (HashtableIterator<const String *, AbstractReflectSessionRef> iter(GetSes sions()); iter.HasData(); iter++)
{ {
StorageReflectSession * next = dynamic_cast<StorageReflectSession *>(iter. GetValue()()); StorageReflectSession * next = dynamic_cast<StorageReflectSession *>(iter. GetValue()());
if (next) next->NodeCreated(newNode); // always notify; !Self filtering w ill be done elsewhere if (next) next->NodeCreated(newNode); // always notify; !Self filtering w ill be done elsewhere
} }
TCHECKPOINT; TCHECKPOINT;
} }
DataNodeSubscribersTableRef ConstDataNodeSubscribersTableRef
StorageReflectSession :: StorageReflectSession ::
GetDataNodeSubscribersTableFromPool(const DataNodeSubscribersTableRef & curTable , const String & sessionIDString, int32 delta) GetDataNodeSubscribersTableFromPool(const ConstDataNodeSubscribersTableRef & cur Ref, const String & sessionIDString, int32 delta)
{ {
if (delta == 0) return curTable; // nothing to do! if (delta == 0) return curRef; // nothing to do!
else if (delta < 0)
{
// See if we can just set the DataNode back to its default/null/empty-subs
cribers-table state
const uint32 * soleRefCount = ((curTable())&&(curTable()->GetSubscribers()
.GetNumItems() == 1)) ? curTable()->GetSubscribers().Get(sessionIDString) : NULL
;
if ((soleRefCount)&&(*soleRefCount <= (uint32)(-delta))) return DataNodeSu
bscribersTableRef();
}
DataNodeSubscribersTableRef * cachedTable = _sharedData->_cachedSubscribersTa const Hashtable<String, uint32> & curTable = curRef() ? curRef()->GetTable()
bles.Get(DataNodeSubscribersTable::HashCodeAfterModification(curTable()?curTable : GetDefaultObjectForType<Hashtable<String, uint32> >();
()->HashCode():0, sessionIDString, delta)); const uint32 curCount = curTable[sessionIDString];
if (cachedTable) const uint32 newCount = (delta > 0) ? (curCount+delta) : (((int32)curCount>de
{ lta)?(curCount-delta):0);
if (curTable())
{
if (curTable()->IsEqualToAfterModification(*cachedTable->GetItemPointer
(), sessionIDString, delta)) return *cachedTable;
}
else if (delta > 0)
{
const Hashtable<String, uint32> & cachedSubs = cachedTable->GetItemPoin
ter()->GetSubscribers();
const uint32 * soleRefCount = (cachedSubs.GetNumItems() == 1) ? cachedS
ubs.Get(sessionIDString) : NULL;
if ((soleRefCount)&&(*soleRefCount == (uint32)delta)) return *cachedTab
le;
}
}
// If we got here, we didn't have anything in our cache for the requested tab DataNodeSubscribersTablePool & cache = _sharedData->_cachedSubscribersTables;
le, so we'll create a new table and store and return it return (newCount > 0) ? cache.GetWithPut(curRef, sessionIDString, newCount) :
DataNodeSubscribersTableRef newRef(newnothrow DataNodeSubscribersTable(curTab cache.GetWithRemove(curRef, sessionIDString);
le(), sessionIDString, delta));
if ((newRef() == NULL)||(_sharedData->_cachedSubscribersTables.Put(newRef()->
HashCode(), newRef).IsError())) MWARN_OUT_OF_MEMORY;
return newRef;
} }
void void
StorageReflectSession :: StorageReflectSession ::
NodeCreated(DataNode & newNode) NodeCreated(DataNode & newNode)
{ {
newNode._subscribers = GetDataNodeSubscribersTableFromPool(newNode._subscribe rs, GetSessionIDString(), _subscriptions.GetMatchCount(newNode, NULL, 0)); // F ogBugz #14596 newNode._subscribers = GetDataNodeSubscribersTableFromPool(newNode._subscribe rs, GetSessionIDString(), _subscriptions.GetMatchCount(newNode, NULL, 0)); // F ogBugz #14596
} }
void void
skipping to change at line 1170 skipping to change at line 1148
{ {
TCHECKPOINT; TCHECKPOINT;
if (_sessionDir()) if (_sessionDir())
{ {
Queue<DataNodeRef> removeSet; Queue<DataNodeRef> removeSet;
(void) matcher.DoTraversal((PathMatchCallback)RemoveDataCallbackFunc, this , *_sessionDir(), true, &removeSet); (void) matcher.DoTraversal((PathMatchCallback)RemoveDataCallbackFunc, this , *_sessionDir(), true, &removeSet);
for (int i=removeSet.GetNumItems()-1; i>=0; i--) for (int i=removeSet.GetNumItems()-1; i>=0; i--)
{ {
DataNode * next = removeSet[i](); DataNode * next = removeSet[i]();
DataNode * parent = next->GetParent(); if (next)
if ((next)&&(parent)) (void) parent->RemoveChild(next->GetNodeName(), q {
uiet ? NULL : this, true, &_currentNodeCount); DataNode * parent = next->GetParent();
if (parent) (void) parent->RemoveChild(next->GetNodeName(), quiet ?
NULL : this, true, &_currentNodeCount);
}
} }
} }
} }
status_t StorageReflectSession :: RemoveDataNodes(const String & nodePath, const ConstQueryFilterRef & filterRef, bool quiet) status_t StorageReflectSession :: RemoveDataNodes(const String & nodePath, const ConstQueryFilterRef & filterRef, bool quiet)
{ {
TCHECKPOINT; TCHECKPOINT;
NodePathMatcher matcher; NodePathMatcher matcher;
MRETURN_ON_ERROR(matcher.PutPathString(nodePath, filterRef)); MRETURN_ON_ERROR(matcher.PutPathString(nodePath, filterRef));
skipping to change at line 1465 skipping to change at line 1446
return false; return false;
} }
uint32 uint32
StorageReflectSession :: NodePathMatcher :: StorageReflectSession :: NodePathMatcher ::
GetMatchCount(DataNode & node, const Message * optData, int rootDepth) const GetMatchCount(DataNode & node, const Message * optData, int rootDepth) const
{ {
TCHECKPOINT; TCHECKPOINT;
uint32 matchCount = 0; uint32 matchCount = 0;
ConstMessageRef fakeRef(optData, false); DummyConstMessageRef fakeRef(optData);
for (HashtableIterator<String, PathMatcherEntry> iter(GetEntries()); iter.Has Data(); iter++) if (PathMatches(node, fakeRef, iter.GetValue(), rootDepth)) matc hCount++; for (HashtableIterator<String, PathMatcherEntry> iter(GetEntries()); iter.Has Data(); iter++) if (PathMatches(node, fakeRef, iter.GetValue(), rootDepth)) matc hCount++;
return matchCount; return matchCount;
} }
bool bool
StorageReflectSession :: NodePathMatcher :: StorageReflectSession :: NodePathMatcher ::
PathMatches(DataNode & node, ConstMessageRef & optData, const PathMatcherEntry & entry, int rootDepth) const PathMatches(DataNode & node, ConstMessageRef & optData, const PathMatcherEntry & entry, int rootDepth) const
{ {
TCHECKPOINT; TCHECKPOINT;
skipping to change at line 2001 skipping to change at line 1982
for (HashtableIterator<IPAddressAndPort, ReflectSessionFactoryRef> iter(GetFa ctories()); iter.HasData(); iter++) for (HashtableIterator<IPAddressAndPort, ReflectSessionFactoryRef> iter(GetFa ctories()); iter.HasData(); iter++)
{ {
const ReflectSessionFactory & f = *iter.GetValue()(); const ReflectSessionFactory & f = *iter.GetValue()();
printf(" %s [%p] is listening at [%s] (%sid=" UINT32_FORMAT_SPEC ").\n", f.GetTypeName(), &f, iter.GetKey().ToString()(), f.IsReadyToAcceptSessions()?"R eadyToAcceptSessions, ":"", f.GetFactoryID()); printf(" %s [%p] is listening at [%s] (%sid=" UINT32_FORMAT_SPEC ").\n", f.GetTypeName(), &f, iter.GetKey().ToString()(), f.IsReadyToAcceptSessions()?"R eadyToAcceptSessions, ":"", f.GetFactoryID());
} }
} }
void StorageReflectSession :: PrintSessionsInfo() const void StorageReflectSession :: PrintSessionsInfo() const
{ {
const Hashtable<const String *, AbstractReflectSessionRef> & t = GetSessions( ); const Hashtable<const String *, AbstractReflectSessionRef> & t = GetSessions( );
printf("There are " UINT32_FORMAT_SPEC " sessions attached, and " UINT32_FORM AT_SPEC " subscriber-tables cached:\n", t.GetNumItems(), _sharedData->_cachedSub scribersTables.GetNumItems()); printf("There are " UINT32_FORMAT_SPEC " sessions attached, and " UINT32_FORM AT_SPEC " subscriber-tables cached:\n", t.GetNumItems(), _sharedData->_cachedSub scribersTables.GetNumCachedItems());
uint32 totalNumOutMessages = 0, totalNumOutBytes = 0, totalNumNodes = 0, tota lNumNodeBytes = 0; uint32 totalNumOutMessages = 0, totalNumOutBytes = 0, totalNumNodes = 0, tota lNumNodeBytes = 0;
for (HashtableIterator<const String *, AbstractReflectSessionRef> iter(t); it er.HasData(); iter++) for (HashtableIterator<const String *, AbstractReflectSessionRef> iter(t); it er.HasData(); iter++)
{ {
AbstractReflectSession * ars = iter.GetValue()(); AbstractReflectSession * ars = iter.GetValue()();
uint32 numOutMessages = 0, numOutBytes = 0, numNodes = 0, numNodeBytes = 0 ; uint32 numOutMessages = 0, numOutBytes = 0, numNodes = 0, numNodeBytes = 0 ;
const AbstractMessageIOGateway * gw = ars->GetGateway()(); const AbstractMessageIOGateway * gw = ars->GetGateway()();
if (gw) if (gw)
{ {
const Queue<MessageRef> & q = gw->GetOutgoingMessageQueue(); const Queue<MessageRef> & q = gw->GetOutgoingMessageQueue();
numOutMessages = q.GetNumItems(); numOutMessages = q.GetNumItems();
 End of changes. 10 change blocks. 
52 lines changed or deleted 22 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)