"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "common/src/test/java/com/tc/net/core/TCWorkerCommManagerTest.java" between
terracotta-core-5.7.0.tar.gz and terracotta-core-5.7.1.tar.gz

About: Terracotta - Clustering technology for Java ("Network Attached Memory", "Distributed Cache"). Source code of the server (5.x) core components.

TCWorkerCommManagerTest.java  (terracotta-core-5.7.0):TCWorkerCommManagerTest.java  (terracotta-core-5.7.1)
skipping to change at line 58 skipping to change at line 58
import com.tc.net.protocol.transport.TransportHandshakeMessage; import com.tc.net.protocol.transport.TransportHandshakeMessage;
import com.tc.net.protocol.transport.TransportMessageFactoryImpl; import com.tc.net.protocol.transport.TransportMessageFactoryImpl;
import com.tc.net.protocol.transport.TransportNetworkStackHarnessFactory; import com.tc.net.protocol.transport.TransportNetworkStackHarnessFactory;
import com.tc.net.protocol.transport.WireProtocolAdaptorFactoryImpl; import com.tc.net.protocol.transport.WireProtocolAdaptorFactoryImpl;
import com.tc.net.proxy.TCPProxy; import com.tc.net.proxy.TCPProxy;
import com.tc.object.session.NullSessionManager; import com.tc.object.session.NullSessionManager;
import com.tc.properties.TCPropertiesImpl; import com.tc.properties.TCPropertiesImpl;
import com.tc.test.TCTestCase; import com.tc.test.TCTestCase;
import com.tc.util.Assert; import com.tc.util.Assert;
import com.tc.util.CallableWaiter; import com.tc.util.CallableWaiter;
import com.tc.util.PortChooser; import com.tc.util.TCTimeoutException;
import com.tc.util.concurrent.ThreadUtil; import com.tc.util.concurrent.ThreadUtil;
import com.tc.properties.TCPropertiesConsts; import com.tc.properties.TCPropertiesConsts;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import org.junit.Ignore; import org.junit.Ignore;
import org.terracotta.utilities.test.net.PortManager;
public class TCWorkerCommManagerTest extends TCTestCase { public class TCWorkerCommManagerTest extends TCTestCase {
Logger logger = LoggerFactory.getLogger(TCWorkerCommManager.class); Logger logger = LoggerFactory.getLogger(TCWorkerCommManager.class);
List<ClientMessageTransport> transports = new ArrayList<ClientMessageTransport >(); List<ClientMessageTransport> transports = new ArrayList<ClientMessageTransport >();
private final List<TCConnectionManager> clientConnectionMgrs = Collections.syn chronizedList(new ArrayList<>()); private final List<TCConnectionManager> clientConnectionMgrs = Collections.syn chronizedList(new ArrayList<>());
public TCWorkerCommManagerTest() { public TCWorkerCommManagerTest() {
} }
skipping to change at line 337 skipping to change at line 338
config, config,
new ServerI D(), new ServerI D(),
new Transpo rtHandshakeErrorNullHandler(), new Transpo rtHandshakeErrorNullHandler(),
Collections .<TCMessageType, Class<? extends TCMessage>>emptyMap(), Collections .<TCMessageType, Class<? extends TCMessage>>emptyMap(),
Collections .<TCMessageType, GeneratedMessageFactory>emptyMap()); Collections .<TCMessageType, GeneratedMessageFactory>emptyMap());
NetworkListener listener = commsMgr.createListener(new TCSocketAddress(0), true, NetworkListener listener = commsMgr.createListener(new TCSocketAddress(0), true,
new DefaultConnectionId Factory(), (MessageTransport t)->true); new DefaultConnectionId Factory(), (MessageTransport t)->true);
listener.start(Collections.<ConnectionID>emptySet()); listener.start(Collections.<ConnectionID>emptySet());
int serverPort = listener.getBindPort(); int serverPort = listener.getBindPort();
int proxyPort = new PortChooser().chooseRandomPort(); try {
TCPProxy proxy = new TCPProxy(proxyPort, InetAddress.getByName("localhost" try (PortManager.PortRef portRef = PortManager.getInstance().reservePort
), serverPort, 0, false, null); ()) {
proxy.start(); int proxyPort = portRef.port();
TCPProxy proxy = new TCPProxy(proxyPort, InetAddress.getByName("localh
InetSocketAddress serverAddress = InetSocketAddress.createUnresolved("loca ost"), serverPort, 0, false, null);
lhost", proxyPort); try {
proxy.start();
ClientMessageChannel client1 = createClientMsgCh();
ClientMessageChannel client2 = createClientMsgCh(); InetSocketAddress serverAddress = InetSocketAddress.createUnresolved
ClientMessageChannel client3 = createClientMsgCh(); ("localhost", proxyPort);
client1.open(serverAddress); ClientMessageChannel client1 = createClientMsgCh();
client2.open(serverAddress); ClientMessageChannel client2 = createClientMsgCh();
client3.open(serverAddress); ClientMessageChannel client3 = createClientMsgCh();
waitForConnected(client1, client2, client3); client1.open(serverAddress);
client2.open(serverAddress);
Assert.assertEquals(1, ((TCCommImpl) commsMgr.getConnectionManager().getTc client3.open(serverAddress);
Comm()).getWeightForWorkerComm(0));
Assert.assertEquals(1, ((TCCommImpl) commsMgr.getConnectionManager().getTc waitForConnected(client1, client2, client3);
Comm()).getWeightForWorkerComm(1));
Assert.assertEquals(1, ((TCCommImpl) commsMgr.getConnectionManager().getTc Assert.assertEquals(1, ((TCCommImpl)commsMgr.getConnectionManager().
Comm()).getWeightForWorkerComm(2)); getTcComm()).getWeightForWorkerComm(0));
Assert.assertEquals(1, ((TCCommImpl)commsMgr.getConnectionManager().
// case 1 : network problems .. both ends getting events getTcComm()).getWeightForWorkerComm(1));
proxy.stop(); Assert.assertEquals(1, ((TCCommImpl)commsMgr.getConnectionManager().
getTcComm()).getWeightForWorkerComm(2));
waitForWeight(commsMgr, 0, 0);
waitForWeight(commsMgr, 1, 0); // case 1 : network problems .. both ends getting events
waitForWeight(commsMgr, 2, 0); proxy.stop();
proxy.start(); waitForWeight(commsMgr, 0, 0);
waitForWeight(commsMgr, 1, 0);
waitForConnected(client1, client2, client3); waitForWeight(commsMgr, 2, 0);
Assert.assertEquals(1, ((TCCommImpl) commsMgr.getConnectionManager().getTc proxy.start();
Comm()).getWeightForWorkerComm(0));
Assert.assertEquals(1, ((TCCommImpl) commsMgr.getConnectionManager().getTc waitForConnected(client1, client2, client3);
Comm()).getWeightForWorkerComm(1));
Assert.assertEquals(1, ((TCCommImpl) commsMgr.getConnectionManager().getTc Assert.assertEquals(1, ((TCCommImpl)commsMgr.getConnectionManager().
Comm()).getWeightForWorkerComm(2)); getTcComm()).getWeightForWorkerComm(0));
Assert.assertEquals(1, ((TCCommImpl)commsMgr.getConnectionManager().
// case 2: problem with the client side connections .. but server still th getTcComm()).getWeightForWorkerComm(1));
inks clients are connected Assert.assertEquals(1, ((TCCommImpl)commsMgr.getConnectionManager().
proxy.closeClientConnections(true, false); getTcComm()).getWeightForWorkerComm(2));
System.out.println("XXX waiting for clients to reconnect"); // case 2: problem with the client side connections .. but server st
waitForWeight(commsMgr, 0, 1); ill thinks clients are connected
waitForWeight(commsMgr, 1, 1); proxy.closeClientConnections(true, false);
waitForWeight(commsMgr, 2, 1);
System.out.println("XXX waiting for clients to reconnect");
// case 3: connecting three more clients through server ports waitForWeight(commsMgr, 0, 1);
waitForWeight(commsMgr, 1, 1);
ClientMessageChannel client4 = createClientMsgCh(); waitForWeight(commsMgr, 2, 1);
ClientMessageChannel client5 = createClientMsgCh();
ClientMessageChannel client6 = createClientMsgCh(); // case 3: connecting three more clients through server ports
serverAddress = InetSocketAddress.createUnresolved("localhost", serverPort ClientMessageChannel client4 = createClientMsgCh();
); ClientMessageChannel client5 = createClientMsgCh();
ClientMessageChannel client6 = createClientMsgCh();
client4.open(serverAddress);
client5.open(serverAddress); serverAddress = InetSocketAddress.createUnresolved("localhost", serv
client6.open(serverAddress); erPort);
waitForConnected(client4, client5, client6); client4.open(serverAddress);
client5.open(serverAddress);
// Issue #414: This intermittently fails so collect more information rega client6.open(serverAddress);
rding the state of the workers. While the
// test expects them each to have 2 clients, we fail when one of them has waitForConnected(client4, client5, client6);
a different number. I suspect that there
// is a race in how the connections are distributed to the worker threads // Issue #414: This intermittently fails so collect more informatio
meaning that 2 concurrent connection n regarding the state of the workers. While the
// attempts may choose the same worker, not realizing that each of them c // test expects them each to have 2 clients, we fail when one of th
hanges its weight. em has a different number. I suspect that there
int weightFor0 = ((TCCommImpl) commsMgr.getConnectionManager().getTcComm() // is a race in how the connections are distributed to the worker t
).getWeightForWorkerComm(0); hreads meaning that 2 concurrent connection
int weightFor1 = ((TCCommImpl) commsMgr.getConnectionManager().getTcComm() // attempts may choose the same worker, not realizing that each of
).getWeightForWorkerComm(1); them changes its weight.
int weightFor2 = ((TCCommImpl) commsMgr.getConnectionManager().getTcComm() int weightFor0 = ((TCCommImpl)commsMgr.getConnectionManager().getTcC
).getWeightForWorkerComm(2); omm()).getWeightForWorkerComm(0);
System.out.println("Issue #414 debug weights: " + weightFor0 + ", " + weig int weightFor1 = ((TCCommImpl)commsMgr.getConnectionManager().getTcC
htFor1 + ", " + weightFor2); omm()).getWeightForWorkerComm(1);
Assert.assertEquals(6, weightFor0 + weightFor1 + weightFor2); int weightFor2 = ((TCCommImpl)commsMgr.getConnectionManager().getTcC
// distribution may not be even since weighting is best efforts but horrib omm()).getWeightForWorkerComm(2);
ly skewed System.out.println("Issue #414 debug weights: " + weightFor0 + ", "
Assert.assertTrue(0 < weightFor0); + weightFor1 + ", " + weightFor2);
Assert.assertTrue(0 < weightFor1); Assert.assertEquals(6, weightFor0 + weightFor1 + weightFor2);
Assert.assertTrue(0 < weightFor2); // distribution may not be even since weighting is best efforts but
horribly skewed
// case 4: closing all connections from server side Assert.assertTrue(0 < weightFor0);
System.out.println("XXX closing all client connections"); Assert.assertTrue(0 < weightFor1);
commsMgr.getConnectionManager().closeAllConnections(5000); Assert.assertTrue(0 < weightFor2);
// all clients should reconnect and should be distributed fairly among the // case 4: closing all connections from server side
worker comms. System.out.println("XXX closing all client connections");
commsMgr.getConnectionManager().closeAllConnections(5000);
// After connection close and reconnects, the weight balance depends on wh
en comms get the close connection events // all clients should reconnect and should be distributed fairly amo
System.out.println("XXX waiting for all clients reconnect"); ng the worker comms.
waitForTotalWeights(commsMgr, 3, 6);
// After connection close and reconnects, the weight balance depends
// case 5: server detecting long gcs and kicking out the clients on when comms get the close connection events
proxy.setDelay(15 * 1000); System.out.println("XXX waiting for all clients reconnect");
waitForTotalWeights(commsMgr, 3, 6);
System.out.println("XXX waiting for HC to kick out the clients those who c
onnected thru proxy ports"); // case 5: server detecting long gcs and kicking out the clients
waitForTotalWeights(commsMgr, 3, 3); proxy.setDelay(15 * 1000);
proxy.setDelay(0); System.out.println("XXX waiting for HC to kick out the clients those
who connected thru proxy ports");
ThreadUtil.reallySleep(10000); waitForTotalWeights(commsMgr, 3, 3);
System.out.println("XXX server after seeing client long GC will not open r
econnect window for it"); proxy.setDelay(0);
Assert.assertEquals(3, (((TCCommImpl) commsMgr.getConnectionManager().getT
cComm()).getWeightForWorkerComm(0)) ThreadUtil.reallySleep(10000);
+ (((TCCommImpl) commsMgr.getConnectionManager().ge System.out.println("XXX server after seeing client long GC will not
tTcComm()).getWeightForWorkerComm(1)) open reconnect window for it");
+ (((TCCommImpl) commsMgr.getConnectionManager().ge Assert.assertEquals(3, (((TCCommImpl)commsMgr.getConnectionManager()
tTcComm()).getWeightForWorkerComm(2))); .getTcComm()).getWeightForWorkerComm(0))
+ (((TCCommImpl)commsMgr.getConnectionManager().getTcComm()).get
client1.close(); WeightForWorkerComm(1))
client2.close(); + (((TCCommImpl)commsMgr.getConnectionManager().getTcComm()).get
client3.close(); WeightForWorkerComm(2)));
client4.close();
client5.close(); client1.close();
client6.close(); client2.close();
client3.close();
listener.stop(5000); client4.close();
commsMgr.shutdown(); client5.close();
connMgr.shutdown(); client6.close();
} finally {
proxy.stop();
}
}
} finally {
try {
listener.stop(5000);
} catch (TCTimeoutException e) {
// ignored
}
commsMgr.shutdown();
connMgr.shutdown();
}
} }
} }
private static void waitForConnected(final ClientMessageChannel... channels) t hrows Exception { private static void waitForConnected(final ClientMessageChannel... channels) t hrows Exception {
CallableWaiter.waitOnCallable(new Callable<Boolean>() { CallableWaiter.waitOnCallable(new Callable<Boolean>() {
@Override @Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
for (ClientMessageChannel channel : channels) { for (ClientMessageChannel channel : channels) {
if (!channel.isConnected()) { if (!channel.isConnected()) {
return false; return false;
 End of changes. 3 change blocks. 
132 lines changed or deleted 146 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)