View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.util;
21  
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.assertEquals;
24  
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ExecutorCompletionService;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.locks.Lock;
35  import java.util.concurrent.locks.ReentrantReadWriteLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.testclassification.MediumTests;
40  import org.junit.Test;
41  import org.junit.experimental.categories.Category;
42  
43  @Category({MediumTests.class})
44  // Medium as it creates 100 threads; seems better to run it isolated
45  public class TestIdReadWriteLock {
46  
47    private static final Log LOG = LogFactory.getLog(TestIdReadWriteLock.class);
48  
49    private static final int NUM_IDS = 16;
50    private static final int NUM_THREADS = 128;
51    private static final int NUM_SECONDS = 15;
52  
53    private IdReadWriteLock idLock = new IdReadWriteLock();
54  
55    private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
56  
57    private class IdLockTestThread implements Callable<Boolean> {
58  
59      private String clientId;
60  
61      public IdLockTestThread(String clientId) {
62        this.clientId = clientId;
63      }
64  
65      @Override
66      public Boolean call() throws Exception {
67        Thread.currentThread().setName(clientId);
68        Random rand = new Random();
69        long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
70        while (System.currentTimeMillis() < endTime) {
71          long id = rand.nextInt(NUM_IDS);
72          boolean readLock = rand.nextBoolean();
73  
74          ReentrantReadWriteLock readWriteLock = idLock.getLock(id);
75          Lock lock = readLock ? readWriteLock.readLock() : readWriteLock.writeLock();
76          try {
77            lock.lock();
78            int sleepMs = 1 + rand.nextInt(4);
79            String owner = idOwner.get(id);
80            if (owner != null && LOG.isDebugEnabled()) {
81              LOG.debug((readLock ? "Read" : "Write") + "lock of Id " + id + " already taken by "
82                  + owner + ", we are " + clientId);
83            }
84  
85            idOwner.put(id, clientId);
86            Thread.sleep(sleepMs);
87            idOwner.remove(id);
88  
89          } finally {
90            lock.unlock();
91            if (LOG.isDebugEnabled()) {
92              LOG.debug("Release " + (readLock ? "Read" : "Write") + " lock of Id" + id + ", we are "
93                  + clientId);
94            }
95          }
96        }
97        return true;
98      }
99  
100   }
101 
102   @Test(timeout = 60000)
103   public void testMultipleClients() throws Exception {
104     ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
105     try {
106       ExecutorCompletionService<Boolean> ecs =
107           new ExecutorCompletionService<Boolean>(exec);
108       for (int i = 0; i < NUM_THREADS; ++i)
109         ecs.submit(new IdLockTestThread("client_" + i));
110       for (int i = 0; i < NUM_THREADS; ++i) {
111         Future<Boolean> result = ecs.take();
112         assertTrue(result.get());
113       }
114       // make sure the entry pool will be cleared after GC and purge call
115       int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
116       LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
117       assertEquals(0, entryPoolSize);
118     } finally {
119       exec.shutdown();
120       exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
121     }
122   }
123 
124 
125 }
126