View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util;
19  // this is deliberately not in the o.a.h.h.regionserver package
20  // in order to make sure all required classes/method are available
21  
22  import static org.junit.Assert.assertEquals;
23  
24  import java.io.IOException;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableSet;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HColumnDescriptor;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HTableDescriptor;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValueUtil;
43  import org.apache.hadoop.hbase.testclassification.MediumTests;
44  import org.apache.hadoop.hbase.client.Get;
45  import org.apache.hadoop.hbase.client.HTable;
46  import org.apache.hadoop.hbase.client.IsolationLevel;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.client.Result;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.client.Durability;
51  import org.apache.hadoop.hbase.client.Table;
52  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
53  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
54  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
55  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
56  import org.apache.hadoop.hbase.regionserver.HStore;
57  import org.apache.hadoop.hbase.regionserver.InternalScanner;
58  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
59  import org.apache.hadoop.hbase.regionserver.ScanType;
60  import org.apache.hadoop.hbase.regionserver.Store;
61  import org.apache.hadoop.hbase.regionserver.ScanInfo;
62  import org.apache.hadoop.hbase.regionserver.StoreScanner;
63  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
64  import org.junit.AfterClass;
65  import org.junit.BeforeClass;
66  import org.junit.Test;
67  import org.junit.experimental.categories.Category;
68  
69  import org.junit.runner.RunWith;
70  import org.junit.runners.Parameterized;
71  import org.junit.runners.Parameterized.Parameters;
72  
/**
 * Verifies that a region coprocessor can override a store's effective scan
 * policy (max versions and TTL) by returning its own scanners from the
 * {@code preFlushScannerOpen}, {@code preCompactScannerOpen} and
 * {@code preStoreScannerOpen} hooks, and that those overrides are honored by
 * gets, flushes and compactions.
 * <p>
 * The overrides are handed to the coprocessor through marker {@link Put}s that
 * carry a "ttl" or "versions" attribute (see {@link ScanObserver#prePut}),
 * since the coprocessor is loaded by a different class loader and cannot be
 * reached directly from the test.
 * <p>
 * Parameterized so each test runs once with StoreScanner parallel seek
 * disabled and once with it enabled.
 */
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestCoprocessorScanPolicy {
  private static final Log LOG = LogFactory.getLog(TestCoprocessorScanPolicy.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Family, qualifier and row shared by both tests.
  private static final byte[] F = Bytes.toBytes("fam");
  private static final byte[] Q = Bytes.toBytes("qual");
  private static final byte[] R = Bytes.toBytes("row");

  /** Starts a mini cluster with {@link ScanObserver} installed on every region. */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Register the observer globally so it is active on every table created below.
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        ScanObserver.class.getName());
    TEST_UTIL.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /** @return {@code {false}} and {@code {true}} — one run per parallel-seek setting. */
  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
  }

  /**
   * @param parallelSeekEnable whether StoreScanner should seek its child
   *   scanners in parallel; toggled on the already-running mini cluster's
   *   configuration for this parameterized run
   */
  public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
    TEST_UTIL.getMiniHBaseCluster().getConf()
        .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
  }

  /**
   * Raises the effective max-versions of the family to 2 via the coprocessor
   * side channel, then checks that exactly two versions remain visible across
   * gets, a flush and a major compaction, and that inserting a third version
   * still yields only two.
   */
  @Test
  public void testBaseCases() throws Exception {
    TableName tableName =
        TableName.valueOf("baseCases");
    if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
      TEST_UTIL.deleteTable(tableName);
    }
    // NOTE(review): third argument is presumably the family's max versions (1)
    // — the override below raises the effective limit to 2; confirm against
    // HBaseTestingUtility.createTable.
    Table t = TEST_UTIL.createTable(tableName, F, 1);
    // set the version override to 2: the marker put's qualifier names the
    // table, its value carries the new version count (see ScanObserver.prePut)
    Put p = new Put(R);
    p.setAttribute("versions", new byte[]{});
    p.add(F, tableName.getName(), Bytes.toBytes(2));
    t.put(p);

    long now = EnvironmentEdgeManager.currentTime();

    // insert 2 versions
    p = new Put(R);
    p.add(F, Q, now, Q);
    t.put(p);
    p = new Put(R);
    p.add(F, Q, now+1, Q);
    t.put(p);
    Get g = new Get(R);
    g.setMaxVersions(10);
    Result r = t.get(g);
    assertEquals(2, r.size());

    TEST_UTIL.flush(tableName);
    TEST_UTIL.compact(tableName, true);

    // both versions are still visible even after a flush/compaction
    g = new Get(R);
    g.setMaxVersions(10);
    r = t.get(g);
    assertEquals(2, r.size());

    // insert a 3rd version
    p = new Put(R);
    p.add(F, Q, now+2, Q);
    t.put(p);
    g = new Get(R);
    g.setMaxVersions(10);
    r = t.get(g);
    // still only two versions visible — the coprocessor override caps it at 2
    assertEquals(2, r.size());

    // NOTE(review): not closed in a finally block — leaks the table if an
    // assertion above fails.
    t.close();
  }

  /**
   * Overrides the family's declared 1s TTL with a 3s TTL via the coprocessor
   * side channel. Cells written 2s in the past would be expired under the
   * declared TTL but must survive gets, a flush and a compaction under the
   * override; after the injected clock advances 2s (cell age ~4s &gt; 3s) they
   * must be gone.
   */
  @Test
  public void testTTL() throws Exception {
    TableName tableName =
        TableName.valueOf("testTTL");
    if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
      TEST_UTIL.deleteTable(tableName);
    }
    HTableDescriptor desc = new HTableDescriptor(tableName);
    // Declared policy: 10 versions, 1 second TTL (to be overridden to 3s below).
    HColumnDescriptor hcd = new HColumnDescriptor(F)
    .setMaxVersions(10)
    .setTimeToLive(1);
    desc.addFamily(hcd);
    TEST_UTIL.getHBaseAdmin().createTable(desc);
    Table t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
    long now = EnvironmentEdgeManager.currentTime();
    // Freeze the clock so TTL expiry is fully under the test's control.
    // NOTE(review): the injected edge is never reset at the end of this test;
    // consider EnvironmentEdgeManagerTestHelper.reset() to avoid leaking the
    // manual clock into later tests.
    ManualEnvironmentEdge me = new ManualEnvironmentEdge();
    me.setValue(now);
    EnvironmentEdgeManagerTestHelper.injectEdge(me);
    // 2s in the past
    long ts = now - 2000;
    // Set the TTL override to 3s (marker put consumed by ScanObserver.prePut)
    Put p = new Put(R);
    p.setAttribute("ttl", new byte[]{});
    p.add(F, tableName.getName(), Bytes.toBytes(3000L));
    t.put(p);

    p = new Put(R);
    p.add(F, Q, ts, Q);
    t.put(p);
    p = new Put(R);
    p.add(F, Q, ts+1, Q);
    t.put(p);

    // these two should be expired but for the override
    // (their ts was 2s in the past)
    Get g = new Get(R);
    g.setMaxVersions(10);
    Result r = t.get(g);
    // still there?
    assertEquals(2, r.size());

    TEST_UTIL.flush(tableName);
    TEST_UTIL.compact(tableName, true);

    g = new Get(R);
    g.setMaxVersions(10);
    r = t.get(g);
    // still there? (flush/compaction must also respect the overridden TTL)
    assertEquals(2, r.size());

    // roll time forward 2s.
    me.setValue(now + 2000);
    // now verify that data eventually does expire
    g = new Get(R);
    g.setMaxVersions(10);
    r = t.get(g);
    // should be gone now — cell age exceeds the 3s override
    assertEquals(0, r.size());
    t.close();
  }

  /**
   * Region observer that rebuilds the {@link ScanInfo} used for flushes,
   * compactions and user scans, substituting per-table TTL and max-versions
   * overrides received through marker Puts.
   */
  public static class ScanObserver extends BaseRegionObserver {
    // Per-table overrides, populated by prePut. Keyed by the table name
    // encoded in the marker put's qualifier.
    private Map<TableName, Long> ttls =
        new HashMap<TableName, Long>();
    private Map<TableName, Integer> versions =
        new HashMap<TableName, Integer>();

    // lame way to communicate with the coprocessor,
    // since it is loaded by a different class loader
    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      // A put carrying a "ttl" or "versions" attribute is a control message,
      // not data: its single cell's qualifier names the target table and its
      // value holds the override. bypass() keeps the marker out of the region.
      if (put.getAttribute("ttl") != null) {
        Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        ttls.put(TableName.valueOf(kv.getQualifier()), Bytes.toLong(kv.getValue()));
        c.bypass();
      } else if (put.getAttribute("versions") != null) {
        Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        versions.put(TableName.valueOf(kv.getQualifier()), Bytes.toInt(kv.getValue()));
        c.bypass();
      }
    }

    /**
     * Replaces the flush scanner with one built from a ScanInfo that applies
     * any TTL/versions override for this store's table.
     */
    @Override
    public InternalScanner preFlushScannerOpen(
        final ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
      Long newTtl = ttls.get(store.getTableName());
      if (newTtl != null) {
        // NOTE(review): System.out rather than LOG — presumably deliberate
        // given the separate coprocessor class loader; confirm.
        System.out.println("PreFlush:" + newTtl);
      }
      Integer newVersions = versions.get(store.getTableName());
      ScanInfo oldSI = store.getScanInfo();
      HColumnDescriptor family = store.getFamily();
      // Same as the store's own ScanInfo except for the overridden TTL/versions.
      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
          family.getName(), family.getMinVersions(),
          newVersions == null ? family.getMaxVersions() : newVersions,
          newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
          oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
      Scan scan = new Scan();
      scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
      // COMPACT_RETAIN_DELETES: delete markers are kept when flushing.
      return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
          ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
          HConstants.OLDEST_TIMESTAMP);
    }

    /**
     * Replaces the compaction scanner the same way as
     * {@link #preFlushScannerOpen}, reusing the caller-supplied scan type and
     * earliest put timestamp.
     */
    @Override
    public InternalScanner preCompactScannerOpen(
        final ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
        long earliestPutTs, InternalScanner s) throws IOException {
      Long newTtl = ttls.get(store.getTableName());
      Integer newVersions = versions.get(store.getTableName());
      ScanInfo oldSI = store.getScanInfo();
      HColumnDescriptor family = store.getFamily();
      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
          family.getName(), family.getMinVersions(),
          newVersions == null ? family.getMaxVersions() : newVersions,
          newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
          oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
      Scan scan = new Scan();
      scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
      return new StoreScanner(store, scanInfo, scan, scanners, scanType,
          store.getSmallestReadPoint(), earliestPutTs);
    }

    /**
     * Replaces the scanner for user reads (gets/scans) with one honoring the
     * overrides. System tables are left on the default policy so meta access
     * is unaffected.
     */
    @Override
    public KeyValueScanner preStoreScannerOpen(
        final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
        final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException {
      TableName tn = store.getTableName();
      if (!tn.isSystemTable()) {
        Long newTtl = ttls.get(store.getTableName());
        Integer newVersions = versions.get(store.getTableName());
        ScanInfo oldSI = store.getScanInfo();
        HColumnDescriptor family = store.getFamily();
        ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
            family.getName(), family.getMinVersions(),
            newVersions == null ? family.getMaxVersions() : newVersions,
            newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
            oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
        // Read point for a READ_COMMITTED user scan.
        return new StoreScanner(store, scanInfo, scan, targetCols,
            ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
      } else {
        return s;
      }
    }
  }
}