"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "dso-l1/src/main/java/com/tc/object/ClientEntityManagerImpl.java" between
terracotta-core-5.7.0.tar.gz and terracotta-core-5.7.1.tar.gz

About: Terracotta - Clustering technology for Java ("Network Attached Memory", "Distributed Cache"). Source code of the server (5.x) core components.

ClientEntityManagerImpl.java  (terracotta-core-5.7.0):ClientEntityManagerImpl.java  (terracotta-core-5.7.1)
skipping to change at line 62 skipping to change at line 62
import com.tc.object.msg.ClientEntityReferenceContext; import com.tc.object.msg.ClientEntityReferenceContext;
import com.tc.object.msg.ClientHandshakeMessage; import com.tc.object.msg.ClientHandshakeMessage;
import com.tc.object.session.SessionID; import com.tc.object.session.SessionID;
import com.tc.object.tx.TransactionID; import com.tc.object.tx.TransactionID;
import com.tc.text.MapListPrettyPrint; import com.tc.text.MapListPrettyPrint;
import com.tc.text.PrettyPrintable; import com.tc.text.PrettyPrintable;
import com.tc.util.Assert; import com.tc.util.Assert;
import com.tc.util.Util; import com.tc.util.Util;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
skipping to change at line 138 skipping to change at line 135
} }
if (requestTickets.tryAcquire()) { if (requestTickets.tryAcquire()) {
inFlightMessages.put(msg.getTransactionID(), msg); inFlightMessages.put(msg.getTransactionID(), msg);
return true; return true;
} else { } else {
throw new RejectedExecutionException("Output queue is full"); throw new RejectedExecutionException("Output queue is full");
} }
} }
private synchronized boolean enqueueMessage(InFlightMessage msg, long timeout, TimeUnit unit, boolean waitUntilRunning) throws TimeoutException { private synchronized boolean enqueueMessage(InFlightMessage msg, long timeout, TimeUnit unit, boolean waitUntilRunning) throws TimeoutException {
long end = (timeout > 0) ? System.nanoTime() + unit.toNanos(timeout) : 0; boolean interrupted = Thread.interrupted();
if (waitUntilRunning) { try {
boolean interrupted = Thread.interrupted(); long end = 0;
try { if (timeout < 0) {
throw new IllegalArgumentException("timeout must be >= 0");
} else if (timeout > 0) {
end = System.nanoTime() + unit.toNanos(timeout);
}
if (waitUntilRunning) {
while (!stateManager.isRunning()) { while (!stateManager.isRunning()) {
try { try {
if (stateManager.isShutdown()) { if (stateManager.isShutdown()) {
return false; return false;
} else if (timeout == 0) {
wait();
} else { } else {
long timing = (end > 0) ? end - System.nanoTime() : 0; long timing = end - System.nanoTime();
if (timing < 0) { wait(timing / TimeUnit.MILLISECONDS.toNanos(1), (int) (timing % Ti
throw new TimeoutException(); meUnit.MILLISECONDS.toNanos(1)));
} else {
wait(timing / TimeUnit.MILLISECONDS.toNanos(1), (int) (timing %
TimeUnit.MILLISECONDS.toNanos(1)));
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
interrupted = true; interrupted = true;
} }
} }
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
} }
}
// stop drains the permits so even if asked to not waitUntilRunning, stop is // stop drains the permits so even if asked to not waitUntilRunning, stop
still checked is still checked
if (!stateManager.isShutdown()) { // enqueue here if the system is not shutd
own, running or paused, the message should be readied for send while (!stateManager.isShutdown()) {
if (requestTickets.tryAcquireUninterruptibly(end - System.nanoTime(), Time try {
Unit.NANOSECONDS)) { if (timeout == 0) {
inFlightMessages.put(msg.getTransactionID(), msg); requestTickets.acquire();
return true; } else if (!requestTickets.tryAcquire(end - System.nanoTime(), TimeUni
} else { t.NANOSECONDS)) {
throw new TimeoutException(); throw new TimeoutException();
}
inFlightMessages.put(msg.getTransactionID(), msg);
return true;
} catch (InterruptedException e) {
interrupted = true;
}
} }
} else {
return false; return false;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
} }
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@Override @Override
public EntityClientEndpoint fetchEntity(EntityID entity, long version, ClientI nstanceID instance, MessageCodec<? extends EntityMessage, ? extends EntityRespon se> codec, Runnable closeHook) throws EntityException { public EntityClientEndpoint fetchEntity(EntityID entity, long version, ClientI nstanceID instance, MessageCodec<? extends EntityMessage, ? extends EntityRespon se> codec, Runnable closeHook) throws EntityException {
return internalLookup(entity, version, instance, codec, closeHook); return internalLookup(entity, version, instance, codec, closeHook);
} }
@Override @Override
skipping to change at line 465 skipping to change at line 471
FlushResponse flush = new FlushResponse(); FlushResponse flush = new FlushResponse();
responderMulti.getSink().addToSink(flush); responderMulti.getSink().addToSink(flush);
flush.waitForAccess(); flush.waitForAccess();
} }
// Walk the inFlightMessages, adding them all to the handshake, since we nee d them to be replayed. // Walk the inFlightMessages, adding them all to the handshake, since we nee d them to be replayed.
for (InFlightMessage inFlight : this.inFlightMessages.values()) { for (InFlightMessage inFlight : this.inFlightMessages.values()) {
VoltronEntityMessage message = inFlight.getMessage(); VoltronEntityMessage message = inFlight.getMessage();
// validate the locking on release and destroy on resends // validate the locking on release and destroy on resends
ResendVoltronEntityMessage packaged = new ResendVoltronEntityMessage(messa ge.getSource(), message.getTransactionID(), ResendVoltronEntityMessage packaged = new ResendVoltronEntityMessage(messa ge.getSource(), message.getTransactionID(),
message.getEntityDescriptor(), message.getVoltronType(), message.doesR message.getEntityDescriptor(), message.getVoltronType(), message.doesR
equireReplication(), message.getExtendedData(), equireReplication(), message.getExtendedData());
message.getOldestTransactionOnClient());
handshakeMessage.addResendMessage(packaged); handshakeMessage.addResendMessage(packaged);
} }
} }
@Override @Override
public void shutdown() { public void shutdown() {
synchronized (this) { synchronized (this) {
if (this.stateManager.isShutdown()) { if (this.stateManager.isShutdown()) {
return; return;
} else { } else {
skipping to change at line 666 skipping to change at line 671
// We have no client instance for a create but the request currently require s a full descriptor. // We have no client instance for a create but the request currently require s a full descriptor.
EntityDescriptor entityDescriptor = EntityDescriptor.createDescriptorForLife cycle(entityID, version); EntityDescriptor entityDescriptor = EntityDescriptor.createDescriptorForLife cycle(entityID, version);
return createMessageWithDescriptor(entityID, entityDescriptor, requiresRepli cation, config, type, acks); return createMessageWithDescriptor(entityID, entityDescriptor, requiresRepli cation, config, type, acks);
} }
private NetworkVoltronEntityMessage createMessageWithDescriptor(EntityID entit yID, EntityDescriptor entityDescriptor, boolean requiresReplication, byte[] conf ig, VoltronEntityMessage.Type type, Set<VoltronEntityMessage.Acks> acks) { private NetworkVoltronEntityMessage createMessageWithDescriptor(EntityID entit yID, EntityDescriptor entityDescriptor, boolean requiresReplication, byte[] conf ig, VoltronEntityMessage.Type type, Set<VoltronEntityMessage.Acks> acks) {
NetworkVoltronEntityMessage message = (NetworkVoltronEntityMessage) channel. createMessage(TCMessageType.VOLTRON_ENTITY_MESSAGE); NetworkVoltronEntityMessage message = (NetworkVoltronEntityMessage) channel. createMessage(TCMessageType.VOLTRON_ENTITY_MESSAGE);
ClientID clientID = this.channel.getClientID(); ClientID clientID = this.channel.getClientID();
TransactionID transactionID = transactionSource.create(); TransactionID transactionID = transactionSource.create();
TransactionID oldestTransactionPending = transactionSource.oldest();//either premature retirement or late (or missing) removal from inflight TransactionID oldestTransactionPending = transactionSource.oldest();//either premature retirement or late (or missing) removal from inflight
//this is expensive -- only assert when testing
TransactionID oldestTransactionInFlight;
assert (oldestTransactionInFlight = oldestTransactionIn(inFlightMessages.key
Set())) == null || (oldestTransactionPending.compareTo(oldestTransactionInFlight
) <= 0);
message.setContents(clientID, transactionID, entityID, entityDescriptor, typ e, requiresReplication, TCByteBufferFactory.wrap(config), oldestTransactionPendi ng, acks); message.setContents(clientID, transactionID, entityID, entityDescriptor, typ e, requiresReplication, TCByteBufferFactory.wrap(config), oldestTransactionPendi ng, acks);
return message; return message;
} }
private TransactionID oldestTransactionIn(Collection<TransactionID> transactio
nIds) {
Iterator<TransactionID> it = transactionIds.iterator();
if (it.hasNext()) {
TransactionID oldest = it.next();
for (TransactionID txnId : transactionIds) {
if (oldest.compareTo(txnId) > 0) {
oldest = txnId;
}
}
return oldest;
} else {
return null;
}
}
private static class FlushResponse implements VoltronEntityResponse, VoltronEn tityMultiResponse { private static class FlushResponse implements VoltronEntityResponse, VoltronEn tityMultiResponse {
private boolean accessed = false; private boolean accessed = false;
@Override @Override
public synchronized TransactionID getTransactionID() { public synchronized TransactionID getTransactionID() {
notifyAll(); notifyAll();
accessed = true; accessed = true;
return TransactionID.NULL_ID; return TransactionID.NULL_ID;
} }
skipping to change at line 824 skipping to change at line 811
private static class StoppableSemaphore extends Semaphore { private static class StoppableSemaphore extends Semaphore {
private final int permits; private final int permits;
public StoppableSemaphore(int permits) { public StoppableSemaphore(int permits) {
super(permits); super(permits);
this.permits = permits; this.permits = permits;
} }
public boolean tryAcquireUninterruptibly(long timeout, TimeUnit unit) {
boolean interrupted = Thread.interrupted();
try {
while (true) {
long start = System.nanoTime();
try {
return super.tryAcquire(timeout, unit);
} catch (InterruptedException e) {
interrupted = true;
timeout = unit.toNanos(timeout) - (System.nanoTime() - start);
unit = TimeUnit.NANOSECONDS;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
private void stop() { private void stop() {
reducePermits(permits); reducePermits(permits);
} }
} }
} }
 End of changes. 14 change blocks. 
74 lines changed or deleted 37 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)