"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "dso-l2/src/main/java/com/tc/objectserver/entity/ReplicationSender.java" between
terracotta-core-5.7.0.tar.gz and terracotta-core-5.7.1.tar.gz

About: Terracotta - Clustering technology for Java ("Network Attached Memory", "Distributed Cache"). Source code of the server (5.x) core components.

ReplicationSender.java  (terracotta-core-5.7.0):ReplicationSender.java  (terracotta-core-5.7.1)
skipping to change at line 25 skipping to change at line 25
* The Initial Developer of the Covered Software is * The Initial Developer of the Covered Software is
* Terracotta, Inc., a Software AG company * Terracotta, Inc., a Software AG company
* *
*/ */
package com.tc.objectserver.entity; package com.tc.objectserver.entity;
import com.tc.async.api.Sink; import com.tc.async.api.Sink;
import com.tc.l2.msg.ReplicationMessage; import com.tc.l2.msg.ReplicationMessage;
import com.tc.l2.msg.SyncReplicationActivity; import com.tc.l2.msg.SyncReplicationActivity;
import com.tc.net.NodeID; import com.tc.net.NodeID;
import com.tc.net.ServerID;
import com.tc.net.groups.AbstractGroupMessage; import com.tc.net.groups.AbstractGroupMessage;
import com.tc.net.groups.GroupException; import com.tc.net.groups.GroupException;
import com.tc.net.groups.GroupManager; import com.tc.net.groups.GroupManager;
import com.tc.object.FetchID; import com.tc.object.FetchID;
import com.tc.object.session.SessionID; import com.tc.object.session.SessionID;
import com.tc.objectserver.handler.GroupMessageBatchContext; import com.tc.objectserver.handler.GroupMessageBatchContext;
import com.tc.objectserver.handler.ReplicationSendingAction; import com.tc.objectserver.handler.ReplicationSendingAction;
import com.tc.properties.TCPropertiesImpl; import com.tc.properties.TCPropertiesImpl;
import com.tc.util.Assert; import com.tc.util.Assert;
import java.util.EnumSet; import java.util.EnumSet;
skipping to change at line 72 skipping to change at line 73
public ReplicationSender(Sink<ReplicationSendingAction> outgoing, GroupManager <AbstractGroupMessage> group) { public ReplicationSender(Sink<ReplicationSendingAction> outgoing, GroupManager <AbstractGroupMessage> group) {
this.group = group; this.group = group;
this.outgoing = outgoing; this.outgoing = outgoing;
} }
public void removePassive(SessionID dest) { public void removePassive(SessionID dest) {
filtering.remove(dest); filtering.remove(dest);
} }
public boolean addPassive(NodeID node, SessionID session, Integer execution, S yncReplicationActivity activity) { public boolean addPassive(ServerID node, SessionID session, Integer execution, SyncReplicationActivity activity) {
// Set up the sync state. // Set up the sync state.
SyncState state = createAndRegisterSyncState(node, session, execution); SyncState state = createAndRegisterSyncState(node, session, execution);
// Send the message. // Send the message.
return state.attemptToSend(activity); return state.attemptToSend(activity);
} }
public void replicateMessage(SessionID session, SyncReplicationActivity activi ty, Consumer<Boolean> sentCallback) { public void replicateMessage(SessionID session, SyncReplicationActivity activi ty, Consumer<Boolean> sentCallback) {
if (debugLogging) { if (debugLogging) {
logger.debug("WIRE:" + activity); logger.debug("WIRE:" + activity);
} }
skipping to change at line 103 skipping to change at line 104
})); }));
} else { } else {
logger.info("ignoring replication message no session {} for activity {}", session, activity); logger.info("ignoring replication message no session {} for activity {}", session, activity);
if (sentCallback != null) { if (sentCallback != null) {
sentCallback.accept(false); sentCallback.accept(false);
} }
} }
} }
private SyncState createAndRegisterSyncState(NodeID node, SessionID session, i nt lane) { private SyncState createAndRegisterSyncState(ServerID node, SessionID session, int lane) {
// We can't already have a state for this passive. // We can't already have a state for this passive.
Assert.assertTrue(!node.isNull()); Assert.assertTrue(!node.isNull());
Assert.assertTrue(!filtering.containsKey(session)); Assert.assertTrue(!filtering.containsKey(session));
SyncState state = new SyncState(node, session, lane); SyncState state = new SyncState(node, session, lane);
filtering.put(session, state); filtering.put(session, state);
return state; return state;
} }
private Optional<SyncState> getSyncState(SessionID session, SyncReplicationAct ivity activity) { private Optional<SyncState> getSyncState(SessionID session, SyncReplicationAct ivity activity) {
SyncState state = filtering.get(session); SyncState state = filtering.get(session);
skipping to change at line 139 skipping to change at line 140
// for testing only // for testing only
boolean isSyncOccuring(SessionID origin) { boolean isSyncOccuring(SessionID origin) {
SyncState state = filtering.get(origin); SyncState state = filtering.get(origin);
if (state != null) { if (state != null) {
return state.isSyncOccuring(); return state.isSyncOccuring();
} }
return false; return false;
} }
private class SyncState { private class SyncState {
private final NodeID target;
// liveSet is the total set of entities which we believe have finished synci ng and fully exist on the passive. // liveSet is the total set of entities which we believe have finished synci ng and fully exist on the passive.
private final Set<FetchID> liveFetch = new HashSet<>(); private final Set<FetchID> liveFetch = new HashSet<>();
// syncdID is the set of concurrency keys, of the entity syncingID, which we believe have finished syncing and fully // syncdID is the set of concurrency keys, of the entity syncingID, which we believe have finished syncing and fully
// exist on the passive. // exist on the passive.
private final Set<Integer> syncdID = new HashSet<>(); private final Set<Integer> syncdID = new HashSet<>();
// syncingID is the entity currently being synced to this passive. // syncingID is the entity currently being synced to this passive.
private FetchID syncingFetch = FetchID.NULL_ID; private FetchID syncingFetch = FetchID.NULL_ID;
// syncingConcurrency is the concurrency key we are currently syncing to syn cingID, 0 if none is in progress. // syncingConcurrency is the concurrency key we are currently syncing to syn cingID, 0 if none is in progress.
private int syncingConcurrency = -1; private int syncingConcurrency = -1;
// begun is true when we decide to start syncing to this passive node (trigg ered by SYNC_BEGIN). // begun is true when we decide to start syncing to this passive node (trigg ered by SYNC_BEGIN).
skipping to change at line 161 skipping to change at line 161
// complete is true when we decide that syncing to this node is now complete (triggered by SYNC_END). // complete is true when we decide that syncing to this node is now complete (triggered by SYNC_END).
boolean complete = false; boolean complete = false;
private SyncReplicationActivity.ActivityType lastSeen; private SyncReplicationActivity.ActivityType lastSeen;
private SyncReplicationActivity.ActivityType lastSent; private SyncReplicationActivity.ActivityType lastSent;
private final GroupMessageBatchContext<ReplicationMessage, SyncReplicationAc tivity> batchContext; private final GroupMessageBatchContext<ReplicationMessage, SyncReplicationAc tivity> batchContext;
private final SessionID session; private final SessionID session;
private final int executionLane; private final int executionLane;
public SyncState(NodeID target, SessionID nodeToId, int lane) { public SyncState(ServerID target, SessionID nodeToId, int lane) {
this.target = target;
this.session = nodeToId; this.session = nodeToId;
this.executionLane = lane; this.executionLane = lane;
this.batchContext = new GroupMessageBatchContext<>(ReplicationMessage::cre ateActivityContainer, group, target, maximumBatchSize, idealMessagesInFlight, (n ode)->flushBatch()); this.batchContext = new GroupMessageBatchContext<>(ReplicationMessage::cre ateActivityContainer, group, target, maximumBatchSize, idealMessagesInFlight, (n ode)->flushBatch());
} }
private boolean isSameSession(SessionID session) { private boolean isSameSession(SessionID session) {
return this.session.equals(session); return this.session.equals(session);
} }
skipping to change at line 363 skipping to change at line 362
private void flushBatch() { private void flushBatch() {
outgoing.addToSink(new ReplicationSendingAction(this.executionLane, ()->{ outgoing.addToSink(new ReplicationSendingAction(this.executionLane, ()->{
try { try {
this.batchContext.flushBatch(); this.batchContext.flushBatch();
} catch (GroupException ge) { } catch (GroupException ge) {
logger.warn("error sending message to passive ", ge); logger.warn("error sending message to passive ", ge);
} }
})); }));
} }
} }
} }
 End of changes. 6 change blocks. 
6 lines changed or deleted 4 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)