1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20
21
22 import static org.junit.Assert.assertEquals;
23
24 import java.io.IOException;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.NavigableSet;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.HBaseTestingUtility;
38 import org.apache.hadoop.hbase.HColumnDescriptor;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HTableDescriptor;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.KeyValueUtil;
43 import org.apache.hadoop.hbase.testclassification.MediumTests;
44 import org.apache.hadoop.hbase.client.Get;
45 import org.apache.hadoop.hbase.client.HTable;
46 import org.apache.hadoop.hbase.client.IsolationLevel;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.Result;
49 import org.apache.hadoop.hbase.client.Scan;
50 import org.apache.hadoop.hbase.client.Durability;
51 import org.apache.hadoop.hbase.client.Table;
52 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
53 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
54 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
55 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
56 import org.apache.hadoop.hbase.regionserver.HStore;
57 import org.apache.hadoop.hbase.regionserver.InternalScanner;
58 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
59 import org.apache.hadoop.hbase.regionserver.ScanType;
60 import org.apache.hadoop.hbase.regionserver.Store;
61 import org.apache.hadoop.hbase.regionserver.ScanInfo;
62 import org.apache.hadoop.hbase.regionserver.StoreScanner;
63 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
64 import org.junit.AfterClass;
65 import org.junit.BeforeClass;
66 import org.junit.Test;
67 import org.junit.experimental.categories.Category;
68
69 import org.junit.runner.RunWith;
70 import org.junit.runners.Parameterized;
71 import org.junit.runners.Parameterized.Parameters;
72
73 @Category(MediumTests.class)
74 @RunWith(Parameterized.class)
75 public class TestCoprocessorScanPolicy {
76 private static final Log LOG = LogFactory.getLog(TestCoprocessorScanPolicy.class);
77 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
78 private static final byte[] F = Bytes.toBytes("fam");
79 private static final byte[] Q = Bytes.toBytes("qual");
80 private static final byte[] R = Bytes.toBytes("row");
81
82 @BeforeClass
83 public static void setUpBeforeClass() throws Exception {
84 Configuration conf = TEST_UTIL.getConfiguration();
85 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
86 ScanObserver.class.getName());
87 TEST_UTIL.startMiniCluster();
88 }
89
90 @AfterClass
91 public static void tearDownAfterClass() throws Exception {
92 TEST_UTIL.shutdownMiniCluster();
93 }
94
95 @Parameters
96 public static Collection<Object[]> parameters() {
97 return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
98 }
99
100 public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
101 TEST_UTIL.getMiniHBaseCluster().getConf()
102 .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
103 }
104
105 @Test
106 public void testBaseCases() throws Exception {
107 TableName tableName =
108 TableName.valueOf("baseCases");
109 if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
110 TEST_UTIL.deleteTable(tableName);
111 }
112 Table t = TEST_UTIL.createTable(tableName, F, 1);
113
114 Put p = new Put(R);
115 p.setAttribute("versions", new byte[]{});
116 p.add(F, tableName.getName(), Bytes.toBytes(2));
117 t.put(p);
118
119 long now = EnvironmentEdgeManager.currentTime();
120
121
122 p = new Put(R);
123 p.add(F, Q, now, Q);
124 t.put(p);
125 p = new Put(R);
126 p.add(F, Q, now+1, Q);
127 t.put(p);
128 Get g = new Get(R);
129 g.setMaxVersions(10);
130 Result r = t.get(g);
131 assertEquals(2, r.size());
132
133 TEST_UTIL.flush(tableName);
134 TEST_UTIL.compact(tableName, true);
135
136
137 g = new Get(R);
138 g.setMaxVersions(10);
139 r = t.get(g);
140 assertEquals(2, r.size());
141
142
143 p = new Put(R);
144 p.add(F, Q, now+2, Q);
145 t.put(p);
146 g = new Get(R);
147 g.setMaxVersions(10);
148 r = t.get(g);
149
150 assertEquals(2, r.size());
151
152 t.close();
153 }
154
155 @Test
156 public void testTTL() throws Exception {
157 TableName tableName =
158 TableName.valueOf("testTTL");
159 if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
160 TEST_UTIL.deleteTable(tableName);
161 }
162 HTableDescriptor desc = new HTableDescriptor(tableName);
163 HColumnDescriptor hcd = new HColumnDescriptor(F)
164 .setMaxVersions(10)
165 .setTimeToLive(1);
166 desc.addFamily(hcd);
167 TEST_UTIL.getHBaseAdmin().createTable(desc);
168 Table t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
169 long now = EnvironmentEdgeManager.currentTime();
170 ManualEnvironmentEdge me = new ManualEnvironmentEdge();
171 me.setValue(now);
172 EnvironmentEdgeManagerTestHelper.injectEdge(me);
173
174 long ts = now - 2000;
175
176 Put p = new Put(R);
177 p.setAttribute("ttl", new byte[]{});
178 p.add(F, tableName.getName(), Bytes.toBytes(3000L));
179 t.put(p);
180
181 p = new Put(R);
182 p.add(F, Q, ts, Q);
183 t.put(p);
184 p = new Put(R);
185 p.add(F, Q, ts+1, Q);
186 t.put(p);
187
188
189
190 Get g = new Get(R);
191 g.setMaxVersions(10);
192 Result r = t.get(g);
193
194 assertEquals(2, r.size());
195
196 TEST_UTIL.flush(tableName);
197 TEST_UTIL.compact(tableName, true);
198
199 g = new Get(R);
200 g.setMaxVersions(10);
201 r = t.get(g);
202
203 assertEquals(2, r.size());
204
205
206 me.setValue(now + 2000);
207
208 g = new Get(R);
209 g.setMaxVersions(10);
210 r = t.get(g);
211
212 assertEquals(0, r.size());
213 t.close();
214 }
215
216 public static class ScanObserver extends BaseRegionObserver {
217 private Map<TableName, Long> ttls =
218 new HashMap<TableName, Long>();
219 private Map<TableName, Integer> versions =
220 new HashMap<TableName, Integer>();
221
222
223
224 @Override
225 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
226 final WALEdit edit, final Durability durability) throws IOException {
227 if (put.getAttribute("ttl") != null) {
228 Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
229 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
230 ttls.put(TableName.valueOf(kv.getQualifier()), Bytes.toLong(kv.getValue()));
231 c.bypass();
232 } else if (put.getAttribute("versions") != null) {
233 Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
234 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
235 versions.put(TableName.valueOf(kv.getQualifier()), Bytes.toInt(kv.getValue()));
236 c.bypass();
237 }
238 }
239
240 @Override
241 public InternalScanner preFlushScannerOpen(
242 final ObserverContext<RegionCoprocessorEnvironment> c,
243 Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
244 Long newTtl = ttls.get(store.getTableName());
245 if (newTtl != null) {
246 System.out.println("PreFlush:" + newTtl);
247 }
248 Integer newVersions = versions.get(store.getTableName());
249 ScanInfo oldSI = store.getScanInfo();
250 HColumnDescriptor family = store.getFamily();
251 ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
252 family.getName(), family.getMinVersions(),
253 newVersions == null ? family.getMaxVersions() : newVersions,
254 newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
255 oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
256 Scan scan = new Scan();
257 scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
258 return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
259 ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
260 HConstants.OLDEST_TIMESTAMP);
261 }
262
263 @Override
264 public InternalScanner preCompactScannerOpen(
265 final ObserverContext<RegionCoprocessorEnvironment> c,
266 Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
267 long earliestPutTs, InternalScanner s) throws IOException {
268 Long newTtl = ttls.get(store.getTableName());
269 Integer newVersions = versions.get(store.getTableName());
270 ScanInfo oldSI = store.getScanInfo();
271 HColumnDescriptor family = store.getFamily();
272 ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
273 family.getName(), family.getMinVersions(),
274 newVersions == null ? family.getMaxVersions() : newVersions,
275 newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
276 oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
277 Scan scan = new Scan();
278 scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
279 return new StoreScanner(store, scanInfo, scan, scanners, scanType,
280 store.getSmallestReadPoint(), earliestPutTs);
281 }
282
283 @Override
284 public KeyValueScanner preStoreScannerOpen(
285 final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
286 final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException {
287 TableName tn = store.getTableName();
288 if (!tn.isSystemTable()) {
289 Long newTtl = ttls.get(store.getTableName());
290 Integer newVersions = versions.get(store.getTableName());
291 ScanInfo oldSI = store.getScanInfo();
292 HColumnDescriptor family = store.getFamily();
293 ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
294 family.getName(), family.getMinVersions(),
295 newVersions == null ? family.getMaxVersions() : newVersions,
296 newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
297 oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
298 return new StoreScanner(store, scanInfo, scan, targetCols,
299 ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
300 } else {
301 return s;
302 }
303 }
304 }
305 }