View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import org.apache.hadoop.hbase.testclassification.SmallTests;
22  import org.junit.Before;
23  import org.junit.Test;
24  import org.junit.experimental.categories.Category;
25  
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import static org.junit.Assert.*;
33  
34  
35  @Category(SmallTests.class)
36  public class TestStealJobQueue {
37  
38    StealJobQueue<Integer> stealJobQueue;
39    BlockingQueue stealFromQueue;
40  
41    @Before
42    public void setup() {
43      stealJobQueue = new StealJobQueue<>();
44      stealFromQueue = stealJobQueue.getStealFromQueue();
45  
46    }
47  
48  
49    @Test
50    public void testTake() throws InterruptedException {
51      stealJobQueue.offer(3);
52      stealFromQueue.offer(10);
53      stealJobQueue.offer(15);
54      stealJobQueue.offer(4);
55      assertEquals(3, stealJobQueue.take().intValue());
56      assertEquals(4, stealJobQueue.take().intValue());
57      assertEquals("always take from the main queue before trying to steal", 15,
58              stealJobQueue.take().intValue());
59      assertEquals(10, stealJobQueue.take().intValue());
60      assertTrue(stealFromQueue.isEmpty());
61      assertTrue(stealJobQueue.isEmpty());
62    }
63  
64    @Test
65    public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException {
66      final AtomicInteger taken = new AtomicInteger();
67      Thread consumer = new Thread() {
68        @Override
69        public void run() {
70          try {
71            Integer n = stealJobQueue.take();
72            taken.set(n);
73          } catch (InterruptedException e) {
74            e.printStackTrace();
75          }
76        }
77      };
78      consumer.start();
79      stealFromQueue.offer(3);
80      consumer.join(1000);
81      assertEquals(3, taken.get());
82      consumer.interrupt(); //Ensure the consumer thread will stop.
83    }
84  
85  
86    @Test
87    public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException {
88      final AtomicInteger taken = new AtomicInteger();
89      Thread consumer = new Thread() {
90        @Override
91        public void run() {
92          try {
93            Integer n = stealJobQueue.take();
94            taken.set(n);
95          } catch (InterruptedException e) {
96            e.printStackTrace();
97          }
98        }
99      };
100     consumer.start();
101     stealJobQueue.offer(3);
102     consumer.join(1000);
103     assertEquals(3, taken.get());
104     consumer.interrupt(); //Ensure the consumer thread will stop.
105   }
106 
107 
108   @Test
109   public void testPoll() throws InterruptedException {
110     stealJobQueue.offer(3);
111     stealFromQueue.offer(10);
112     stealJobQueue.offer(15);
113     stealJobQueue.offer(4);
114     assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
115     assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
116     assertEquals("always take from the main queue before trying to steal", 15,
117             stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
118     assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
119     assertTrue(stealFromQueue.isEmpty());
120     assertTrue(stealJobQueue.isEmpty());
121     assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS));
122   }
123 
124   @Test
125   public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException {
126     final AtomicInteger taken = new AtomicInteger();
127     Thread consumer = new Thread() {
128       @Override
129       public void run() {
130         try {
131           Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
132           taken.set(n);
133         } catch (InterruptedException e) {
134           e.printStackTrace();
135         }
136       }
137     };
138     consumer.start();
139     stealFromQueue.put(3);
140     consumer.join(1000);
141     assertEquals(3, taken.get());
142     consumer.interrupt(); //Ensure the consumer thread will stop.
143 
144   }
145 
146 
147   @Test
148   public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException {
149     final AtomicInteger taken = new AtomicInteger();
150     Thread consumer = new Thread() {
151       @Override
152       public void run() {
153         try {
154           Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
155           taken.set(n);
156         } catch (InterruptedException e) {
157           e.printStackTrace();
158         }
159       }
160     };
161     consumer.start();
162     stealJobQueue.add(3);
163     consumer.join(1000);
164     assertEquals(3, taken.get());
165     consumer.interrupt(); //Ensure the consumer thread will stop.
166   }
167 
168 
169   @Test
170   public void testInteractWithThreadPool() throws InterruptedException {
171     StealJobQueue<Runnable> stealTasksQueue = new StealJobQueue<>();
172     final CountDownLatch stealJobCountDown = new CountDownLatch(3);
173     final CountDownLatch stealFromCountDown = new CountDownLatch(3);
174     ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) {
175       @Override
176       protected void afterExecute(Runnable r, Throwable t) {
177         super.afterExecute(r, t);
178         stealJobCountDown.countDown();
179       }
180 
181     };
182 
183     //This is necessary otherwise no worker will be running and stealing job
184     stealPool.prestartAllCoreThreads();
185 
186     ThreadPoolExecutor stealFromPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS,
187             stealTasksQueue.getStealFromQueue()) {
188       @Override
189       protected void afterExecute(Runnable r, Throwable t) {
190         super.afterExecute(r, t);
191         stealFromCountDown.countDown();
192       }
193     };
194 
195     for (int i = 0; i < 4; i++) {
196       TestTask task = new TestTask();
197       stealFromPool.execute(task);
198     }
199 
200     for (int i = 0; i < 2; i++) {
201       TestTask task = new TestTask();
202       stealPool.execute(task);
203     }
204 
205     stealJobCountDown.await(1, TimeUnit.SECONDS);
206     stealFromCountDown.await(1, TimeUnit.SECONDS);
207     assertEquals(0, stealFromCountDown.getCount());
208     assertEquals(0, stealJobCountDown.getCount());
209   }
210 
211   class TestTask extends Thread implements Comparable<TestTask> {
212     @Override
213     public int compareTo(TestTask o) {
214       return 0;
215     }
216 
217     @Override
218     public void run() {
219       try {
220         Thread.sleep(200);
221       } catch (InterruptedException e) {
222         e.printStackTrace();
223       }
224     }
225   }
226 
227 }