"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "dso-l2/src/test/java/com/tc/l2/state/StateManagerImplTest.java" between
terracotta-core-5.7.0.tar.gz and terracotta-core-5.7.1.tar.gz

About: Terracotta - Clustering technology for Java ("Network Attached Memory", "Distributed Cache"). Source code of the server (5.x) core components.

StateManagerImplTest.java  (terracotta-core-5.7.0):StateManagerImplTest.java  (terracotta-core-5.7.1)
skipping to change at line 49 skipping to change at line 49
import com.tc.net.groups.GroupResponse; import com.tc.net.groups.GroupResponse;
import com.tc.net.groups.Node; import com.tc.net.groups.Node;
import com.tc.net.groups.TCGroupManagerImpl; import com.tc.net.groups.TCGroupManagerImpl;
import com.tc.net.groups.TCGroupMemberDiscoveryStatic; import com.tc.net.groups.TCGroupMemberDiscoveryStatic;
import com.tc.net.protocol.transport.NullConnectionPolicy; import com.tc.net.protocol.transport.NullConnectionPolicy;
import com.tc.objectserver.core.api.ServerConfigurationContext; import com.tc.objectserver.core.api.ServerConfigurationContext;
import com.tc.objectserver.core.impl.ServerConfigurationContextImpl; import com.tc.objectserver.core.impl.ServerConfigurationContextImpl;
import com.tc.objectserver.impl.TopologyManager; import com.tc.objectserver.impl.TopologyManager;
import com.tc.objectserver.persistence.ClusterStatePersistor; import com.tc.objectserver.persistence.ClusterStatePersistor;
import com.tc.server.TCServer; import com.tc.server.TCServer;
import com.tc.util.PortChooser;
import com.tc.util.State; import com.tc.util.State;
import com.tc.util.concurrent.QueueFactory; import com.tc.util.concurrent.QueueFactory;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.tc.l2.state.StateManager.ACTIVE_COORDINATOR; import static com.tc.l2.state.StateManager.ACTIVE_COORDINATOR;
import static com.tc.l2.state.StateManager.PASSIVE_UNINITIALIZED; import static com.tc.l2.state.StateManager.PASSIVE_UNINITIALIZED;
import static com.tc.l2.state.StateManager.START_STATE; import static com.tc.l2.state.StateManager.START_STATE;
import com.tc.net.ServerID;
import com.tc.objectserver.core.impl.ManagementTopologyEventCollector; import com.tc.objectserver.core.impl.ManagementTopologyEventCollector;
import com.tc.server.TCServerMain; import com.tc.server.TCServerMain;
import com.tc.util.concurrent.ThreadUtil;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.junit.Ignore; import org.junit.Ignore;
import org.terracotta.utilities.test.net.PortManager;
import static java.util.stream.Collectors.toSet;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class StateManagerImplTest { public class StateManagerImplTest {
private static final int NUM_OF_SERVERS = 3; private static final int NUM_OF_SERVERS = 3;
private static final String LOCALHOST = "localhost"; private static final String LOCALHOST = "localhost";
private final int[] ports = new int[NUM_OF_SERVERS]; private List<PortManager.PortRef> ports;
private final int[] groupPorts = new int[NUM_OF_SERVERS]; private List<PortManager.PortRef> groupPorts;
private final Node[] nodes = new Node[NUM_OF_SERVERS]; private final Node[] nodes = new Node[NUM_OF_SERVERS];
private final TCGroupManagerImpl[] groupManagers = new TCGroupManagerImpl[NUM_ OF_SERVERS]; private final TCGroupManagerImpl[] groupManagers = new TCGroupManagerImpl[NUM_ OF_SERVERS];
private final Set<Node> nodeSet = new HashSet<>(); private final Set<Node> nodeSet = new HashSet<>();
private TopologyManager topologyManager; private TopologyManager topologyManager;
// private final TCServer[] tcServers = new TCServer[NUM_OF_SERVERS]; // private final TCServer[] tcServers = new TCServer[NUM_OF_SERVERS];
private final StageController[] stageControllers = new StageController[NUM_OF_ SERVERS]; private final StageController[] stageControllers = new StageController[NUM_OF_ SERVERS];
private final ManagementTopologyEventCollector[] mgmt = new ManagementTopology EventCollector[NUM_OF_SERVERS]; private final ManagementTopologyEventCollector[] mgmt = new ManagementTopology EventCollector[NUM_OF_SERVERS];
private final StateManagerImpl[] stateManagers = new StateManagerImpl[NUM_OF_S ERVERS]; private final StateManagerImpl[] stateManagers = new StateManagerImpl[NUM_OF_S ERVERS];
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Logger tcLogger = mock(Logger.class); Logger tcLogger = mock(Logger.class);
PortChooser pc = new PortChooser();
WeightGeneratorFactory weightGeneratorFactory = RandomWeightGenerator.create TestingFactory(2); WeightGeneratorFactory weightGeneratorFactory = RandomWeightGenerator.create TestingFactory(2);
StageManager[] stageManagers = new StageManager[NUM_OF_SERVERS]; StageManager[] stageManagers = new StageManager[NUM_OF_SERVERS];
ConsistencyManager mgr = mock(ConsistencyManager.class); ConsistencyManager mgr = mock(ConsistencyManager.class);
when(mgr.requestTransition(any(ServerMode.class), any(NodeID.class), any(Tra nsition.class))).thenReturn(Boolean.TRUE); when(mgr.requestTransition(any(ServerMode.class), any(NodeID.class), any(Tra nsition.class))).thenReturn(Boolean.TRUE);
when(mgr.requestTransition(any(ServerMode.class), any(NodeID.class), any(), any(Transition.class))).thenReturn(Boolean.TRUE); when(mgr.requestTransition(any(ServerMode.class), any(NodeID.class), any(), any(Transition.class))).thenReturn(Boolean.TRUE);
when(mgr.createVerificationEnrollment(any(NodeID.class), any(WeightGenerator Factory.class))).then(i->{ when(mgr.createVerificationEnrollment(any(NodeID.class), any(WeightGenerator Factory.class))).then(i->{
return EnrollmentFactory.createTrumpEnrollment((NodeID)i.getArguments()[0] , weightGeneratorFactory); return EnrollmentFactory.createTrumpEnrollment((NodeID)i.getArguments()[0] , weightGeneratorFactory);
}); });
TCServerMain.server = mock(TCServer.class); TCServerMain.server = mock(TCServer.class);
when(TCServerMain.server.getActivateTime()).thenReturn(System.currentTimeMil lis()); when(TCServerMain.server.getActivateTime()).thenReturn(System.currentTimeMil lis());
Set<String> servers = new HashSet<>(); PortManager portManager = PortManager.getInstance();
for(int i = 0; i < NUM_OF_SERVERS; i++) { ports = portManager.reservePorts(NUM_OF_SERVERS);
int port = pc.chooseRandom2Port(); groupPorts = portManager.reservePorts(NUM_OF_SERVERS);
ports[i] = port; Set<String> servers = ports.stream().map(r -> "localhost:" + r.port()).colle
groupPorts[i] = port + 1; ct(toSet());
servers.add("localhost:" + ports[i]);
}
this.topologyManager = new TopologyManager(servers); this.topologyManager = new TopologyManager(servers);
for(int i = 0; i < NUM_OF_SERVERS; i++) { for(int i = 0; i < NUM_OF_SERVERS; i++) {
nodes[i] = new Node(LOCALHOST, ports[i], groupPorts[i]); nodes[i] = new Node(LOCALHOST, ports.get(i).port(), groupPorts.get(i).port ());
nodeSet.add(nodes[i]); nodeSet.add(nodes[i]);
stageControllers[i] = mock(StageController.class); stageControllers[i] = mock(StageController.class);
mgmt[i] = mock(ManagementTopologyEventCollector.class); mgmt[i] = mock(ManagementTopologyEventCollector.class);
ClusterStatePersistor clusterStatePersistorMock = mock(ClusterStatePersist or.class); ClusterStatePersistor clusterStatePersistorMock = mock(ClusterStatePersist or.class);
when(clusterStatePersistorMock.isDBClean()).thenReturn(Boolean.TRUE); when(clusterStatePersistorMock.isDBClean()).thenReturn(Boolean.TRUE);
// tcServers[i] = mock(TCServer.class); // tcServers[i] = mock(TCServer.class);
stageManagers[i] = new StageManagerImpl(new ThreadGroup("test"), new Queue Factory()); stageManagers[i] = new StageManagerImpl(new ThreadGroup("test"), new Queue Factory());
groupManagers[i] = new TCGroupManagerImpl(new NullConnectionPolicy(), LOCA LHOST, ports[i], groupPorts[i], groupManagers[i] = new TCGroupManagerImpl(new NullConnectionPolicy(), LOCA LHOST, ports.get(i).port(), groupPorts.get(i).port(),
stageManagers[i], weightGenerato rFactory, topologyManager); stageManagers[i], weightGenerato rFactory, topologyManager);
stateManagers[i] = new StateManagerImpl(tcLogger, groupManagers[i], stageC ontrollers[i], mgmt[i], stageManagers[i], NUM_OF_SERVERS, 5, weightGeneratorFact ory, mgr, stateManagers[i] = new StateManagerImpl(tcLogger, groupManagers[i], stageC ontrollers[i], mgmt[i], stageManagers[i], NUM_OF_SERVERS, 5, weightGeneratorFact ory, mgr,
clusterStatePersistorMock, topolog yManager); clusterStatePersistorMock, topolog yManager);
Sink<L2StateMessage> stateMessageSink = stageManagers[i].createStage(Serve rConfigurationContext.L2_STATE_MESSAGE_HANDLER_STAGE, L2StateMessage.class, new L2StateMessageHandler(), 1, 1).getSink(); Sink<L2StateMessage> stateMessageSink = stageManagers[i].createStage(Serve rConfigurationContext.L2_STATE_MESSAGE_HANDLER_STAGE, L2StateMessage.class, new L2StateMessageHandler(), 1, 1).getSink();
groupManagers[i].routeMessages(L2StateMessage.class, stateMessageSink); groupManagers[i].routeMessages(L2StateMessage.class, stateMessageSink);
groupManagers[i].setDiscover(new TCGroupMemberDiscoveryStatic(groupManager s[i], nodes[i])); groupManagers[i].setDiscover(new TCGroupMemberDiscoveryStatic(groupManager s[i], nodes[i]));
L2Coordinator l2CoordinatorMock = mock(L2Coordinator.class); L2Coordinator l2CoordinatorMock = mock(L2Coordinator.class);
when(l2CoordinatorMock.getStateManager()).thenReturn(stateManagers[i]); when(l2CoordinatorMock.getStateManager()).thenReturn(stateManagers[i]);
ServerConfigurationContext serverConfigurationContextMock = new ServerConf igurationContextImpl(stageManagers[i], null, null, null, l2CoordinatorMock); ServerConfigurationContext serverConfigurationContextMock = new ServerConf igurationContextImpl(stageManagers[i], null, null, null, l2CoordinatorMock);
stageManagers[i].startAll(serverConfigurationContextMock, new ArrayList<>( )); stageManagers[i].startAll(serverConfigurationContextMock, new ArrayList<>( ));
} }
} }
@After
public void tearDown() {
if (ports != null) {
ports.forEach(PortManager.PortRef::close);
}
if (groupPorts != null) {
groupPorts.forEach(PortManager.PortRef::close);
}
}
@Test @Test
public void testElection() throws Exception { public void testElection() throws Exception {
for(int i = 0; i < NUM_OF_SERVERS; i++) { for(int i = 0; i < NUM_OF_SERVERS; i++) {
GroupConfiguration groupConfiguration = mock(GroupConfiguration.class); GroupConfiguration groupConfiguration = mock(GroupConfiguration.class);
when(groupConfiguration.getNodes()).thenReturn(nodeSet); when(groupConfiguration.getNodes()).thenReturn(nodeSet);
when(groupConfiguration.getCurrentNode()).thenReturn(nodes[i]); when(groupConfiguration.getCurrentNode()).thenReturn(nodes[i]);
groupManagers[i].join(groupConfiguration); groupManagers[i].join(groupConfiguration);
} }
for(int i = 0; i < NUM_OF_SERVERS; i++) { for(int i = 0; i < NUM_OF_SERVERS; i++) {
skipping to change at line 191 skipping to change at line 203
int activeIndex = -1; int activeIndex = -1;
for (int i = 0; i < NUM_OF_SERVERS; i++) { for (int i = 0; i < NUM_OF_SERVERS; i++) {
if (groupManagers[i].getLocalNodeID().equals(expectedActive)) { if (groupManagers[i].getLocalNodeID().equals(expectedActive)) {
activeIndex = i; activeIndex = i;
break; break;
} }
} }
for (int i = 0; i < NUM_OF_SERVERS; i++) { for (int i = 0; i < NUM_OF_SERVERS; i++) {
while (stateManagers[i].getCurrentMode() == ServerMode.START) {
ThreadUtil.reallySleep(1000);
}
}
for (int i = 0; i < NUM_OF_SERVERS; i++) {
if (activeIndex == i) { if (activeIndex == i) {
// verify(tcServers[i]).l2StateChanged(argThat(stateChangeEvent(START_STA TE, ACTIVE_COORDINATOR)));
verify(stageControllers[i]).transition(eq(START_STATE), eq(ACTIVE_COORDI NATOR)); verify(stageControllers[i]).transition(eq(START_STATE), eq(ACTIVE_COORDI NATOR));
} else { } else {
// verify(tcServers[i]).l2StateChanged(argThat(stateChangeEvent(START_STA TE, PASSIVE_UNINITIALIZED)));
verify(stageControllers[i]).transition(eq(START_STATE), eq(PASSIVE_UNINI TIALIZED)); verify(stageControllers[i]).transition(eq(START_STATE), eq(PASSIVE_UNINI TIALIZED));
} }
} }
} }
@Test @Ignore("no longer a valid test. Passives can become actives") @Test @Ignore("no longer a valid test. Passives can become actives")
public void testInitialElectionDoesNotSetStandbyAsActive() throws Exception { public void testInitialElectionDoesNotSetStandbyAsActive() throws Exception {
Logger logger = mock(Logger.class); Logger logger = mock(Logger.class);
GroupManager grp = mock(GroupManager.class); GroupManager grp = mock(GroupManager.class);
skipping to change at line 390 skipping to change at line 406
StateManagerImpl mgr = new StateManagerImpl(tcLogger, groupManager, stageCon troller, mgmtController, stageMgr, 2, 5, weightGeneratorFactory, availabilityMgr , persistor, this.topologyManager); StateManagerImpl mgr = new StateManagerImpl(tcLogger, groupManager, stageCon troller, mgmtController, stageMgr, 2, 5, weightGeneratorFactory, availabilityMgr , persistor, this.topologyManager);
mgr.initializeAndStartElection(); mgr.initializeAndStartElection();
Assert.assertEquals(ServerMode.START, mgr.getCurrentMode()); Assert.assertEquals(ServerMode.START, mgr.getCurrentMode());
Assert.assertEquals(ServerMode.SYNCING, mgr.getStateMap().get("startState")) ; Assert.assertEquals(ServerMode.SYNCING, mgr.getStateMap().get("startState")) ;
L2StateMessage sw = mock(L2StateMessage.class); L2StateMessage sw = mock(L2StateMessage.class);
when(sw.getType()).thenReturn(L2StateMessage.ABORT_ELECTION); when(sw.getType()).thenReturn(L2StateMessage.ABORT_ELECTION);
when(sw.getState()).thenReturn(StateManager.ACTIVE_COORDINATOR); when(sw.getState()).thenReturn(StateManager.ACTIVE_COORDINATOR);
Enrollment winner = mock(Enrollment.class); Enrollment winner = mock(Enrollment.class);
when(sw.getEnrollment()).thenReturn(winner); when(sw.getEnrollment()).thenReturn(winner);
NodeID active = mock(NodeID.class); ServerID active = mock(ServerID.class);
when(winner.getNodeID()).thenReturn(active); when(winner.getNodeID()).thenReturn(active);
when(winner.wins(any(Enrollment.class))).thenReturn(Boolean.TRUE); when(winner.wins(any(Enrollment.class))).thenReturn(Boolean.TRUE);
when(winner.getWeights()).thenReturn(new long[0]); when(winner.getWeights()).thenReturn(new long[0]);
when(sw.messageFrom()).thenReturn(active); when(sw.messageFrom()).thenReturn(active);
try { try {
mgr.handleClusterStateMessage(sw); mgr.handleClusterStateMessage(sw);
Assert.fail("restart exception expected"); Assert.fail("restart exception expected");
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
Assert.assertTrue(t.toString(), t instanceof TCServerRestartException); Assert.assertTrue(t.toString(), t instanceof TCServerRestartException);
 End of changes. 15 change blocks. 
16 lines changed or deleted 33 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)