"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "dso-l2/src/test/java/com/tc/net/groups/TCGroupManagerNodeJoinedTest.java" between
terracotta-core-5.7.0.tar.gz and terracotta-core-5.7.1.tar.gz

About: Terracotta - Clustering technology for Java ("Network Attached Memory", "Distributed Cache"). Source code of the server (5.x) core components.

TCGroupManagerNodeJoinedTest.java  (terracotta-core-5.7.0):TCGroupManagerNodeJoinedTest.java  (terracotta-core-5.7.1)
skipping to change at line 21 skipping to change at line 21
* the specific language governing rights and limitations under the License. * the specific language governing rights and limitations under the License.
* *
* The Covered Software is Terracotta Core. * The Covered Software is Terracotta Core.
* *
* The Initial Developer of the Covered Software is * The Initial Developer of the Covered Software is
* Terracotta, Inc., a Software AG company * Terracotta, Inc., a Software AG company
* *
*/ */
package com.tc.net.groups; package com.tc.net.groups;
import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.terracotta.utilities.test.net.PortManager;
import com.tc.async.api.StageManager;
import com.tc.async.impl.StageManagerImpl;
import com.tc.config.GroupConfiguration; import com.tc.config.GroupConfiguration;
import com.tc.io.TCByteBufferInput; import com.tc.io.TCByteBufferInput;
import com.tc.io.TCByteBufferOutput; import com.tc.io.TCByteBufferOutput;
import com.tc.l2.ha.RandomWeightGenerator; import com.tc.l2.ha.RandomWeightGenerator;
import com.tc.lang.TCThreadGroup; import com.tc.lang.TCThreadGroup;
import com.tc.lang.TestThrowableHandler; import com.tc.lang.TestThrowableHandler;
import com.tc.net.NodeID; import com.tc.net.NodeID;
import com.tc.net.ServerID;
import com.tc.net.protocol.transport.ClientConnectionEstablisher; import com.tc.net.protocol.transport.ClientConnectionEstablisher;
import com.tc.net.protocol.transport.ClientMessageTransport;
import com.tc.net.protocol.transport.NullConnectionPolicy; import com.tc.net.protocol.transport.NullConnectionPolicy;
import com.tc.net.proxy.TCPProxy; import com.tc.net.proxy.TCPProxy;
import com.tc.objectserver.impl.TopologyManager; import com.tc.objectserver.impl.TopologyManager;
import com.tc.properties.TCPropertiesConsts;
import com.tc.properties.TCPropertiesImpl;
import com.tc.test.TCTestCase; import com.tc.test.TCTestCase;
import com.tc.util.Assert; import com.tc.util.Assert;
import com.tc.util.CallableWaiter; import com.tc.util.CallableWaiter;
import com.tc.util.PortChooser;
import com.tc.util.concurrent.NoExceptionLinkedQueue; import com.tc.util.concurrent.NoExceptionLinkedQueue;
import com.tc.util.concurrent.QueueFactory;
import com.tc.util.concurrent.ThreadUtil; import com.tc.util.concurrent.ThreadUtil;
import com.tc.util.runtime.ThreadDumpUtil; import com.tc.util.runtime.ThreadDumpUtil;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TCGroupManagerNodeJoinedTest extends TCTestCase { public class TCGroupManagerNodeJoinedTest extends TCTestCase {
private final static String LOCALHOST = "localhost"; private final static String LOCALHOST = "localhost";
private static final Logger logger = LoggerFactory.getLogger(TCGroupManager NodeJoinedTest.class); private static final Logger logger = LoggerFactory.getLogger(TCGroupManager NodeJoinedTest.class);
private TCThreadGroup threadGroup; private TCThreadGroup threadGroup;
private TCGroupManagerImpl[] groupManagers; private TCGroupManagerImpl[] groupManagers;
private MyListener[] listeners; private MyListener[] listeners;
private TestThrowableHandler throwableHandler; private TestThrowableHandler throwableHandler;
private MockStageManagerFactory stages; private MockStageManagerFactory stages;
skipping to change at line 108 skipping to change at line 102
nodesSetupAndJoined(6); nodesSetupAndJoined(6);
} }
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
private void nodesSetupAndJoined(int nodes) throws Exception { private void nodesSetupAndJoined(int nodes) throws Exception {
System.out.println("*** Testing nodejoined for " + nodes + " servers"); System.out.println("*** Testing nodejoined for " + nodes + " servers");
listeners = new MyListener[nodes]; listeners = new MyListener[nodes];
groupManagers = new TCGroupManagerImpl[nodes]; groupManagers = new TCGroupManagerImpl[nodes];
Node[] allNodes = new Node[nodes];
PortChooser pc = new PortChooser();
for (int i = 0; i < nodes; ++i) {
int port = pc.chooseRandom2Port();
allNodes[i] = new Node(LOCALHOST, port, port + 1);
}
for (int i = 0; i < nodes; ++i) {
TCGroupManagerImpl gm = new TCGroupManagerImpl(new NullConnectionPolicy(),
allNodes[i].getHost(),
allNodes[i].getPort(), allN
odes[i].getGroupPort(),
stages.createStageManager()
,
RandomWeightGenerator.creat
eTestingFactory(2),
mock(TopologyManager.class)
);
gm.setDiscover(new TCGroupMemberDiscoveryStatic(gm, allNodes[i]));
groupManagers[i] = gm;
MyGroupEventListener gel = new MyGroupEventListener(gm);
listeners[i] = new MyListener();
gm.registerForMessages(TestMessage.class, listeners[i]);
gm.registerForGroupEvents(gel);
PortManager portManager = PortManager.getInstance();
List<PortManager.PortRef> ports = portManager.reservePorts(nodes);
List<PortManager.PortRef> groupPorts = portManager.reservePorts(nodes);
try {
Node[] allNodes = new Node[nodes];
for (int i = 0; i < nodes; ++i) {
allNodes[i] = new Node(LOCALHOST, ports.get(i).port(), groupPorts.get(i)
.port());
}
for (int i = 0; i < nodes; ++i) {
TCGroupManagerImpl gm = new TCGroupManagerImpl(new NullConnectionPolicy(
), allNodes[i].getHost(),
allNodes[i].getPort(), al
lNodes[i].getGroupPort(),
stages.createStageManager
(),
RandomWeightGenerator.cre
ateTestingFactory(2),
mock(TopologyManager.clas
s));
gm.setDiscover(new TCGroupMemberDiscoveryStatic(gm, allNodes[i]));
groupManagers[i] = gm;
MyGroupEventListener gel = new MyGroupEventListener(gm);
listeners[i] = new MyListener();
gm.registerForMessages(TestMessage.class, listeners[i]);
gm.registerForGroupEvents(gel);
}
// joining
System.out.println("*** Start Joining...");
for (int i = 0; i < nodes; ++i) {
Set<Node> nodeSet = new HashSet<>();
Collections.addAll(nodeSet, allNodes);
GroupConfiguration groupConfiguration = TCGroupManagerImplTest.getGroupC
onfiguration(nodeSet, allNodes[i]);
groupManagers[i].join(groupConfiguration);
}
waitForAllMessageCountsToReach(nodes - 1);
System.out.println("VERIFIED");
shutdown();
} finally {
ports.forEach(PortManager.PortRef::close);
groupPorts.forEach(PortManager.PortRef::close);
} }
// joining
System.out.println("*** Start Joining...");
for (int i = 0; i < nodes; ++i) {
Set<Node> nodeSet = new HashSet<>();
Collections.addAll(nodeSet, allNodes);
GroupConfiguration groupConfiguration = TCGroupManagerImplTest.getGroupCon
figuration(nodeSet, allNodes[i]);
groupManagers[i].join(groupConfiguration);
}
waitForAllMessageCountsToReach(nodes - 1);
System.out.println("VERIFIED");
shutdown();
} }
public void nodesSetupAndJoinedAfterCloseMember(int nodes) throws Exception { private void nodesSetupAndJoined_DEV3101(int nodes) throws Exception {
System.out.println("XXX Testing DEV-4870 : There is a world after doing clos
eMember()");
Assert.assertEquals(2, nodes);
int count = getThreadCountByName(ClientConnectionEstablisher.RECONNECT_THREA
D_NAME);
System.out.println("XXX Thread count : " + ClientConnectionEstablisher.RECON
NECT_THREAD_NAME + " - " + count);
listeners = new MyListener[nodes];
groupManagers = new TCGroupManagerImpl[nodes];
Node[] allNodes = new Node[nodes];
Node[] proxiedAllNodes = new Node[nodes];
TCPProxy[] proxy = new TCPProxy[nodes];
PortChooser pc = new PortChooser();
for (int i = 0; i < nodes; ++i) {
int port = pc.chooseRandom2Port();
allNodes[i] = new Node(LOCALHOST, port, port + 1);
int proxyPort = pc.chooseRandomPort();
proxy[i] = new TCPProxy(proxyPort, InetAddress.getByName(LOCALHOST), port
+ 1, 0, false, null);
proxy[i].start();
proxiedAllNodes[i] = new Node(LOCALHOST, port, proxyPort);
}
proxy[1].stop();
for (int i = 0; i < nodes; ++i) {
StageManager stageManager = new StageManagerImpl(threadGroup, new QueueFac
tory());
TCGroupManagerImpl gm = new TCGroupManagerImpl(new NullConnectionPolicy(),
allNodes[i].getHost(),
allNodes[i].getPort(), allN
odes[i].getGroupPort(),
stages.createStageManager()
, RandomWeightGenerator.createTestingFactory(2),
mock(TopologyManager.class)
);
gm.setDiscover(new TCGroupMemberDiscoveryStatic(gm, allNodes[i]));
groupManagers[i] = gm;
gm.setZapNodeRequestProcessor(new TCGroupManagerImplTest.MockZapNodeReques
tProcessor());
MyGroupEventListener gel = new MyGroupEventListener(gm);
listeners[i] = new MyListener();
gm.registerForMessages(TestMessage.class, listeners[i]);
gm.registerForGroupEvents(gel);
}
Set<Node> nodeSet = new HashSet<>();
Collections.addAll(nodeSet, proxiedAllNodes);
// joining
System.err.println("XXX Start Joining...");
for (int i = 0; i < nodes; ++i) {
GroupConfiguration groupConfiguration = TCGroupManagerImplTest.getGroupCon
figuration(nodeSet, allNodes[i]);
groupManagers[i].join(groupConfiguration);
}
waitForAllMessageCountsToReach(nodes - 1);
ThreadUtil.reallySleep(5000);
System.err.println("XXX 1st verification done.");
System.err.println("XXX Node 0: " + allNodes[0]);
System.err.println("XXX Node 1: " + allNodes[1]);
groupManagers[0].closeMember((ServerID) groupManagers[1].getLocalNodeID());
System.out.println("XXX member close done");
proxy[0].stop();
proxy[1].start();
waitForAllMessageCountsToReach(nodes);
shutdown();
}
public void nodesSetupAndJoined_DEV3101(int nodes) throws Exception {
System.out.println("*** Testing DEV3101 1"); System.out.println("*** Testing DEV3101 1");
Assert.assertEquals(2, nodes); Assert.assertEquals(2, nodes);
int count = getThreadCountByName(ClientConnectionEstablisher.RECONNECT_THREA D_NAME); int count = getThreadCountByName(ClientConnectionEstablisher.RECONNECT_THREA D_NAME);
System.out.println("XXX Thread count : " + ClientConnectionEstablisher.RECON NECT_THREAD_NAME + " - " + count); System.out.println("XXX Thread count : " + ClientConnectionEstablisher.RECON NECT_THREAD_NAME + " - " + count);
listeners = new MyListener[nodes]; listeners = new MyListener[nodes];
groupManagers = new TCGroupManagerImpl[nodes]; groupManagers = new TCGroupManagerImpl[nodes];
Node[] allNodes = new Node[nodes];
Node[] proxiedAllNodes = new Node[nodes];
TCPProxy[] proxy = new TCPProxy[nodes]; TCPProxy[] proxy = new TCPProxy[nodes];
PortChooser pc = new PortChooser();
for (int i = 0; i < nodes; ++i) {
int port = pc.chooseRandom2Port();
allNodes[i] = new Node(LOCALHOST, port, port + 1);
int proxyPort = pc.chooseRandomPort();
proxy[i] = new TCPProxy(proxyPort, InetAddress.getByName(LOCALHOST), port
+ 1, 0, false, null);
proxy[i].start();
proxiedAllNodes[i] = new Node(LOCALHOST, port, proxyPort);
}
for (int i = 0; i < nodes; ++i) { PortManager portManager = PortManager.getInstance();
TCGroupManagerImpl gm = new TCGroupManagerImpl(new NullConnectionPolicy(), List<PortManager.PortRef> ports = portManager.reservePorts(nodes);
allNodes[i].getHost(), List<PortManager.PortRef> groupPorts = portManager.reservePorts(nodes);
allNodes[i].getPort(), allN List<PortManager.PortRef> proxyPorts = portManager.reservePorts(nodes);
odes[i].getGroupPort(), try {
stages.createStageManager() Node[] allNodes = new Node[nodes];
, RandomWeightGenerator.createTestingFactory(2), Node[] proxiedAllNodes = new Node[nodes];
mock(TopologyManager.class) for (int i = 0; i < nodes; ++i) {
); int port = ports.get(i).port();
gm.setDiscover(new TCGroupMemberDiscoveryStatic(gm, allNodes[i])); int groupPort = groupPorts.get(i).port();
int proxyPort = proxyPorts.get(i).port();
groupManagers[i] = gm; allNodes[i] = new Node(LOCALHOST, port, groupPort);
gm.setZapNodeRequestProcessor(new TCGroupManagerImplTest.MockZapNodeReques proxy[i] = new TCPProxy(proxyPort, InetAddress.getByName(LOCALHOST), gro
tProcessor()); upPort, 0, false, null);
MyGroupEventListener gel = new MyGroupEventListener(gm); proxy[i].start();
listeners[i] = new MyListener(); proxiedAllNodes[i] = new Node(LOCALHOST, port, proxyPort);
gm.registerForMessages(TestMessage.class, listeners[i]); }
gm.registerForGroupEvents(gel);
for (int i = 0; i < nodes; ++i) {
TCGroupManagerImpl gm = new TCGroupManagerImpl(new NullConnectionPolicy(
), allNodes[i].getHost(),
allNodes[i].getPort(), al
lNodes[i].getGroupPort(),
stages.createStageManager
(), RandomWeightGenerator.createTestingFactory(2),
mock(TopologyManager.clas
s));
gm.setDiscover(new TCGroupMemberDiscoveryStatic(gm, allNodes[i]));
groupManagers[i] = gm;
gm.setZapNodeRequestProcessor(new TCGroupManagerImplTest.MockZapNodeRequ
estProcessor());
MyGroupEventListener gel = new MyGroupEventListener(gm);
listeners[i] = new MyListener();
gm.registerForMessages(TestMessage.class, listeners[i]);
gm.registerForGroupEvents(gel);
}
} // joining
System.out.println("*** Start Joining...");
Set<Node> nodeSet = new HashSet<>();
Collections.addAll(nodeSet, proxiedAllNodes);
// joining for (int i = 0; i < nodes; ++i) {
System.out.println("*** Start Joining..."); GroupConfiguration groupConfiguration = TCGroupManagerImplTest.getGroupC
Set<Node> nodeSet = new HashSet<>(); onfiguration(nodeSet, allNodes[i]);
Collections.addAll(nodeSet, proxiedAllNodes); groupManagers[i].join(groupConfiguration);
}
for (int i = 0; i < nodes; ++i) {
GroupConfiguration groupConfiguration = TCGroupManagerImplTest.getGroupCon
figuration(nodeSet, allNodes[i]);
groupManagers[i].join(groupConfiguration);
}
waitForAllMessageCountsToReach(nodes - 1); waitForAllMessageCountsToReach(nodes - 1);
System.out.println("XXX 1st verification done"); System.out.println("XXX 1st verification done");
for (int i = 0; i < nodes; ++i) { for (int i = 0; i < nodes; ++i) {
proxy[i].stop(); proxy[i].stop();
} }
System.out.println("XXX Node 0 Zapped Node 1"); System.out.println("XXX Node 0 Zapped Node 1");
groupManagers[0].addZappedNode(groupManagers[1].getLocalNodeID()); groupManagers[0].addZappedNode(groupManagers[1].getLocalNodeID());
System.out.println("XXX proxy stopped"); System.out.println("XXX proxy stopped");
ThreadUtil.reallySleep(5000); ThreadUtil.reallySleep(5000);
for (int i = 0; i < nodes; ++i) { for (int i = 0; i < nodes; ++i) {
proxy[i].start(); proxy[i].start();
} }
System.out.println("XXX proxy resumed. Grp Mgrs discovery started for 20 sec System.out.println("XXX proxy resumed. Grp Mgrs discovery started for 20 s
onds"); econds");
// let the restores/reconnects along with the zapping problem happen for 20 // let the restores/reconnects along with the zapping problem happen for 2
seconds 0 seconds
ThreadUtil.reallySleep(20000); ThreadUtil.reallySleep(20000);
System.out.println("XXX STOPPING Grp Mgrs discovery"); System.out.println("XXX STOPPING Grp Mgrs discovery");
for (int i = 0; i < nodes; ++i) { for (int i = 0; i < nodes; ++i) {
groupManagers[i].getDiscover().stop(Integer.MAX_VALUE); groupManagers[i].getDiscover().stop(Integer.MAX_VALUE);
} }
System.out.println("XXX Waiting for all restore connection close"); System.out.println("XXX Waiting for all restore connection close");
ensureThreadAbsent(ClientConnectionEstablisher.RECONNECT_THREAD_NAME, count) ensureThreadAbsent(ClientConnectionEstablisher.RECONNECT_THREAD_NAME, coun
; t);
shutdown(); shutdown();
} finally {
Arrays.stream(proxy).filter(Objects::nonNull).forEach(TCPProxy::stop);
proxyPorts.forEach(PortManager.PortRef::close);
ports.forEach(PortManager.PortRef::close);
groupPorts.forEach(PortManager.PortRef::close);
}
} }
private void ensureThreadAbsent(String absentThreadName, int allowedLimit) { private void ensureThreadAbsent(String absentThreadName, int allowedLimit) {
Thread[] allThreads = ThreadDumpUtil.getAllThreads(); Thread[] allThreads = ThreadDumpUtil.getAllThreads();
int count = 0; int count = 0;
for (Thread t : allThreads) { for (Thread t : allThreads) {
if (t.isAlive() && t.getName().contains(absentThreadName)) { if (t.isAlive() && t.getName().contains(absentThreadName)) {
// one more chance to wait for death // one more chance to wait for death
try { try {
// mutliple tests can be running in the same JVM so this is kind of bogus. // mutliple tests can be running in the same JVM so this is kind of bogus.
 End of changes. 30 change blocks. 
199 lines changed or deleted 129 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)