1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.lang.reflect.Method;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.client.ResultScanner;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.client.ScannerCallable;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.mapreduce.Counter;
39 import org.apache.hadoop.mapreduce.InputSplit;
40 import org.apache.hadoop.mapreduce.TaskAttemptContext;
41 import org.apache.hadoop.util.StringUtils;
42
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class TableRecordReaderImpl {
50 public static final String LOG_PER_ROW_COUNT
51 = "hbase.mapreduce.log.scanner.rowcount";
52
53 private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
54
55
56 private static final String HBASE_COUNTER_GROUP_NAME =
57 "HBase Counters";
58 private ResultScanner scanner = null;
59 private Scan scan = null;
60 private Scan currentScan = null;
61 private Table htable = null;
62 private byte[] lastSuccessfulRow = null;
63 private ImmutableBytesWritable key = null;
64 private Result value = null;
65 private TaskAttemptContext context = null;
66 private Method getCounter = null;
67 private long numRestarts = 0;
68 private long numStale = 0;
69 private long timestamp;
70 private int rowcount;
71 private boolean logScannerActivity = false;
72 private int logPerRowCount = 100;
73
74
75
76
77
78
79
80 public void restart(byte[] firstRow) throws IOException {
81 currentScan = new Scan(scan);
82 currentScan.setStartRow(firstRow);
83 currentScan.setScanMetricsEnabled(true);
84 if (this.scanner != null) {
85 if (logScannerActivity) {
86 LOG.info("Closing the previously opened scanner object.");
87 }
88 this.scanner.close();
89 }
90 this.scanner = this.htable.getScanner(currentScan);
91 if (logScannerActivity) {
92 LOG.info("Current scan=" + currentScan.toString());
93 timestamp = System.currentTimeMillis();
94 rowcount = 0;
95 }
96 }
97
98
99
100
101
102
103
104 protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
105 throws IOException {
106 Method m = null;
107 try {
108 m = context.getClass().getMethod("getCounter",
109 new Class [] {String.class, String.class});
110 } catch (SecurityException e) {
111 throw new IOException("Failed test for getCounter", e);
112 } catch (NoSuchMethodException e) {
113
114 }
115 return m;
116 }
117
118
119
120
121
122
123 public void setHTable(Table htable) {
124 Configuration conf = htable.getConfiguration();
125 logScannerActivity = conf.getBoolean(
126 ScannerCallable.LOG_SCANNER_ACTIVITY, false);
127 logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
128 this.htable = htable;
129 }
130
131
132
133
134
135
136 public void setScan(Scan scan) {
137 this.scan = scan;
138 }
139
140
141
142
143
144
145
146 public void initialize(InputSplit inputsplit,
147 TaskAttemptContext context) throws IOException,
148 InterruptedException {
149 if (context != null) {
150 this.context = context;
151 getCounter = retrieveGetCounterWithStringsParams(context);
152 }
153 restart(scan.getStartRow());
154 }
155
156
157
158
159
160
161 public void close() {
162 this.scanner.close();
163 try {
164 this.htable.close();
165 } catch (IOException ioe) {
166 LOG.warn("Error closing table", ioe);
167 }
168 }
169
170
171
172
173
174
175
176
177 public ImmutableBytesWritable getCurrentKey() throws IOException,
178 InterruptedException {
179 return key;
180 }
181
182
183
184
185
186
187
188
189 public Result getCurrentValue() throws IOException, InterruptedException {
190 return value;
191 }
192
193
194
195
196
197
198
199
200
201 public boolean nextKeyValue() throws IOException, InterruptedException {
202 if (key == null) key = new ImmutableBytesWritable();
203 if (value == null) value = new Result();
204 try {
205 try {
206 value = this.scanner.next();
207 if (value != null && value.isStale()) numStale++;
208 if (logScannerActivity) {
209 rowcount ++;
210 if (rowcount >= logPerRowCount) {
211 long now = System.currentTimeMillis();
212 LOG.info("Mapper took " + (now-timestamp)
213 + "ms to process " + rowcount + " rows");
214 timestamp = now;
215 rowcount = 0;
216 }
217 }
218 } catch (IOException e) {
219
220 if (e instanceof DoNotRetryIOException) {
221 throw e;
222 }
223
224
225 LOG.info("recovered from " + StringUtils.stringifyException(e));
226 if (lastSuccessfulRow == null) {
227 LOG.warn("We are restarting the first next() invocation," +
228 " if your mapper has restarted a few other times like this" +
229 " then you should consider killing this job and investigate" +
230 " why it's taking so long.");
231 }
232 if (lastSuccessfulRow == null) {
233 restart(scan.getStartRow());
234 } else {
235 restart(lastSuccessfulRow);
236 scanner.next();
237 }
238 value = scanner.next();
239 if (value != null && value.isStale()) numStale++;
240 numRestarts++;
241 }
242 if (value != null && value.size() > 0) {
243 key.set(value.getRow());
244 lastSuccessfulRow = key.get();
245 return true;
246 }
247
248 updateCounters();
249 return false;
250 } catch (IOException ioe) {
251 if (logScannerActivity) {
252 long now = System.currentTimeMillis();
253 LOG.info("Mapper took " + (now-timestamp)
254 + "ms to process " + rowcount + " rows");
255 LOG.info(ioe);
256 String lastRow = lastSuccessfulRow == null ?
257 "null" : Bytes.toStringBinary(lastSuccessfulRow);
258 LOG.info("lastSuccessfulRow=" + lastRow);
259 }
260 throw ioe;
261 }
262 }
263
264
265
266
267
268
269
270
271 private void updateCounters() throws IOException {
272 ScanMetrics scanMetrics = this.scan.getScanMetrics();
273 if (scanMetrics == null) {
274 return;
275 }
276
277 updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
278 }
279
280 protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
281 Method getCounter, TaskAttemptContext context, long numStale) {
282
283 if (getCounter == null) {
284 return;
285 }
286
287 try {
288 for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
289 Counter ct = (Counter)getCounter.invoke(context,
290 HBASE_COUNTER_GROUP_NAME, entry.getKey());
291
292 ct.increment(entry.getValue());
293 }
294 ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
295 "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
296 ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
297 "NUM_SCAN_RESULTS_STALE")).increment(numStale);
298 } catch (Exception e) {
299 LOG.debug("can't update counter." + StringUtils.stringifyException(e));
300 }
301 }
302
303
304
305
306
307
308 public float getProgress() {
309
310 return 0;
311 }
312
313 }