1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
34 import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
35 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
36 import org.apache.hadoop.hbase.client.coprocessor.Batch;
37 import org.apache.hadoop.hbase.regionserver.Region;
38 import org.apache.hadoop.hbase.testclassification.MediumTests;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41 import org.junit.AfterClass;
42 import org.junit.BeforeClass;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
47 import static org.junit.Assert.assertEquals;
48 import static org.junit.Assert.assertNotEquals;
49 import static org.junit.Assert.assertNotNull;
50 import static org.junit.Assert.assertTrue;
51
52
53
54
55 @Category(MediumTests.class)
56 public class TestClientPushback {
57
58 private static final Log LOG = LogFactory.getLog(TestClientPushback.class);
59 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
60
61 private static final byte[] tableName = Bytes.toBytes("client-pushback");
62 private static final byte[] family = Bytes.toBytes("f");
63 private static final byte[] qualifier = Bytes.toBytes("q");
64 private static long flushSizeBytes = 1024;
65
66 @BeforeClass
67 public static void setupCluster() throws Exception{
68 Configuration conf = UTIL.getConfiguration();
69
70 conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
71
72 conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class,
73 ClientBackoffPolicy.class);
74
75
76 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
77
78 conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
79 conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
80 UTIL.startMiniCluster();
81 UTIL.createTable(tableName, family);
82 }
83
84 @AfterClass
85 public static void teardownCluster() throws Exception{
86 UTIL.shutdownMiniCluster();
87 }
88
89 @Test(timeout=60000)
90 public void testClientTracksServerPushback() throws Exception{
91 Configuration conf = UTIL.getConfiguration();
92 TableName tablename = TableName.valueOf(tableName);
93 Connection conn = ConnectionFactory.createConnection(conf);
94 HTable table = (HTable) conn.getTable(tablename);
95
96 table.setAutoFlushTo(true);
97
98
99 Put p = new Put(Bytes.toBytes("row"));
100 p.add(family, qualifier, Bytes.toBytes("value1"));
101 table.put(p);
102
103
104 ClusterConnection connection = table.connection;
105 ClientBackoffPolicy backoffPolicy = connection.getBackoffPolicy();
106 assertTrue("Backoff policy is not correctly configured",
107 backoffPolicy instanceof ExponentialClientBackoffPolicy);
108
109 ServerStatisticTracker stats = connection.getStatisticsTracker();
110 assertNotNull( "No stats configured for the client!", stats);
111 Region region = UTIL.getHBaseCluster().getRegionServer(0).getOnlineRegions(tablename).get(0);
112
113 ServerName server = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
114 byte[] regionName = region.getRegionInfo().getRegionName();
115
116
117 ServerStatistics serverStats = stats.getServerStatsForTesting(server);
118 ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
119 int load = regionStats.getMemstoreLoadPercent();
120 assertEquals("We did not find some load on the memstore", load,
121 regionStats.getMemstoreLoadPercent());
122
123
124 long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
125 assertNotEquals("Reported load does not produce a backoff", backoffTime, 0);
126 LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " +
127 server + " is " + backoffTime);
128
129
130
131 List<Row> ops = new ArrayList<Row>(1);
132 ops.add(p);
133 final CountDownLatch latch = new CountDownLatch(1);
134 final AtomicLong endTime = new AtomicLong();
135 long startTime = EnvironmentEdgeManager.currentTime();
136 table.mutator.ap.submit(tablename, ops, true, new Batch.Callback<Result>() {
137 @Override
138 public void update(byte[] region, byte[] row, Result result) {
139 endTime.set(EnvironmentEdgeManager.currentTime());
140 latch.countDown();
141 }
142 }, true);
143
144
145
146
147 String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
148 MetricsConnection.RegionStats rsStats = connection.getConnectionMetrics().
149 serverStats.get(server).get(regionName);
150 assertEquals(name, rsStats.name);
151 assertEquals(rsStats.heapOccupancyHist.mean(),
152 (double)regionStats.getHeapOccupancyPercent(), 0.1 );
153 assertEquals(rsStats.memstoreLoadHist.mean(),
154 (double)regionStats.getMemstoreLoadPercent(), 0.1);
155
156 MetricsConnection.RunnerStats runnerStats = connection.getConnectionMetrics().runnerStats;
157
158 assertEquals(runnerStats.delayRunners.count(), 1);
159 assertEquals(runnerStats.normalRunners.count(), 1);
160 assertEquals("", runnerStats.delayIntevalHist.mean(), (double)backoffTime, 0.1);
161
162 latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
163 assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0);
164 assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
165 }
166 }