1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.compactions;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
31 import org.apache.hadoop.hbase.regionserver.StoreFile;
32 import org.apache.hadoop.hbase.regionserver.StoreUtils;
33 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
49
50 private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
51
52
53 public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
54 super(conf, storeConfigInfo);
55 }
56
57 @Override
58 public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
59 List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
60 boolean forceMajor) throws IOException {
61
62 if(forceMajor){
63 LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag.");
64 }
65 boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
66 if(isAfterSplit){
67 LOG.info("Split detected, delegate selection to the parent policy.");
68 return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
69 mayUseOffPeak, forceMajor);
70 }
71
72
73 Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
74 CompactionRequest result = new CompactionRequest(toCompact);
75 return result;
76 }
77
78 @Override
79 public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
80 boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
81 if(isAfterSplit){
82 LOG.info("Split detected, delegate to the parent policy.");
83 return super.isMajorCompaction(filesToCompact);
84 }
85 return false;
86 }
87
88 @Override
89 public boolean needsCompaction(Collection<StoreFile> storeFiles,
90 List<StoreFile> filesCompacting) {
91 boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
92 if(isAfterSplit){
93 LOG.info("Split detected, delegate to the parent policy.");
94 return super.needsCompaction(storeFiles, filesCompacting);
95 }
96 return hasExpiredStores(storeFiles);
97 }
98
99 private boolean hasExpiredStores(Collection<StoreFile> files) {
100 long currentTime = EnvironmentEdgeManager.currentTime();
101 for(StoreFile sf: files){
102
103 Long maxTs = sf.getReader().getMaxTimestamp();
104 long maxTtl = storeConfigInfo.getStoreFileTtl();
105 if(maxTs == null
106 || maxTtl == Long.MAX_VALUE
107 || (currentTime - maxTtl < maxTs)){
108 continue;
109 } else{
110 return true;
111 }
112 }
113 return false;
114 }
115
116 private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
117 Collection<StoreFile> filesCompacting) {
118 long currentTime = EnvironmentEdgeManager.currentTime();
119 Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
120 for(StoreFile sf: files){
121
122 Long maxTs = sf.getReader().getMaxTimestamp();
123 long maxTtl = storeConfigInfo.getStoreFileTtl();
124 if(maxTs == null
125 || maxTtl == Long.MAX_VALUE
126 || (currentTime - maxTtl < maxTs)){
127 continue;
128 } else if(filesCompacting == null || filesCompacting.contains(sf) == false){
129 expiredStores.add(sf);
130 }
131 }
132 return expiredStores;
133 }
134 }