1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.util.ByteRange;
30 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
31
32 import com.google.common.base.Preconditions;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 public class HeapMemStoreLAB implements MemStoreLAB {
57
58 static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
59 static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
60 static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
61 static final int MAX_ALLOC_DEFAULT = 256 * 1024;
62
63
64 private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
65
66 private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
67 final int chunkSize;
68 final int maxAlloc;
69 private final MemStoreChunkPool chunkPool;
70
71
72
73 private volatile boolean closed = false;
74
75
76 private AtomicBoolean reclaimed = new AtomicBoolean(false);
77
78 private final AtomicInteger openScannerCount = new AtomicInteger();
79
80
81 public HeapMemStoreLAB() {
82 this(new Configuration());
83 }
84
85 public HeapMemStoreLAB(Configuration conf) {
86 chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
87 maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
88 this.chunkPool = MemStoreChunkPool.getPool(conf);
89
90
91 Preconditions.checkArgument(
92 maxAlloc <= chunkSize,
93 MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
94 }
95
96
97
98
99
100
101
102 @Override
103 public ByteRange allocateBytes(int size) {
104 Preconditions.checkArgument(size >= 0, "negative size");
105
106
107
108 if (size > maxAlloc) {
109 return null;
110 }
111
112 while (true) {
113 Chunk c = getOrMakeChunk();
114
115
116 int allocOffset = c.alloc(size);
117 if (allocOffset != -1) {
118
119
120 return new SimpleMutableByteRange(c.data, allocOffset, size);
121 }
122
123
124
125 tryRetireChunk(c);
126 }
127 }
128
129
130
131
132
133 @Override
134 public void close() {
135 this.closed = true;
136
137
138 if (chunkPool != null && openScannerCount.get() == 0
139 && reclaimed.compareAndSet(false, true)) {
140 chunkPool.putbackChunks(this.chunkQueue);
141 }
142 }
143
144
145
146
147 @Override
148 public void incScannerCount() {
149 this.openScannerCount.incrementAndGet();
150 }
151
152
153
154
155 @Override
156 public void decScannerCount() {
157 int count = this.openScannerCount.decrementAndGet();
158 if (chunkPool != null && count == 0 && this.closed
159 && reclaimed.compareAndSet(false, true)) {
160 chunkPool.putbackChunks(this.chunkQueue);
161 }
162 }
163
164
165
166
167
168
169 private void tryRetireChunk(Chunk c) {
170 curChunk.compareAndSet(c, null);
171
172
173
174
175
176
177 }
178
179
180
181
182
183 private Chunk getOrMakeChunk() {
184 while (true) {
185
186 Chunk c = curChunk.get();
187 if (c != null) {
188 return c;
189 }
190
191
192
193
194 c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
195 if (curChunk.compareAndSet(null, c)) {
196
197
198 c.init();
199 this.chunkQueue.add(c);
200 return c;
201 } else if (chunkPool != null) {
202 chunkPool.putbackChunk(c);
203 }
204
205
206 }
207 }
208
209
210
211
212 static class Chunk {
213
214 private byte[] data;
215
216 private static final int UNINITIALIZED = -1;
217 private static final int OOM = -2;
218
219
220
221
222 private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
223
224
225 private AtomicInteger allocCount = new AtomicInteger();
226
227
228 private final int size;
229
230
231
232
233
234
235 Chunk(int size) {
236 this.size = size;
237 }
238
239
240
241
242
243
244 public void init() {
245 assert nextFreeOffset.get() == UNINITIALIZED;
246 try {
247 if (data == null) {
248 data = new byte[size];
249 }
250 } catch (OutOfMemoryError e) {
251 boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
252 assert failInit;
253 throw e;
254 }
255
256 boolean initted = nextFreeOffset.compareAndSet(
257 UNINITIALIZED, 0);
258
259
260 Preconditions.checkState(initted,
261 "Multiple threads tried to init same chunk");
262 }
263
264
265
266
267 void reset() {
268 if (nextFreeOffset.get() != UNINITIALIZED) {
269 nextFreeOffset.set(UNINITIALIZED);
270 allocCount.set(0);
271 }
272 }
273
274
275
276
277
278 public int alloc(int size) {
279 while (true) {
280 int oldOffset = nextFreeOffset.get();
281 if (oldOffset == UNINITIALIZED) {
282
283
284
285
286 Thread.yield();
287 continue;
288 }
289 if (oldOffset == OOM) {
290
291 return -1;
292 }
293
294 if (oldOffset + size > data.length) {
295 return -1;
296 }
297
298
299 if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
300
301 allocCount.incrementAndGet();
302 return oldOffset;
303 }
304
305 }
306 }
307
308 @Override
309 public String toString() {
310 return "Chunk@" + System.identityHashCode(this) +
311 " allocs=" + allocCount.get() + "waste=" +
312 (data.length - nextFreeOffset.get());
313 }
314 }
315 }