1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.replication.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.when;
23
24 import java.util.List;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.ServerName;
28 import org.apache.hadoop.hbase.testclassification.SmallTests;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
31 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
32 import org.apache.hadoop.hbase.replication.ReplicationPeers;
33 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.junit.experimental.categories.Category;
37
38 import com.google.common.collect.Lists;
39
40 @Category(SmallTests.class)
41 public class TestReplicationSinkManager {
42
43 private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
44
45 private ReplicationPeers replicationPeers;
46 private HBaseReplicationEndpoint replicationEndpoint;
47 private ReplicationSinkManager sinkManager;
48
49 @Before
50 public void setUp() {
51 replicationPeers = mock(ReplicationPeers.class);
52 replicationEndpoint = mock(HBaseReplicationEndpoint.class);
53 sinkManager = new ReplicationSinkManager(mock(HConnection.class),
54 PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
55 }
56
57 @Test
58 public void testChooseSinks() {
59 List<ServerName> serverNames = Lists.newArrayList();
60 for (int i = 0; i < 20; i++) {
61 serverNames.add(mock(ServerName.class));
62 }
63
64 when(replicationEndpoint.getRegionServers())
65 .thenReturn(serverNames);
66
67 sinkManager.chooseSinks();
68
69 assertEquals(2, sinkManager.getNumSinks());
70
71 }
72
73 @Test
74 public void testChooseSinks_LessThanRatioAvailable() {
75 List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
76 mock(ServerName.class));
77
78 when(replicationEndpoint.getRegionServers())
79 .thenReturn(serverNames);
80
81 sinkManager.chooseSinks();
82
83 assertEquals(1, sinkManager.getNumSinks());
84 }
85
86 @Test
87 public void testReportBadSink() {
88 ServerName serverNameA = mock(ServerName.class);
89 ServerName serverNameB = mock(ServerName.class);
90 when(replicationEndpoint.getRegionServers())
91 .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
92
93 sinkManager.chooseSinks();
94
95 assertEquals(1, sinkManager.getNumSinks());
96
97 SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
98
99 sinkManager.reportBadSink(sinkPeer);
100
101
102 assertEquals(1, sinkManager.getNumSinks());
103
104 }
105
106
107
108
109
110 @Test
111 public void testReportBadSink_PastThreshold() {
112 List<ServerName> serverNames = Lists.newArrayList();
113 for (int i = 0; i < 30; i++) {
114 serverNames.add(mock(ServerName.class));
115 }
116 when(replicationEndpoint.getRegionServers())
117 .thenReturn(serverNames);
118
119
120 sinkManager.chooseSinks();
121
122 assertEquals(3, sinkManager.getNumSinks());
123
124 ServerName serverName = sinkManager.getSinksForTesting().get(0);
125
126 SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
127
128 sinkManager.reportSinkSuccess(sinkPeer);
129 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
130 sinkManager.reportBadSink(sinkPeer);
131 }
132
133
134
135 assertEquals(2, sinkManager.getNumSinks());
136
137
138
139
140 serverName = sinkManager.getSinksForTesting().get(0);
141
142 sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
143 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
144 sinkManager.reportBadSink(sinkPeer);
145 }
146 sinkManager.reportSinkSuccess(sinkPeer);
147 sinkManager.reportBadSink(sinkPeer);
148
149
150 assertEquals(2, sinkManager.getNumSinks());
151
152 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
153 sinkManager.reportBadSink(sinkPeer);
154 }
155
156 assertEquals(2, sinkManager.getNumSinks());
157
158 sinkManager.reportBadSink(sinkPeer);
159
160 assertEquals(1, sinkManager.getNumSinks());
161 }
162
163 @Test
164 public void testReportBadSink_DownToZeroSinks() {
165 List<ServerName> serverNames = Lists.newArrayList();
166 for (int i = 0; i < 20; i++) {
167 serverNames.add(mock(ServerName.class));
168 }
169 when(replicationEndpoint.getRegionServers())
170 .thenReturn(serverNames);
171
172
173 sinkManager.chooseSinks();
174
175
176 List<ServerName> sinkList = sinkManager.getSinksForTesting();
177 assertEquals(2, sinkList.size());
178
179 ServerName serverNameA = sinkList.get(0);
180 ServerName serverNameB = sinkList.get(1);
181
182 SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
183 SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
184
185 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
186 sinkManager.reportBadSink(sinkPeerA);
187 sinkManager.reportBadSink(sinkPeerB);
188 }
189
190
191
192 assertEquals(2, sinkManager.getNumSinks());
193 }
194
195 }