19 package org.apache.hadoop.mapred.pipes;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
27 import org.apache.hadoop.io.FloatWritable;
28 import org.apache.hadoop.io.NullWritable;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.io.WritableComparable;
31 import org.apache.hadoop.mapred.Counters;
32 import org.apache.hadoop.mapred.OutputCollector;
33 import org.apache.hadoop.mapred.RecordReader;
34 import org.apache.hadoop.mapred.Reporter;
39 class OutputHandler<K
extends WritableComparable,
41 implements UpwardProtocol<K, V> {
// Sink for status messages (status()) and source of counters (registerCounter()).
43 private Reporter reporter;
// Receives every key/value pair passed to output() and partitionedOutput().
44 private OutputCollector<K, V> collector;
// Last value handed to progress(); returned as-is by getProgress().
45 private float progressValue = 0.0f;
// Completion flag polled by waitForFinish().
46 private boolean done =
false;
// Recorded failure; authenticate() stores a mismatch here, and both
// waitForFinish() and waitForAuthentication() check it.
48 private Throwable exception = null;
// Optional reader; when non-null, progress() pushes the progress value
// through it via next(progressKey, nullValue).
49 RecordReader<FloatWritable,NullWritable> recordReader = null;
// Counter id -> counter, filled by registerCounter() and read by incrementCounter().
// NOTE(review): java.util.Map is not among the imports visible in this
// excerpt -- confirm it is imported.
50 private Map<Integer, Counters.Counter> registeredCounters =
51 new HashMap<Integer, Counters.Counter>();
// Digest that authenticate() compares the received digest against.
53 private String expectedDigest = null;
// Set to true by authenticate(); polled by waitForAuthentication().
54 private boolean digestReceived =
false;
/**
 * Builds a handler wired to the given collaborators.
 *
 * @param collector      receives the key/value pairs passed to
 *                       {@code output} and {@code partitionedOutput}
 * @param reporter       used for status messages and counter lookup
 * @param recordReader   reader fed with progress values; may be
 *                       {@code null}, in which case {@code progress}
 *                       skips it
 * @param expectedDigest digest that {@code authenticate} compares against
 */
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
    RecordReader<FloatWritable,NullWritable> recordReader,
    String expectedDigest) {
  this.collector = collector;
  this.reporter = reporter;
  this.expectedDigest = expectedDigest;
  this.recordReader = recordReader;
}
72 public void output(K key, V value)
throws IOException {
73 collector.collect(key, value);
79 public void partitionedOutput(
int reduce, K key,
80 V value)
throws IOException {
81 PipesPartitioner.setNextPartition(reduce);
82 collector.collect(key, value);
88 public void status(String msg) {
89 reporter.setStatus(msg);
// Reusable key that progress() overwrites with the latest progress value
// before calling recordReader.next().
92 private FloatWritable progressKey =
new FloatWritable(0.0f);
// Value argument paired with progressKey in recordReader.next().
93 private NullWritable nullValue = NullWritable.get();
/**
 * Records the latest progress value and, when a record reader is
 * configured, pushes that value through it.
 *
 * NOTE(review): this excerpt appears to omit lines between the assignment
 * and the null check, as well as the closing braces -- confirm against the
 * full file before changing the method.
 *
 * @param progress progress value to record
 * @throws IOException declared in the signature; recordReader.next() is the
 *         only visible call that could raise it
 */
97 public void progress(
float progress)
throws IOException {
// Remember the value so getProgress() can return it.
98 progressValue = progress;
// The reader is optional; skip the hand-off when none was supplied.
101 if (recordReader != null) {
102 progressKey.set(progress);
103 recordReader.next(progressKey, nullValue);
/**
 * Completion callback; its work runs while holding this object's monitor.
 *
 * NOTE(review): the synchronized body is not visible in this excerpt.
 * Presumably it sets the {@code done} flag that waitForFinish() polls and
 * wakes any waiter -- confirm against the full file.
 *
 * @throws IOException declared in the signature
 */
110 public void done() throws IOException {
111 synchronized (
this) {
121 public float getProgress() {
122 return progressValue;
/**
 * Failure callback; its work runs while holding this object's monitor.
 *
 * NOTE(review): the synchronized body is not visible in this excerpt.
 * Presumably it stores {@code e} in the {@code exception} field that
 * waitForFinish() checks and wakes any waiter -- confirm against the full
 * file.
 *
 * @param e the failure being reported
 */
128 public void failed(Throwable e) {
129 synchronized (
this) {
/**
 * Loops while the handler is neither done nor holding a recorded
 * exception, then inspects the recorded exception.
 *
 * NOTE(review): the loop body, the branch body and the return statement
 * are not visible in this excerpt. Presumably the loop waits on this
 * monitor and the branch rethrows the exception -- confirm against the
 * full file.
 *
 * @return a boolean whose meaning is not visible in this excerpt
 * @throws Throwable declared in the signature
 */
140 public synchronized boolean waitForFinish() throws Throwable {
// Keep looping until either done is set or an exception has been recorded.
141 while (!done && exception == null) {
144 if (exception != null) {
150 public void registerCounter(
int id, String group, String name)
throws IOException {
151 Counters.Counter counter = reporter.getCounter(group, name);
152 registeredCounters.put(
id, counter);
155 public void incrementCounter(
int id,
long amount)
throws IOException {
156 if (
id < registeredCounters.size()) {
157 Counters.Counter counter = registeredCounters.get(
id);
158 counter.increment(amount);
160 throw new IOException(
"Invalid counter with id: " +
id);
164 public synchronized boolean authenticate(String digest)
throws IOException {
165 boolean success =
true;
166 if (!expectedDigest.equals(digest)) {
167 exception =
new IOException(
"Authentication Failed: Expected digest="
168 + expectedDigest +
", received=" + digestReceived);
171 digestReceived =
true;
/**
 * Loops until authenticate() has set {@code digestReceived} or an exception
 * has been recorded, then surfaces a recorded exception as an IOException.
 *
 * NOTE(review): the loop body and the closing braces are not visible in
 * this excerpt. Presumably the loop waits on this monitor -- confirm
 * against the full file.
 *
 * @throws IOException if an exception was recorded; it carries only the
 *         recorded exception's message
 * @throws InterruptedException declared in the signature
 */
182 synchronized void waitForAuthentication()
183 throws IOException, InterruptedException {
// Keep looping while no digest has arrived and no failure has been recorded.
184 while (digestReceived ==
false && exception == null) {
187 if (exception != null) {
// NOTE(review): only the message is copied, so the original exception's type
// and stack trace are lost; consider passing it as the cause.
188 throw new IOException(exception.getMessage());