hadoop  1.0.4
About: Apache Hadoop is a software framework that allows for the distributed processing of large data sets across large cluster built of commodity hardware (implementing a computational paradigm named Map/Reduce, and providing a distributed file system named HDFS).
  Fossies Dox: hadoop-1.0.4.tar.gz  ("inofficial" and yet experimental doxygen-generated source code documentation)  

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
OutputHandler.java
Go to the documentation of this file.
1 
19 package org.apache.hadoop.mapred.pipes;
20 
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 
27 import org.apache.hadoop.io.FloatWritable;
28 import org.apache.hadoop.io.NullWritable;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.io.WritableComparable;
31 import org.apache.hadoop.mapred.Counters;
32 import org.apache.hadoop.mapred.OutputCollector;
33 import org.apache.hadoop.mapred.RecordReader;
34 import org.apache.hadoop.mapred.Reporter;
35 
39 class OutputHandler<K extends WritableComparable,
40  V extends Writable>
41  implements UpwardProtocol<K, V> {
42 
43  private Reporter reporter;
44  private OutputCollector<K, V> collector;
45  private float progressValue = 0.0f;
46  private boolean done = false;
47 
48  private Throwable exception = null;
49  RecordReader<FloatWritable,NullWritable> recordReader = null;
50  private Map<Integer, Counters.Counter> registeredCounters =
51  new HashMap<Integer, Counters.Counter>();
52 
53  private String expectedDigest = null;
54  private boolean digestReceived = false;
60  public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
61  RecordReader<FloatWritable,NullWritable> recordReader,
62  String expectedDigest) {
63  this.reporter = reporter;
64  this.collector = collector;
65  this.recordReader = recordReader;
66  this.expectedDigest = expectedDigest;
67  }
68 
72  public void output(K key, V value) throws IOException {
73  collector.collect(key, value);
74  }
75 
79  public void partitionedOutput(int reduce, K key,
80  V value) throws IOException {
81  PipesPartitioner.setNextPartition(reduce);
82  collector.collect(key, value);
83  }
84 
88  public void status(String msg) {
89  reporter.setStatus(msg);
90  }
91 
92  private FloatWritable progressKey = new FloatWritable(0.0f);
93  private NullWritable nullValue = NullWritable.get();
97  public void progress(float progress) throws IOException {
98  progressValue = progress;
99  reporter.progress();
100 
101  if (recordReader != null) {
102  progressKey.set(progress);
103  recordReader.next(progressKey, nullValue);
104  }
105  }
106 
110  public void done() throws IOException {
111  synchronized (this) {
112  done = true;
113  notify();
114  }
115  }
116 
121  public float getProgress() {
122  return progressValue;
123  }
124 
128  public void failed(Throwable e) {
129  synchronized (this) {
130  exception = e;
131  notify();
132  }
133  }
134 
140  public synchronized boolean waitForFinish() throws Throwable {
141  while (!done && exception == null) {
142  wait();
143  }
144  if (exception != null) {
145  throw exception;
146  }
147  return done;
148  }
149 
150  public void registerCounter(int id, String group, String name) throws IOException {
151  Counters.Counter counter = reporter.getCounter(group, name);
152  registeredCounters.put(id, counter);
153  }
154 
155  public void incrementCounter(int id, long amount) throws IOException {
156  if (id < registeredCounters.size()) {
157  Counters.Counter counter = registeredCounters.get(id);
158  counter.increment(amount);
159  } else {
160  throw new IOException("Invalid counter with id: " + id);
161  }
162  }
163 
164  public synchronized boolean authenticate(String digest) throws IOException {
165  boolean success = true;
166  if (!expectedDigest.equals(digest)) {
167  exception = new IOException("Authentication Failed: Expected digest="
168  + expectedDigest + ", received=" + digestReceived);
169  success = false;
170  }
171  digestReceived = true;
172  notify();
173  return success;
174  }
175 
182  synchronized void waitForAuthentication()
183  throws IOException, InterruptedException {
184  while (digestReceived == false && exception == null) {
185  wait();
186  }
187  if (exception != null) {
188  throw new IOException(exception.getMessage());
189  }
190  }
191 }