"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "modules/dcache-frontend/src/main/java/org/dcache/restful/events/Channel.java" between
dcache-6.0.14-src.tar.gz and dcache-6.0.15-src.tar.gz

About: dCache provides a system for storing and retrieving huge amounts of data, distributed among a large number of heterogenous server nodes, under a single virtual filesystem tree with a variety of standard access methods. Free usage only for academic and non-profit organizations. Fature release. Source code.

Channel.java  (dcache-6.0.14-src):Channel.java  (dcache-6.0.15-src)
skipping to change at line 34 skipping to change at line 34
import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.EvictingQueue; import com.google.common.collect.EvictingQueue;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.security.auth.Subject; import javax.security.auth.Subject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.BadRequestException; import javax.ws.rs.BadRequestException;
import javax.ws.rs.NotFoundException; import javax.ws.rs.NotFoundException;
import javax.ws.rs.sse.OutboundSseEvent; import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse; import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink; import javax.ws.rs.sse.SseEventSink;
import java.io.EOFException; import java.io.EOFException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
skipping to change at line 57 skipping to change at line 58
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import org.dcache.auth.Subjects; import org.dcache.auth.Subjects;
import org.dcache.restful.events.spi.EventStream; import org.dcache.restful.events.spi.EventStream;
import org.dcache.restful.events.spi.SelectionContext;
import org.dcache.restful.events.spi.SelectionResult; import org.dcache.restful.events.spi.SelectionResult;
import org.dcache.restful.events.spi.SelectionStatus; import org.dcache.restful.events.spi.SelectionStatus;
import org.dcache.restful.util.CloseableWithTasks; import org.dcache.restful.util.CloseableWithTasks;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
/** /**
* A channel represents a client's desire to know about events. Each HTTP * A channel represents a client's desire to know about events. Each HTTP
* client should have a single channel object, which has a corresponding unique * client should have a single channel object, which has a corresponding unique
* URL. * URL.
skipping to change at line 384 skipping to change at line 386
String url = subscriptionValueBuilder.apply(eventType, subscript ionId); String url = subscriptionValueBuilder.apply(eventType, subscript ionId);
event.put("subscription", url); event.put("subscription", url);
String data = new ObjectMapper().writeValueAsString(event); String data = new ObjectMapper().writeValueAsString(event);
sendEvent(sse.newEventBuilder().name("SYSTEM").data(data).build( )); sendEvent(sse.newEventBuilder().name("SYSTEM").data(data).build( ));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
LOGGER.warn("Failed to build {} data: {}", type, e.toString()); LOGGER.warn("Failed to build {} data: {}", type, e.toString());
} }
} }
} }
public SubscriptionResult subscribe(String channelId, String eventType, Json public SubscriptionResult subscribe(final HttpServletRequest request,
Node selector) final String channelId, String eventType, JsonNode selector)
{ {
EventStream es = repository.getEventStream(eventType) EventStream es = repository.getEventStream(eventType)
.orElseThrow(() -> new BadRequestException("Unknown event ty pe: " + eventType)); .orElseThrow(() -> new BadRequestException("Unknown event ty pe: " + eventType));
SelectionResult result = es.select(channelId, SelectionContext context = new SelectionContext() {
@Override
public String channelId() {
return channelId;
}
@Override
public HttpServletRequest httpServletRequest() {
return request;
}
};
SelectionResult result = es.select(context,
(id,event) -> { (id,event) -> {
try { try {
sendEvent(eventType, id, event); sendEvent(eventType, id, event);
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOGGER.error("Bug found in dCache, please report to <support@dCache.org>", e); LOGGER.error("Bug found in dCache, please report to <support@dCache.org>", e);
} }
}, selector); }, selector);
if (result.getStatus() == SelectionStatus.CREATED) { if (result.getStatus() == SelectionStatus.CREATED) {
String subscriptionId = result.getSelectedEventStream().getId(); String subscriptionId = result.getSelectedEventStream().getId();
 End of changes. 4 change blocks. 
3 lines changed or deleted 17 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)