View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.coprocessor;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.CellUtil;
28  import org.apache.hadoop.hbase.Coprocessor;
29  import org.apache.hadoop.hbase.CoprocessorEnvironment;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
33  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.SumRequest;
34  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.SumResponse;
35  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
36  import org.apache.hadoop.hbase.regionserver.InternalScanner;
37  import org.apache.hadoop.hbase.regionserver.Region;
38  import org.apache.hadoop.hbase.util.Bytes;
39  
40  import com.google.protobuf.RpcCallback;
41  import com.google.protobuf.RpcController;
42  import com.google.protobuf.Service;
43  
44  /**
45   * Test coprocessor endpoint that always returns {@code null} for requests to the last region
46   * in the table.  This allows tests to provide assurance of correct {@code null} handling for
47   * response values.
48   */
49  public class ColumnAggregationEndpointNullResponse
50      extends
51      ColumnAggregationServiceNullResponse
52  implements Coprocessor, CoprocessorService  {
53    private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class);
54    private RegionCoprocessorEnvironment env = null;
55    @Override
56    public Service getService() {
57      return this;
58    }
59  
60    @Override
61    public void start(CoprocessorEnvironment env) throws IOException {
62      if (env instanceof RegionCoprocessorEnvironment) {
63        this.env = (RegionCoprocessorEnvironment)env;
64        return;
65      }
66      throw new CoprocessorException("Must be loaded on a table region!");
67    }
68  
69    @Override
70    public void stop(CoprocessorEnvironment env) throws IOException {
71      // Nothing to do.
72    }
73  
74    @Override
75    public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
76      // aggregate at each region
77      Scan scan = new Scan();
78      // Family is required in pb. Qualifier is not.
79      byte[] family = request.getFamily().toByteArray();
80      byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
81      if (request.hasQualifier()) {
82        scan.addColumn(family, qualifier);
83      } else {
84        scan.addFamily(family);
85      }
86      int sumResult = 0;
87      InternalScanner scanner = null;
88      try {
89        Region region = this.env.getRegion();
90        // for the last region in the table, return null to test null handling
91        if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
92          done.run(null);
93          return;
94        }
95        scanner = region.getScanner(scan);
96        List<Cell> curVals = new ArrayList<Cell>();
97        boolean hasMore = false;
98        do {
99          curVals.clear();
100         hasMore = scanner.next(curVals);
101         for (Cell kv : curVals) {
102           if (CellUtil.matchingQualifier(kv, qualifier)) {
103             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
104           }
105         }
106       } while (hasMore);
107     } catch (IOException e) {
108       ResponseConverter.setControllerException(controller, e);
109       // Set result to -1 to indicate error.
110       sumResult = -1;
111       LOG.info("Setting sum result to -1 to indicate error", e);
112     } finally {
113       if (scanner != null) {
114         try {
115           scanner.close();
116         } catch (IOException e) {
117           ResponseConverter.setControllerException(controller, e);
118           sumResult = -1;
119           LOG.info("Setting sum result to -1 to indicate error", e);
120         }
121       }
122     }
123     done.run(SumResponse.newBuilder().setSum(sumResult).build());
124     LOG.info("Returning sum " + sumResult + " for region " +
125         Bytes.toStringBinary(env.getRegion().getRegionInfo().getRegionName()));
126   }
127 }