"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "dso-l2/src/main/java/com/tc/objectserver/handler/ProcessTransactionHandler.java" between
terracotta-core-5.7.0.tar.gz and terracotta-core-5.7.1.tar.gz

About: Terracotta - Clustering technology for Java ("Network Attached Memory", "Distributed Cache"). Source code of the server (5.x) core components.

ProcessTransactionHandler.java  (terracotta-core-5.7.0):ProcessTransactionHandler.java  (terracotta-core-5.7.1)
skipping to change at line 368 skipping to change at line 368
// This is active-side processing so this is never a replicated message. // This is active-side processing so this is never a replicated message.
boolean isReplicatedMessage = false; boolean isReplicatedMessage = false;
// In the general case, however, we need to pass this as a real ServerEntity Request, into the entityProcessor. // In the general case, however, we need to pass this as a real ServerEntity Request, into the entityProcessor.
// Before we pass this on to the entity or complete it, directly, we can sen d the received() ACK, since we now know the message order. // Before we pass this on to the entity or complete it, directly, we can sen d the received() ACK, since we now know the message order.
// Note that we only want to persist the messages with a true sourceNodeID. Synthetic invocations and sync messages // Note that we only want to persist the messages with a true sourceNodeID. Synthetic invocations and sync messages
// don't have one (although sync messages shouldn't come down this path). // don't have one (although sync messages shouldn't come down this path).
Future<Void> transactionOrderPersistenceFuture = null; Future<Void> transactionOrderPersistenceFuture = null;
// if the client is valid and the transaction id is valid, then this came fr om a real client // if the client is valid and the transaction id is valid, then this came fr om a real client
// and the client expects to be able to reconnect // and the client expects to be able to reconnect
ServerEntityRequestImpl request = new ServerEntityRequestImpl(descriptor.get ClientInstanceID(), action, sourceNodeID, transactionID, transactionID, requires Received); ServerEntityRequestImpl request = new ServerEntityRequestImpl(descriptor.get ClientInstanceID(), action, sourceNodeID, transactionID, oldestTransactionOnClie nt, requiresReceived);
if (sourceNodeID != null && !sourceNodeID.isNull() && transactionID.isValid( )) { if (sourceNodeID != null && !sourceNodeID.isNull() && transactionID.isValid( )) {
Assert.assertTrue(oldestTransactionOnClient.isValid()); Assert.assertTrue(oldestTransactionOnClient.isValid());
// This client still needs transaction order persistence. // This client still needs transaction order persistence.
transactionOrderPersistenceFuture = this.persistor.getTransactionOrderPers istor().updateWithNewMessage(sourceNodeID, transactionID, oldestTransactionOnCli ent); transactionOrderPersistenceFuture = this.persistor.getTransactionOrderPers istor().updateWithNewMessage(sourceNodeID, transactionID, oldestTransactionOnCli ent);
} }
Trace trace = null; Trace trace = null;
if (Trace.isTraceEnabled()) { if (Trace.isTraceEnabled()) {
trace = new Trace(request.getTraceID(), "ProcessTransactionHandler.AddMess age"); trace = new Trace(request.getTraceID(), "ProcessTransactionHandler.AddMess age");
trace.start(); trace.start();
skipping to change at line 693 skipping to change at line 693
default: default:
// Unknown request type. // Unknown request type.
Assert.fail(); Assert.fail();
break; break;
} }
return action; return action;
} }
private class InvokeHandler extends AbstractServerEntityRequestResponse implem ents ResultCapture, StatisticsCapture { private class InvokeHandler extends AbstractServerEntityRequestResponse implem ents ResultCapture, StatisticsCapture {
private Supplier<ActivePassiveAckWaiter> waiter; private Supplier<ActivePassiveAckWaiter> waiter;
private final SetOnceFlag sent = new SetOnceFlag(); private final SetOnceFlag lastSent = new SetOnceFlag();
private final SetOnceFlag failure = new SetOnceFlag();
private final boolean sendReceived; private final boolean sendReceived;
private final boolean holdResultForRetired; private final boolean holdResultForRetired;
private byte[] heldResult; private byte[] heldResult;
private final long[] stats = new long[StatType.SERVER_RETIRED.serverSpot() + 1]; private final long[] stats = new long[StatType.SERVER_RETIRED.serverSpot() + 1];
InvokeHandler(ServerEntityRequest request, Consumer<VoltronEntityResponse> s ender, Consumer<byte[]> complete, Consumer<ServerException> failure, boolean req Received, boolean reqRetired) { InvokeHandler(ServerEntityRequest request, Consumer<VoltronEntityResponse> s ender, Consumer<byte[]> complete, Consumer<ServerException> failure, boolean req Received, boolean reqRetired) {
super(request, sender, complete, failure); super(request, sender, complete, failure);
sendReceived = reqReceived; sendReceived = reqReceived;
holdResultForRetired = reqRetired; holdResultForRetired = reqRetired;
} }
skipping to change at line 757 skipping to change at line 756
public void setWaitFor(Supplier<ActivePassiveAckWaiter> waiter) { public void setWaitFor(Supplier<ActivePassiveAckWaiter> waiter) {
this.waiter = waiter; this.waiter = waiter;
} }
@Override @Override
public void waitForReceived() { public void waitForReceived() {
this.waiter.get().waitForReceived(); this.waiter.get().waitForReceived();
} }
private void sendResponse(byte[] result) { private void sendResponse(byte[] result) {
if (!failure.isSet() && sent.attemptSet()) { if (lastSent.attemptSet()) {
if (getNodeID().isNull()) { if (getNodeID().isNull()) {
super.complete(result); super.complete(result);
} else { } else {
if (!holdResultForRetired) { if (!holdResultForRetired) {
addSequentially(getNodeID(), addTo->addTo.addResult(getTransaction() , result)); addSequentially(getNodeID(), addTo->addTo.addResult(getTransaction() , result));
} else { } else {
heldResult = result; heldResult = result;
} }
} }
} else { } else {
throw new AssertionError(); // possible that a failure is already sent on the wire
} }
} }
private void sendFailure(ServerException e) { private void sendFailure(ServerException e) {
if (failure.attemptSet()) { if (!lastSent.attemptSet()) {
super.failure(e); if (heldResult == null) {
MonitoringEventCreator.finish(); // no held result. failure already sent
} else { return;
throw new AssertionError(); } else {
// clear held result
heldResult = null;
}
} }
super.failure(e);
MonitoringEventCreator.finish();
} }
@Override @Override
public CompletionStage<Void> retired() { public CompletionStage<Void> retired() {
CompletableFuture<Void> complete = new CompletableFuture<>(); CompletableFuture<Void> complete = new CompletableFuture<>();
this.waiter.get().runWhenCompleted(()->{ this.waiter.get().runWhenCompleted(()->{
if (!getNodeID().isNull()) { if (!getNodeID().isNull()) {
stats[StatType.SERVER_RETIRED.serverSpot()] = System.nanoTime(); stats[StatType.SERVER_RETIRED.serverSpot()] = System.nanoTime();
Assert.assertTrue(sent.isSet() || failure.isSet()); Assert.assertTrue(lastSent.isSet());
safeGetChannel(getNodeID()).ifPresent(c -> { safeGetChannel(getNodeID()).ifPresent(c -> {
if (c.getAttachment("SendStats") != null) { if (c.getAttachment("SendStats") != null) {
addSequentially(getNodeID(), addTo -> addTo.addStats(InvokeHandler .this.getTransaction(), stats)); addSequentially(getNodeID(), addTo -> addTo.addStats(InvokeHandler .this.getTransaction(), stats));
} }
}); });
addSequentially(getNodeID(), addTo -> { addSequentially(getNodeID(), addTo -> {
if (heldResult != null) { if (heldResult != null) {
return addTo.addResultAndRetire(InvokeHandler.this.getTransaction( ), heldResult); return addTo.addResultAndRetire(InvokeHandler.this.getTransaction( ), heldResult);
} else { } else {
return addTo.addRetired(InvokeHandler.this.getTransaction()); return addTo.addRetired(InvokeHandler.this.getTransaction());
 End of changes. 7 change blocks. 
11 lines changed or deleted 15 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)