1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.lang.reflect.Method;
21 import java.util.HashMap;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.ipc.PriorityFunction;
30 import org.apache.hadoop.hbase.ipc.QosPriority;
31 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
32 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
33 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
34 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
35 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
36 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
37 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
39 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
40 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
41 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
42 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
43
44 import com.google.common.annotations.VisibleForTesting;
45 import com.google.protobuf.Message;
46 import com.google.protobuf.TextFormat;
47 import org.apache.hadoop.hbase.security.Superusers;
48 import org.apache.hadoop.hbase.security.User;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 @InterfaceAudience.Private
72 public class AnnotationReadingPriorityFunction implements PriorityFunction {
73 private static final Log LOG =
74 LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
75
76
77 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
78
79 protected final Map<String, Integer> annotatedQos;
80
81
82 private RSRpcServices rpcServices;
83 @SuppressWarnings("unchecked")
84 private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
85 GetRegionInfoRequest.class,
86 GetStoreFileRequest.class,
87 CloseRegionRequest.class,
88 FlushRegionRequest.class,
89 SplitRegionRequest.class,
90 CompactRegionRequest.class,
91 GetRequest.class,
92 MutateRequest.class,
93 ScanRequest.class
94 };
95
96
97 private final Map<String, Class<? extends Message>> argumentToClassMap =
98 new HashMap<String, Class<? extends Message>>();
99 private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
100 new HashMap<String, Map<Class<? extends Message>, Method>>();
101
102 private final float scanVirtualTimeWeight;
103
104
105
106
107
108
109
110
111 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
112 this(rpcServices, rpcServices.getClass());
113 }
114
115
116
117
118
119
120
121
122
123
124 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
125 Class<? extends RSRpcServices> clz) {
126 Map<String,Integer> qosMap = new HashMap<String,Integer>();
127 for (Method m : clz.getMethods()) {
128 QosPriority p = m.getAnnotation(QosPriority.class);
129 if (p != null) {
130
131
132
133
134
135 String capitalizedMethodName = capitalize(m.getName());
136 qosMap.put(capitalizedMethodName, p.priority());
137 }
138 }
139 this.rpcServices = rpcServices;
140 this.annotatedQos = qosMap;
141 if (methodMap.get("getRegion") == null) {
142 methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
143 methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
144 }
145 for (Class<? extends Message> cls : knownArgumentClasses) {
146 argumentToClassMap.put(cls.getName(), cls);
147 try {
148 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
149 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
150 } catch (Exception e) {
151 throw new RuntimeException(e);
152 }
153 }
154
155 Configuration conf = rpcServices.getConfiguration();
156 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
157 }
158
159 private String capitalize(final String s) {
160 StringBuilder strBuilder = new StringBuilder(s);
161 strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
162 return strBuilder.toString();
163 }
164
165
166
167
168
169
170
171
172
173 @Override
174 public int getPriority(RequestHeader header, Message param, User user) {
175 int priorityByAnnotation = getAnnotatedPriority(header);
176
177 if (priorityByAnnotation >= 0) {
178 return priorityByAnnotation;
179 }
180
181
182 try {
183 if (Superusers.isSuperUser(user)) {
184 return HConstants.ADMIN_QOS;
185 }
186 } catch (IllegalStateException ex) {
187
188
189 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
190 return HConstants.NORMAL_QOS;
191 }
192
193 return getBasePriority(header, param);
194 }
195
196
197
198
199
200
201
202 protected int getAnnotatedPriority(RequestHeader header) {
203 String methodName = header.getMethodName();
204 Integer priorityByAnnotation = annotatedQos.get(methodName);
205 if (priorityByAnnotation != null) {
206 return priorityByAnnotation;
207 }
208 return -1;
209 }
210
211
212
213
214
215
216 protected int getBasePriority(RequestHeader header, Message param) {
217 if (param == null) {
218 return HConstants.NORMAL_QOS;
219 }
220 if (param instanceof MultiRequest) {
221
222
223 return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
224 }
225
226 String cls = param.getClass().getName();
227 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
228 RegionSpecifier regionSpecifier = null;
229
230 try {
231
232
233
234
235 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
236 if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
237 Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
238 regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
239 Region region = rpcServices.getRegion(regionSpecifier);
240 if (region.getRegionInfo().isSystemTable()) {
241 if (LOG.isTraceEnabled()) {
242 LOG.trace("High priority because region=" +
243 region.getRegionInfo().getRegionNameAsString());
244 }
245 return HConstants.SYSTEMTABLE_QOS;
246 }
247 }
248 } catch (Exception ex) {
249
250
251 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
252 return HConstants.NORMAL_QOS;
253 }
254
255 if (param instanceof ScanRequest) {
256 ScanRequest request = (ScanRequest)param;
257 if (!request.hasScannerId()) {
258 return HConstants.NORMAL_QOS;
259 }
260 RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
261 if (scanner != null && scanner.getRegionInfo().isSystemTable()) {
262 if (LOG.isTraceEnabled()) {
263
264 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
265 }
266 return HConstants.SYSTEMTABLE_QOS;
267 }
268 }
269
270 return HConstants.NORMAL_QOS;
271 }
272
273
274
275
276
277
278
279
280 @Override
281 public long getDeadline(RequestHeader header, Message param) {
282 if (param instanceof ScanRequest) {
283 ScanRequest request = (ScanRequest)param;
284 if (!request.hasScannerId()) {
285 return 0;
286 }
287
288
289
290
291 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
292 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
293 }
294 return 0;
295 }
296
297 @VisibleForTesting
298 void setRegionServer(final HRegionServer hrs) {
299 this.rpcServices = hrs.getRSRpcServices();
300 }
301 }