"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "dso-l2/src/main/java/com/tc/objectserver/entity/ActiveToPassiveReplication.java" between
terracotta-core-5.7.0.tar.gz and terracotta-core-5.7.1.tar.gz

About: Terracotta - Clustering technology for Java ("Network Attached Memory", "Distributed Cache"). Source code of the server (5.x) core components.

ActiveToPassiveReplication.java  (terracotta-core-5.7.0):ActiveToPassiveReplication.java  (terracotta-core-5.7.1)
skipping to change at line 52 skipping to change at line 52
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Consumer; import java.util.function.Consumer;
import com.tc.l2.state.ConsistencyManager; import com.tc.l2.state.ConsistencyManager;
import com.tc.l2.state.ServerMode; import com.tc.l2.state.ServerMode;
import com.tc.net.ServerID;
import com.tc.object.session.SessionID; import com.tc.object.session.SessionID;
import com.tc.objectserver.handler.ReplicationReceivingAction; import com.tc.objectserver.handler.ReplicationReceivingAction;
import java.util.AbstractSet; import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/** /**
* This class acts to connect {@link ProcessTransactionHandler} to the {@link R eplicationSender} * This class acts to connect {@link ProcessTransactionHandler} to the {@link R eplicationSender}
* *
* This class lies idle until activated by setting the current passive nodes. This should * This class lies idle until activated by setting the current passive nodes. This should
* occur only when the server is transitioning from passive-standby to active * occur only when the server is transitioning from passive-standby to active
*/ */
public class ActiveToPassiveReplication implements PassiveReplicationBroker, Gro upEventsListener { public class ActiveToPassiveReplication implements PassiveReplicationBroker, Gro upEventsListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ActiveToPassiveRe plication.class); private static final Logger LOGGER = LoggerFactory.getLogger(ActiveToPassiveRe plication.class);
private boolean activated = false; private boolean activated = false;
private final Map<NodeID, SessionID> passiveNodes = new ConcurrentHashMap<>(); private final Map<ServerID, SessionID> passiveNodes = new ConcurrentHashMap<>( );
private final Set<NodeID> standByNodes = new HashSet<>(); private final Set<NodeID> standByNodes = new HashSet<>();
private final ConcurrentHashMap<SyncReplicationActivity.ActivityID, ActivePass iveAckWaiter> waiters = new ConcurrentHashMap<>(); private final ConcurrentHashMap<SyncReplicationActivity.ActivityID, ActivePass iveAckWaiter> waiters = new ConcurrentHashMap<>();
private final ReplicationSender replicationSender; private final ReplicationSender replicationSender;
private final ExecutorService passiveSyncPool = Executors.newCachedThreadPool( ); private final ExecutorService passiveSyncPool = Executors.newCachedThreadPool( );
private final EntityPersistor persistor; private final EntityPersistor persistor;
private final GroupManager serverCheck; private final GroupManager serverCheck;
private final ProcessTransactionHandler snapshotter; private final ProcessTransactionHandler snapshotter;
private final ConsistencyManager consistencyMgr; private final ConsistencyManager consistencyMgr;
private final Sink<ReplicationReceivingAction> receiveHandler; private final Sink<ReplicationReceivingAction> receiveHandler;
private final AtomicLong sessionMaker = new AtomicLong(); private final AtomicLong sessionMaker = new AtomicLong();
private final Map<NodeID, Integer> lane = new ConcurrentHashMap<>();
public ActiveToPassiveReplication(ConsistencyManager consistencyMgr, ProcessTr ansactionHandler snapshotter, EntityPersistor persistor, ReplicationSender repli cationSender, Sink<ReplicationReceivingAction> processor, GroupManager serverMat ch) { public ActiveToPassiveReplication(ConsistencyManager consistencyMgr, ProcessTr ansactionHandler snapshotter, EntityPersistor persistor, ReplicationSender repli cationSender, Sink<ReplicationReceivingAction> processor, GroupManager serverMat ch) {
this.consistencyMgr = consistencyMgr; this.consistencyMgr = consistencyMgr;
this.replicationSender = replicationSender; this.replicationSender = replicationSender;
this.persistor = persistor; this.persistor = persistor;
this.serverCheck = serverMatch; this.serverCheck = serverMatch;
this.snapshotter = snapshotter; this.snapshotter = snapshotter;
this.receiveHandler = processor; this.receiveHandler = processor;
} }
skipping to change at line 111 skipping to change at line 113
} }
while (this.standByNodes.contains(node)) { while (this.standByNodes.contains(node)) {
this.standByNodes.wait(); this.standByNodes.wait();
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
} }
public void enterActiveState(Set<NodeID> passives) { public void enterActiveState(Set<ServerID> passives) {
Assert.assertFalse(activated); Assert.assertFalse(activated);
primePassives(passives); primePassives(passives);
activated = true; activated = true;
} }
/** /**
* starts the stream of messages to each passive the server knows about. This s hould only happen * starts the stream of messages to each passive the server knows about. This s hould only happen
* when a server enters active state. * when a server enters active state.
*/ */
private void primePassives(Set<NodeID> passives) { private void primePassives(Set<ServerID> passives) {
passives.forEach(i -> { passives.forEach(i -> {
SessionID session = prime(i); SessionID session = prime(i);
if (session == null) { if (session.isNull()) {
LOGGER.warn("add passive disallowed for " + i); LOGGER.warn("add passive disallowed for " + i);
} else { } else {
Assert.assertNull(passiveNodes.put(i, session)); passiveNodes.putIfAbsent(i, session);
} }
}); });
} }
/** /**
* prime the message channel to a node by setting the starting ordering id to ze ro. * prime the message channel to a node by setting the starting ordering id to ze ro.
*/ */
private SessionID prime(NodeID node) { private SessionID prime(ServerID node) {
Assert.assertFalse(node.isNull()); Assert.assertFalse(node.isNull());
SessionID current = passiveNodes.get(node); SessionID current = passiveNodes.getOrDefault(node, SessionID.NULL_ID);
if (current == null) { if (standByNodes.contains(node) && serverCheck.isNodeConnected(node)) {
if (!consistencyMgr.requestTransition(ServerMode.ACTIVE, node, Consistency Manager.Transition.ADD_PASSIVE)) { if (!consistencyMgr.requestTransition(ServerMode.ACTIVE, node, Consistency Manager.Transition.ADD_PASSIVE)) {
serverCheck.zapNode(node, L2HAZapNodeRequestProcessor.SPLIT_BRAIN, "unab le to verify active"); serverCheck.zapNode(node, L2HAZapNodeRequestProcessor.SPLIT_BRAIN, "unab le to verify active");
return null; return SessionID.NULL_ID;
} else { } else {
LOGGER.debug("Starting message sequence on " + node); if (serverCheck.isNodeConnected(node)) {
SessionID newSession = new SessionID(sessionMaker.incrementAndGet()); LOGGER.debug("Starting message sequence on " + node);
// this execution lane is important to make sure each passive has it's SessionID newSession = new SessionID(sessionMaker.incrementAndGet());
own
// set of network threads for operations boolean sent = this.replicationSender.addPassive(node, newSession, exe
Integer executionLane = lane.computeIfAbsent(node, n->lane.size()); cutionLane(newSession), SyncReplicationActivity.createStartMessage());
boolean sent = this.replicationSender.addPassive(node, newSession, execu Assert.assertTrue(sent);
tionLane, SyncReplicationActivity.createStartMessage()); return newSession;
Assert.assertTrue(sent); }
return newSession;
} }
} }
return current; return current;
} }
public void startPassiveSync(NodeID newNode) { private static int executionLane(SessionID session) {
return Long.hashCode(session.toLong());
}
public void startPassiveSync(ServerID newNode) {
Assert.assertTrue(activated); Assert.assertTrue(activated);
SessionID session = prime(newNode); SessionID session = prime(newNode);
if (session != null) { if (session.isValid()) {
SessionID previous = passiveNodes.put(newNode, session); if (passiveNodes.putIfAbsent(newNode, session) == null) {
if (previous != null) { LOGGER.info("Starting sync to node: {} session: {}", newNode, session);
removePassiveSession(previous); executePassiveSync(newNode, session);
} }
} else { } else {
Assert.assertTrue("passive node unable to prime and not in the list of pas if (!passiveNodes.containsKey(newNode)) {
sives", passiveNodes.containsKey(newNode)); LOGGER.info("passive node {} to requesting prime is no longer a valid pa
ssive", newNode);
} else {
LOGGER.info("unable to prime connection to {} for passive sync", newNode
);
serverCheck.closeMember(newNode);
}
} }
LOGGER.info("Starting sync to " + newNode);
executePassiveSync(newNode, session);
} }
/** /**
* Using an executor service here to sync multiple passives at once * Using an executor service here to sync multiple passives at once
* @param newNode * @param newNode
*/ */
private void executePassiveSync(final NodeID newNode, SessionID session) { private void executePassiveSync(final NodeID newNode, SessionID session) {
passiveSyncPool.execute(() -> { passiveSyncPool.execute(() -> {
// start passive sync message // start passive sync message
LOGGER.debug("starting sync for " + newNode + " on session " + session); LOGGER.debug("starting sync for " + newNode + " on session " + session);
Iterable<ManagedEntity> e = snapshotter.snapshotEntityList(new Consumer<Li st<ManagedEntity>>() { Iterable<ManagedEntity> e = snapshotter.snapshotEntityList(new Consumer<Li st<ManagedEntity>>() {
skipping to change at line 220 skipping to change at line 229
persistor.serialize(data); persistor.serialize(data);
data.close(); data.close();
return out.toByteArray(); return out.toByteArray();
} catch (IOException ioe) { } catch (IOException ioe) {
} }
return null; return null;
} }
public void batchAckReceived(ReplicationMessageAck context) { public void batchAckReceived(ReplicationMessageAck context) {
NodeID messageFrom = context.messageFrom(); ServerID messageFrom = context.messageFrom();
this.receiveHandler.addToSink(new ReplicationReceivingAction(lane.get(messag SessionID session = this.passiveNodes.getOrDefault(messageFrom, SessionID.NU
eFrom), ()->{ LL_ID);
for (ReplicationAckTuple tuple : context.getBatch()) { if (session.isValid()) {
if (ReplicationResultCode.RECEIVED == tuple.result) { this.receiveHandler.addToSink(new ReplicationReceivingAction(executionLane
ActivePassiveAckWaiter waiter = waiters.get(tuple.respondTo); (session), ()->{
if (null != waiter) { for (ReplicationAckTuple tuple : context.getBatch()) {
waiter.didReceiveOnPassive(messageFrom); if (ReplicationResultCode.RECEIVED == tuple.result) {
ActivePassiveAckWaiter waiter = waiters.get(tuple.respondTo);
if (null != waiter) {
waiter.didReceiveOnPassive(messageFrom);
}
} else {
// This is a normal completion.
internalAckCompleted(tuple.respondTo, messageFrom, tuple.result);
} }
} else {
// This is a normal completion.
internalAckCompleted(tuple.respondTo, messageFrom, tuple.result);
} }
} }));
})); }
} }
/** /**
* This internal handling for completed is split out since it happens for both completed acks but also situations which * This internal handling for completed is split out since it happens for both completed acks but also situations which
* implies no ack is forthcoming (the passive disappearing, for example). * implies no ack is forthcoming (the passive disappearing, for example).
*/ */
private void internalAckCompleted(SyncReplicationActivity.ActivityID activityI D, NodeID passive, ReplicationResultCode payload) { private void internalAckCompleted(SyncReplicationActivity.ActivityID activityI D, ServerID passive, ReplicationResultCode payload) {
ActivePassiveAckWaiter waiter = waiters.get(activityID); ActivePassiveAckWaiter waiter = waiters.get(activityID);
if (null != waiter) { if (null != waiter) {
boolean shouldDiscardWaiter = waiter.didCompleteOnPassive(passive, payload ); boolean shouldDiscardWaiter = waiter.didCompleteOnPassive(passive, payload );
if (shouldDiscardWaiter) { if (shouldDiscardWaiter) {
waiters.remove(activityID); waiters.remove(activityID);
} }
} }
} }
@Override @Override
public Set<SessionID> passives() { public Set<SessionID> passives() {
List<SessionID> copy = new ArrayList<>(passiveNodes.values()); Collection<SessionID> copy = passiveNodes.values().stream().filter(SessionID ::isValid).collect(Collectors.toCollection(()->new ArrayList<>(passiveNodes.size ())));
return new AbstractSet<SessionID>() { return new AbstractSet<SessionID>() {
@Override @Override
public Iterator<SessionID> iterator() { public Iterator<SessionID> iterator() {
return copy.iterator(); return copy.iterator();
} }
@Override @Override
public int size() { public int size() {
return copy.size(); return copy.size();
} }
skipping to change at line 278 skipping to change at line 290
ActivePassiveAckWaiter waiter = new ActivePassiveAckWaiter(this.passiveNodes , all, this); ActivePassiveAckWaiter waiter = new ActivePassiveAckWaiter(this.passiveNodes , all, this);
if (!all.isEmpty()) { if (!all.isEmpty()) {
SyncReplicationActivity.ActivityID activityID = activity.getActivityID(); SyncReplicationActivity.ActivityID activityID = activity.getActivityID();
waiters.put(activityID, waiter); waiters.put(activityID, waiter);
// Note that we want to explicitly create the ReplicationEnvelope using a different helper if it is a local flush // Note that we want to explicitly create the ReplicationEnvelope using a different helper if it is a local flush
// command. // command.
boolean isLocalFlush = (SyncReplicationActivity.ActivityType.FLUSH_LOCAL_P IPELINE == activity.getActivityType()); boolean isLocalFlush = (SyncReplicationActivity.ActivityType.FLUSH_LOCAL_P IPELINE == activity.getActivityType());
for (SessionID node : all) { for (SessionID node : all) {
if (!isLocalFlush) { if (!isLocalFlush) {
// This isn't local-only so try to replicate. // This isn't local-only so try to replicate.
this.replicationSender.replicateMessage(node, activity, sent->{ if (node.isValid()) {
if (!sent) { this.replicationSender.replicateMessage(node, activity, sent->{
boolean complete = waiter.failedToSendToPassive(node); if (!sent) {
if (complete) { boolean complete = waiter.failedToSendToPassive(node);
waiters.remove(activityID); if (complete) {
waiters.remove(activityID);
}
} }
} });
}); }
} }
} }
} }
return waiter; return waiter;
} }
private void removePassive(NodeID nodeID) { private void removePassive(NodeID nodeID) {
SessionID session = this.passiveNodes.get(nodeID); SessionID session = this.passiveNodes.putIfAbsent((ServerID)nodeID, SessionI
D.NULL_ID);
LOGGER.info("removing passive: {} with session: {}", nodeID, session);
passiveSyncPool.execute(()->{ passiveSyncPool.execute(()->{
while (!consistencyMgr.requestTransition(ServerMode.ACTIVE, nodeID, Consis tencyManager.Transition.REMOVE_PASSIVE)) { while (!consistencyMgr.requestTransition(ServerMode.ACTIVE, nodeID, Consis tencyManager.Transition.REMOVE_PASSIVE)) {
try { try {
TimeUnit.SECONDS.sleep(2); TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOGGER.info("interrupted while waiting for permission to remove node") ; LOGGER.info("interrupted while waiting for permission to remove node") ;
} }
} }
// first remove it from the list of passive nodes so that anything sending new m essages // first remove it from the list of passive nodes so that anything sending new m essages
// will have to remove it from the list of nodes to send to // will have to remove it from the list of nodes to send to
// acknowledge all the messages for this node because it is gone, this may resu lt in // acknowledge all the messages for this node because it is gone, this may resu lt in
// a double ack locally but that is ok. acknowledge is loose and can tolerate it. // a double ack locally but that is ok. acknowledge is loose and can tolerate it.
// remove the passive node from the sender first. nothing else is going out // remove the passive node from the sender first. nothing else is going out
if (passiveNodes.remove(nodeID, session)) { if (passiveNodes.remove(nodeID, session)) {
removePassiveSession(session); removePassiveSession(session);
LOGGER.info("removing passive: {}", nodeID); LOGGER.info("removed passive: {} with session {}", nodeID, session);
} else {
Assert.assertTrue(this.passiveNodes.remove(nodeID, SessionID.NULL_ID));
} }
}); });
} }
private void removePassiveSession(SessionID session) { private void removePassiveSession(SessionID session) {
this.replicationSender.removePassive(session); this.replicationSender.removePassive(session);
Iterator<Map.Entry<SyncReplicationActivity.ActivityID, ActivePassiveAckWaite r>> scan = waiters.entrySet().iterator(); Iterator<Map.Entry<SyncReplicationActivity.ActivityID, ActivePassiveAckWaite r>> scan = waiters.entrySet().iterator();
while (scan.hasNext()) { while (scan.hasNext()) {
Map.Entry<SyncReplicationActivity.ActivityID, ActivePassiveAckWaiter> e = scan.next(); Map.Entry<SyncReplicationActivity.ActivityID, ActivePassiveAckWaiter> e = scan.next();
if (e.getValue().failedToSendToPassive(session)) { if (e.getValue().failedToSendToPassive(session)) {
skipping to change at line 338 skipping to change at line 355
// standby nodes for tracking only. no practical use // standby nodes for tracking only. no practical use
synchronized(standByNodes) { synchronized(standByNodes) {
standByNodes.add(nodeID); standByNodes.add(nodeID);
} }
} }
@Override @Override
public void nodeLeft(NodeID nodeID) { public void nodeLeft(NodeID nodeID) {
if (activated) { if (activated) {
removePassive(nodeID); removePassive(nodeID);
} else {
LOGGER.info("not activated, no passives to remove: {}", nodeID);
} }
// standby nodes for tracking only. no practical use // standby nodes for tracking only. no practical use
synchronized(standByNodes) { synchronized(standByNodes) {
standByNodes.remove(nodeID); standByNodes.remove(nodeID);
standByNodes.notifyAll(); standByNodes.notifyAll();
} }
} }
// for test // for test
Map<SyncReplicationActivity.ActivityID, ActivePassiveAckWaiter> getWaiters() { Map<SyncReplicationActivity.ActivityID, ActivePassiveAckWaiter> getWaiters() {
return waiters; return waiters;
 End of changes. 27 change blocks. 
53 lines changed or deleted 74 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)