View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.Callable;
28  
29  import org.apache.commons.lang.exception.ExceptionUtils;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.commons.logging.impl.Log4JLogger;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CoordinatedStateManager;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.HTestConst;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValue.KVComparator;
46  import org.apache.hadoop.hbase.TableName;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.client.Result;
49  import org.apache.hadoop.hbase.client.ResultScanner;
50  import org.apache.hadoop.hbase.client.Scan;
51  import org.apache.hadoop.hbase.client.ScannerCallable;
52  import org.apache.hadoop.hbase.client.Table;
53  import org.apache.hadoop.hbase.filter.Filter;
54  import org.apache.hadoop.hbase.filter.FilterBase;
55  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
56  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
57  import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
58  import org.apache.hadoop.hbase.testclassification.MediumTests;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.hbase.wal.WAL;
61  import org.apache.log4j.Level;
62  import org.junit.After;
63  import org.junit.AfterClass;
64  import org.junit.Before;
65  import org.junit.BeforeClass;
66  import org.junit.Test;
67  import org.junit.experimental.categories.Category;
68  
69  import com.google.protobuf.RpcController;
70  import com.google.protobuf.ServiceException;
71  
72  /**
73   * Here we test to make sure that scans return the expected Results when the server is sending the
74   * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
75   * the scanner on the client side from timing out). A heartbeat message is sent from the server to
76   * the client when the server has exceeded the time limit during the processing of the scan. When
77   * the time limit is reached, the server will return to the Client whatever Results it has
78   * accumulated (potentially empty).
79   */
80  @Category(MediumTests.class)
81  public class TestScannerHeartbeatMessages {
82    private static final Log LOG = LogFactory.getLog(TestScannerHeartbeatMessages.class);
83  
84    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
85  
86    private static Table TABLE = null;
87  
88    /**
89     * Table configuration
90     */
91    private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
92  
93    private static int NUM_ROWS = 5;
94    private static byte[] ROW = Bytes.toBytes("testRow");
95    private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
96  
97    private static int NUM_FAMILIES = 3;
98    private static byte[] FAMILY = Bytes.toBytes("testFamily");
99    private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
100 
101   private static int NUM_QUALIFIERS = 3;
102   private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
103   private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
104 
105   private static int VALUE_SIZE = 128;
106   private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
107 
108   // Time, in milliseconds, that the client will wait for a response from the server before timing
109   // out. This value is used server side to determine when it is necessary to send a heartbeat
110   // message to the client
111   private static int CLIENT_TIMEOUT = 2000;
112 
113   // The server limits itself to running for half of the CLIENT_TIMEOUT value.
114   private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2;
115 
116   // By default, at most one row's worth of cells will be retrieved before the time limit is reached
117   private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
118   // By default, at most cells for two column families are retrieved before the time limit is
119   // reached
120   private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES;
121 
122   @BeforeClass
123   public static void setUpBeforeClass() throws Exception {
124     ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
125     ((Log4JLogger) HeartbeatRPCServices.LOG).getLogger().setLevel(Level.ALL);
126     Configuration conf = TEST_UTIL.getConfiguration();
127 
128     conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
129     conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
130     conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
131     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
132     conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
133 
134     // Check the timeout condition after every cell
135     conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
136     TEST_UTIL.startMiniCluster(1);
137 
138     TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
139   }
140 
141   static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
142       byte[][] qualifiers, byte[] cellValue) throws IOException {
143     Table ht = TEST_UTIL.createTable(name, families);
144     List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
145     ht.put(puts);
146 
147     return ht;
148   }
149 
150   /**
151    * Make puts to put the input value into each combination of row, family, and qualifier
152    * @param rows
153    * @param families
154    * @param qualifiers
155    * @param value
156    * @return
157    * @throws IOException
158    */
159   static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
160       byte[] value) throws IOException {
161     Put put;
162     ArrayList<Put> puts = new ArrayList<>();
163 
164     for (int row = 0; row < rows.length; row++) {
165       put = new Put(rows[row]);
166       for (int fam = 0; fam < families.length; fam++) {
167         for (int qual = 0; qual < qualifiers.length; qual++) {
168           KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
169           put.add(kv);
170         }
171       }
172       puts.add(put);
173     }
174 
175     return puts;
176   }
177 
178   @AfterClass
179   public static void tearDownAfterClass() throws Exception {
180     TEST_UTIL.deleteTable(TABLE_NAME);
181     TEST_UTIL.shutdownMiniCluster();
182   }
183 
184   @Before
185   public void setupBeforeTest() throws Exception {
186     disableSleeping();
187   }
188 
189   @After
190   public void teardownAfterTest() throws Exception {
191     disableSleeping();
192   }
193 
194   /**
195    * Test a variety of scan configurations to ensure that they return the expected Results when
196    * heartbeat messages are necessary. These tests are accumulated under one test case to ensure
197    * that they don't run in parallel. If the tests ran in parallel, they may conflict with each
198    * other due to changing static variables
199    */
200   @Test
201   public void testScannerHeartbeatMessages() throws Exception {
202     testImportanceOfHeartbeats(testHeartbeatBetweenRows());
203     testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
204     testImportanceOfHeartbeats(testHeartbeatWithSparseFilter());
205   }
206 
207   /**
208    * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
209    * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
210    * disabled, the test should throw an exception.
211    * @param testCallable
212    * @throws InterruptedException
213    */
214   public void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
215     HeartbeatRPCServices.heartbeatsEnabled = true;
216 
217     try {
218       testCallable.call();
219     } catch (Exception e) {
220       fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
221           + ExceptionUtils.getStackTrace(e));
222     }
223 
224     HeartbeatRPCServices.heartbeatsEnabled = false;
225     try {
226       testCallable.call();
227     } catch (Exception e) {
228       return;
229     } finally {
230       HeartbeatRPCServices.heartbeatsEnabled = true;
231     }
232     fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
233         + " is not thrown, the test case is not testing the importance of heartbeat messages");
234   }
235 
236   /**
237    * Test the case that the time limit for the scan is reached after each full row of cells is
238    * fetched.
239    * @throws Exception
240    */
241   public Callable<Void> testHeartbeatBetweenRows() throws Exception {
242     return new Callable<Void>() {
243 
244       @Override
245       public Void call() throws Exception {
246         // Configure the scan so that it can read the entire table in a single RPC. We want to test
247         // the case where a scan stops on the server side due to a time limit
248         Scan scan = new Scan();
249         scan.setMaxResultSize(Long.MAX_VALUE);
250         scan.setCaching(Integer.MAX_VALUE);
251 
252         testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
253         return null;
254       }
255     };
256   }
257 
258   /**
259    * Test the case that the time limit for scans is reached in between column families
260    * @throws Exception
261    */
262   public Callable<Void> testHeartbeatBetweenColumnFamilies() throws Exception {
263     return new Callable<Void>() {
264       @Override
265       public Void call() throws Exception {
266         // Configure the scan so that it can read the entire table in a single RPC. We want to test
267         // the case where a scan stops on the server side due to a time limit
268         Scan baseScan = new Scan();
269         baseScan.setMaxResultSize(Long.MAX_VALUE);
270         baseScan.setCaching(Integer.MAX_VALUE);
271 
272         // Copy the scan before each test. When a scan object is used by a scanner, some of its
273         // fields may be changed such as start row
274         Scan scanCopy = new Scan(baseScan);
275         testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
276         scanCopy = new Scan(baseScan);
277         testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
278         return null;
279       }
280     };
281   }
282 
283   public static class SparseFilter extends FilterBase{
284 
285     @Override
286     public ReturnCode filterKeyValue(Cell v) throws IOException {
287       try {
288         Thread.sleep(SERVER_TIME_LIMIT + 10);
289       } catch (InterruptedException e) {
290         Thread.currentThread().interrupt();
291       }
292       return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ?
293           ReturnCode.INCLUDE :
294           ReturnCode.SKIP;
295     }
296 
297     public static Filter parseFrom(final byte [] pbBytes){
298       return new SparseFilter();
299     }
300   }
301 
302   /**
303    * Test the case that there is a filter which filters most of cells
304    * @throws Exception
305    */
306   public Callable<Void> testHeartbeatWithSparseFilter() throws Exception {
307     return new Callable<Void>() {
308       @Override
309       public Void call() throws Exception {
310         Scan scan = new Scan();
311         scan.setMaxResultSize(Long.MAX_VALUE);
312         scan.setCaching(Integer.MAX_VALUE);
313         scan.setFilter(new SparseFilter());
314         ResultScanner scanner = TABLE.getScanner(scan);
315         int num = 0;
316         while (scanner.next() != null) {
317           num++;
318         }
319         assertEquals(1, num);
320         scanner.close();
321 
322         scan = new Scan();
323         scan.setMaxResultSize(Long.MAX_VALUE);
324         scan.setCaching(Integer.MAX_VALUE);
325         scan.setFilter(new SparseFilter());
326         scan.setAllowPartialResults(true);
327         scanner = TABLE.getScanner(scan);
328         num = 0;
329         while (scanner.next() != null) {
330           num++;
331         }
332         assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
333         scanner.close();
334 
335         return null;
336       }
337     };
338   }
339 
340   /**
341    * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
342    * necessary
343    * @param scan The scan configuration being tested
344    * @param rowSleepTime The time to sleep between fetches of row cells
345    * @param cfSleepTime The time to sleep between fetches of column family cells
346    * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
347    *          that column family are fetched
348    * @throws Exception
349    */
350   public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
351       int cfSleepTime, boolean sleepBeforeCf) throws Exception {
352     disableSleeping();
353     final ResultScanner scanner = TABLE.getScanner(scan);
354     final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
355 
356     Result r1 = null;
357     Result r2 = null;
358 
359     while ((r1 = scanner.next()) != null) {
360       // Enforce the specified sleep conditions during calls to the heartbeat scanner
361       configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
362       r2 = scannerWithHeartbeats.next();
363       disableSleeping();
364 
365       assertTrue(r2 != null);
366       try {
367         Result.compareResults(r1, r2);
368       } catch (Exception e) {
369         fail(e.getMessage());
370       }
371     }
372 
373     assertTrue(scannerWithHeartbeats.next() == null);
374     scanner.close();
375     scannerWithHeartbeats.close();
376   }
377 
378   /**
379    * Helper method for setting the time to sleep between rows and column families. If a sleep time
380    * is negative then that sleep will be disabled
381    * @param rowSleepTime
382    * @param cfSleepTime
383    */
384   private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
385     HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
386     HeartbeatHRegion.rowSleepTime = rowSleepTime;
387 
388     HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
389     HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
390     HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
391   }
392 
393   /**
394    * Disable the sleeping mechanism server side.
395    */
396   private static void disableSleeping() {
397     HeartbeatHRegion.sleepBetweenRows = false;
398     HeartbeatHRegion.sleepBetweenColumnFamilies = false;
399   }
400 
401   /**
402    * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
403    * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
404    */
405   private static class HeartbeatHRegionServer extends HRegionServer {
406     public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
407       super(conf);
408     }
409 
410     public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm)
411         throws IOException, InterruptedException {
412       super(conf, csm);
413     }
414 
415     @Override
416     protected RSRpcServices createRpcServices() throws IOException {
417       return new HeartbeatRPCServices(this);
418     }
419   }
420 
421   /**
422    * Custom RSRpcServices instance that allows heartbeat support to be toggled
423    */
424   private static class HeartbeatRPCServices extends RSRpcServices {
425     private static boolean heartbeatsEnabled = true;
426 
427     public HeartbeatRPCServices(HRegionServer rs) throws IOException {
428       super(rs);
429     }
430 
431     @Override
432     public ScanResponse scan(RpcController controller, ScanRequest request) 
433         throws ServiceException {
434       ScanRequest.Builder builder = ScanRequest.newBuilder(request);
435       builder.setClientHandlesHeartbeats(heartbeatsEnabled);
436       return super.scan(controller, builder.build());
437     }
438   }
439 
440   /**
441    * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
442    * between fetches of row Results and/or column family cells. Useful for emulating an instance
443    * where the server is taking a long time to process a client's scan request
444    */
445   private static class HeartbeatHRegion extends HRegion {
446     // Row sleeps occur AFTER each row worth of cells is retrieved.
447     private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
448     private static boolean sleepBetweenRows = false;
449 
450     // The sleep for column families can be initiated before or after we fetch the cells for the
451     // column family. If the sleep occurs BEFORE then the time limits will be reached inside
452     // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
453     // limit will be reached inside RegionScanner after all the cells for a column family have been
454     // retrieved.
455     private static boolean sleepBeforeColumnFamily = false;
456     private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
457     private static boolean sleepBetweenColumnFamilies = false;
458 
459     public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
460         HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
461       super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
462     }
463 
464     public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
465         HTableDescriptor htd, RegionServerServices rsServices) {
466       super(fs, wal, confParam, htd, rsServices);
467     }
468 
469     private static void columnFamilySleep() {
470       if (HeartbeatHRegion.sleepBetweenColumnFamilies) {
471         try {
472           Thread.sleep(HeartbeatHRegion.columnFamilySleepTime);
473         } catch (InterruptedException e) {
474         }
475       }
476     }
477 
478     private static void rowSleep() {
479       try {
480         if (HeartbeatHRegion.sleepBetweenRows) {
481           Thread.sleep(HeartbeatHRegion.rowSleepTime);
482         }
483       } catch (InterruptedException e) {
484       }
485     }
486 
487     // Instantiate the custom heartbeat region scanners
488     @Override
489     protected RegionScanner instantiateRegionScanner(Scan scan,
490         List<KeyValueScanner> additionalScanners) throws IOException {
491       if (scan.isReversed()) {
492         if (scan.getFilter() != null) {
493           scan.getFilter().setReversed(true);
494         }
495         return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
496       }
497       return new HeartbeatRegionScanner(scan, additionalScanners, this);
498     }
499   }
500 
501   /**
502    * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
503    * and/or column family cells
504    */
505   private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
506     HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
507         HRegion region) throws IOException {
508       super(scan, additionalScanners, region);
509     }
510 
511     @Override
512     public boolean nextRaw(List<Cell> outResults, ScannerContext context)
513         throws IOException {
514       boolean moreRows = super.nextRaw(outResults, context);
515       HeartbeatHRegion.rowSleep();
516       return moreRows;
517     }
518 
519     @Override
520     protected void initializeKVHeap(List<KeyValueScanner> scanners,
521         List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
522       this.storeHeap = new HeartbeatReversedKVHeap(scanners, region.getComparator());
523       if (!joinedScanners.isEmpty()) {
524         this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, region.getComparator());
525       }
526     }
527   }
528 
529   /**
530    * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
531    * column family cells
532    */
533   private static class HeartbeatRegionScanner extends RegionScannerImpl {
534     HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
535         throws IOException {
536       region.super(scan, additionalScanners, region);
537     }
538 
539     @Override
540     public boolean nextRaw(List<Cell> outResults, ScannerContext context)
541         throws IOException {
542       boolean moreRows = super.nextRaw(outResults, context);
543       HeartbeatHRegion.rowSleep();
544       return moreRows;
545     }
546 
547     @Override
548     protected void initializeKVHeap(List<KeyValueScanner> scanners,
549         List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
550       this.storeHeap = new HeartbeatKVHeap(scanners, region.getComparator());
551       if (!joinedScanners.isEmpty()) {
552         this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getComparator());
553       }
554     }
555   }
556 
557   /**
558    * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
559    * cells. Useful for testing
560    */
561   private static final class HeartbeatKVHeap extends KeyValueHeap {
562     public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator)
563         throws IOException {
564       super(scanners, comparator);
565     }
566 
567     HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
568         throws IOException {
569       super(scanners, comparator);
570     }
571 
572     @Override
573     public boolean next(List<Cell> result, ScannerContext context)
574         throws IOException {
575       if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
576       boolean moreRows = super.next(result, context);
577       if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
578       return moreRows;
579     }
580   }
581 
582   /**
583    * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
584    * cells.
585    */
586   private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
587     public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners, 
588         KVComparator comparator) throws IOException {
589       super(scanners, comparator);
590     }
591 
592     @Override
593     public boolean next(List<Cell> result, ScannerContext context)
594         throws IOException {
595       if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
596       boolean moreRows = super.next(result, context);
597       if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
598       return moreRows;
599     }
600   }
601 }