1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20 import java.io.IOException;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.security.PrivilegedExceptionAction;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.client.HTableInterface;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.security.User;
35 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
36 import org.apache.hadoop.util.StringUtils;
37
38
39
40
41 public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
42
43 private static final Log LOG = LogFactory.getLog(MultiThreadedWriterWithACL.class);
44 private User userOwner;
45
46 public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
47 TableName tableName, User userOwner) throws IOException {
48 super(dataGen, conf, tableName);
49 this.userOwner = userOwner;
50 }
51
52 @Override
53 public void start(long startKey, long endKey, int numThreads) throws IOException {
54 super.start(startKey, endKey, numThreads);
55 }
56
57 @Override
58 protected void createWriterThreads(int numThreads) throws IOException {
59 for (int i = 0; i < numThreads; ++i) {
60 HBaseWriterThread writer = new HBaseWriterThreadWithACL(i);
61 writers.add(writer);
62 }
63 }
64
65 public class HBaseWriterThreadWithACL extends HBaseWriterThread {
66
67 private Table table;
68 private WriteAccessAction writerAction = new WriteAccessAction();
69
70 public HBaseWriterThreadWithACL(int writerId) throws IOException {
71 super(writerId);
72 }
73
74 @Override
75 protected HTableInterface createTable() throws IOException {
76 return null;
77 }
78
79 @Override
80 protected void closeHTable() {
81 if (table != null) {
82 try {
83 table.close();
84 } catch (Exception e) {
85 LOG.error("Error in closing the table "+table.getName(), e);
86 }
87 }
88 }
89
90 @Override
91 public void insert(final Table table, Put put, final long keyBase) {
92 final long start = System.currentTimeMillis();
93 try {
94 put = (Put) dataGenerator.beforeMutate(keyBase, put);
95 writerAction.setPut(put);
96 writerAction.setKeyBase(keyBase);
97 writerAction.setStartTime(start);
98 userOwner.runAs(writerAction);
99 } catch (IOException e) {
100 recordFailure(table, put, keyBase, start, e);
101 } catch (InterruptedException e) {
102 failedKeySet.add(keyBase);
103 }
104 }
105
106 class WriteAccessAction implements PrivilegedExceptionAction<Object> {
107 private Put put;
108 private long keyBase;
109 private long start;
110
111 public WriteAccessAction() {
112 }
113
114 public void setPut(final Put put) {
115 this.put = put;
116 }
117
118 public void setKeyBase(final long keyBase) {
119 this.keyBase = keyBase;
120 }
121
122 public void setStartTime(final long start) {
123 this.start = start;
124 }
125
126 @Override
127 public Object run() throws Exception {
128 try {
129 if (table == null) {
130 table = new HTable(conf, tableName);
131 }
132 table.put(put);
133 } catch (IOException e) {
134 recordFailure(table, put, keyBase, start, e);
135 }
136 return null;
137 }
138 }
139 }
140
141 private void recordFailure(final Table table, final Put put, final long keyBase,
142 final long start, IOException e) {
143 failedKeySet.add(keyBase);
144 String exceptionInfo;
145 if (e instanceof RetriesExhaustedWithDetailsException) {
146 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
147 exceptionInfo = aggEx.getExhaustiveDescription();
148 } else {
149 StringWriter stackWriter = new StringWriter();
150 PrintWriter pw = new PrintWriter(stackWriter);
151 e.printStackTrace(pw);
152 pw.flush();
153 exceptionInfo = StringUtils.stringifyException(e);
154 }
155 LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
156 + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
157 + exceptionInfo);
158 }
159 }