summaryrefslogtreecommitdiff
path: root/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java
blob: db667879f60a3e1a677b0ca8e2e5370e38adb1f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.qpid.server.cluster;

import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.replay.ReplayManager;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.util.InvokeMultiple;
import org.apache.qpid.framing.AMQMethodBody;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Manages the membership list of a group and the set of brokers representing the
 * remote peers. The group should be initialised through a call to establish()
 * or connectToLeader().
 *
 */
class BrokerGroup
{
    private static final Logger _logger = Logger.getLogger(BrokerGroup.class);

    private final InvokeMultiple<MembershipChangeListener> _changeListeners = new InvokeMultiple<MembershipChangeListener>(MembershipChangeListener.class);
    private final ReplayManager _replayMgr;
    private final MemberHandle _local;
    private final BrokerFactory _factory;
    // Guards _synch, _members, _peers and _state. The listener proxy is always
    // invoked after this lock has been released.
    private final Object _lock = new Object();
    private final Set<MemberHandle> _synch = new HashSet<MemberHandle>();
    private List<MemberHandle> _members;
    private List<Broker> _peers = new ArrayList<Broker>();
    private JoinState _state = JoinState.UNINITIALISED;

    /**
     * Creates an uninitialised group.
     *
     * @param local a handle that represents the local broker
     * @param replayMgr the replay manager to use when creating new brokers
     * @param factory the factory through which broker instances are created
     */
    BrokerGroup(MemberHandle local, ReplayManager replayMgr, BrokerFactory factory)
    {
        _replayMgr = replayMgr;
        _local = local;
        _factory = factory;
    }

    /**
     * Called to establish the local broker as the leader of a new group. The
     * state becomes JOINED and the member list contains only the local handle.
     */
    void establish()
    {
        synchronized (_lock)
        {
            setState(JoinState.JOINED);
            _members = new ArrayList<MemberHandle>();
            _members.add(_local);
        }
        fireChange();
    }

    /**
     * Called by prospect to connect to group. The state becomes JOINING, the
     * member list is reset to contain only the leader and the leader is added
     * to the list of peers.
     *
     * @param handle identifies the leader to connect to
     * @return the broker returned by the leader's connectToCluster() call
     * @throws Exception if creating or connecting the broker fails
     */
    Broker connectToLeader(MemberHandle handle) throws Exception
    {
        Broker leader = _factory.create(handle);
        leader = leader.connectToCluster();
        synchronized (_lock)
        {
            setState(JoinState.JOINING);
            _members = new ArrayList<MemberHandle>();
            _members.add(leader);
            _peers.add(leader);
        }
        fireChange();
        return leader;
    }

    /**
     * Called by leader when handling a join request. The new broker is
     * connected and then appended to both the member list and the peer list.
     *
     * @param handle identifies the prospect that asked to join
     * @return the broker created for the prospect
     */
    Broker connectToProspect(MemberHandle handle) throws IOException, InterruptedException
    {
        Broker prospect = _factory.create(handle);
        prospect.connect();
        synchronized (_lock)
        {
            _members.add(prospect);
            _peers.add(prospect);
        }
        fireChange();
        return prospect;
    }

    /**
     * Called in response to membership announcements.
     * <p>
     * When the group is already JOINED the peer list is rebuilt from the new
     * member list and remove() is called on every previous peer that is no
     * longer present. Otherwise the state moves to INITIATION and every
     * announced member other than the local one is recorded as still to be
     * synchronised with.
     *
     * @param members the list of members now part of the group
     */
    void setMembers(List<MemberHandle> members)
    {
        List<Broker> departed = null;

        // The state check and the swap of the lists happen under one lock
        // acquisition so that they cannot interleave with another update.
        synchronized (_lock)
        {
            if (isJoined())
            {
                List<Broker> old = _peers;

                // getBrokers() must run before _peers and _members are
                // replaced: it looks brokers up in the current peer list.
                _peers = getBrokers(members);
                _members = new ArrayList<MemberHandle>(members);

                // Work on a copy; the old list may have been handed out
                // to callers and must not be modified behind their back.
                departed = new ArrayList<Broker>(old);
                departed.removeAll(_peers);
            }
            else
            {
                setState(JoinState.INITIATION);
                _members = new ArrayList<MemberHandle>(members);
                _synch.addAll(_members);
                _synch.remove(_local);
            }
        }

        //handle failure of any brokers that haven't survived
        if (departed != null)
        {
            for (Broker peer : departed)
            {
                peer.remove();
            }
        }
        fireChange();
    }

    /**
     * @return an unmodifiable snapshot of the current member list
     */
    List<MemberHandle> getMembers()
    {
        synchronized (_lock)
        {
            return Collections.unmodifiableList(new ArrayList<MemberHandle>(_members));
        }
    }

    /**
     * @return a snapshot of the current peer list; changes made to the returned
     * list do not affect the group and later changes to the group are not
     * reflected in it
     */
    List<Broker> getPeers()
    {
        synchronized (_lock)
        {
            return new ArrayList<Broker>(_peers);
        }
    }

    /**
     * Removes the member presented from the group
     * @param peer the broker that should be removed
     */
    void remove(Broker peer)
    {
        synchronized (_lock)
        {
            _peers.remove(peer);
            _members.remove(peer);
        }
        fireChange();
    }

    MemberHandle getLocal()
    {
        return _local;
    }

    /**
     * @return the first entry of the peer list, or null if there are no peers
     */
    Broker getLeader()
    {
        synchronized (_lock)
        {
            return _peers.isEmpty() ? null : _peers.get(0);
        }
    }

    /**
     * Allows a Broker instance to be retrieved for a given handle
     *
     * @param handle the handle for which a broker is sought
     * @param create flag to indicate whether a broker should be created for the handle if
     * one is not found within the list of known peers
     * @return the broker corresponding to handle or null if a match cannot be found and
     * create is false
     */
    Broker findBroker(MemberHandle handle, boolean create)
    {
        if (handle instanceof Broker)
        {
            return (Broker) handle;
        }

        for (Broker b : getPeers())
        {
            if (b.matches(handle))
            {
                return b;
            }
        }

        if (!create)
        {
            return null;
        }

        // No known peer matches: create a broker and connect it asynchronously,
        // passing it the messages obtained from the replay manager.
        Broker b = _factory.create(handle);
        List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local));
        _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b));
        b.connectAsynch(msgs);
        return b;
    }

    /**
     * @param member the member to test for leadership
     * @return true if the passed in member matches the first entry of the member
     * list, false otherwise (including when the member list is not yet set or is
     * empty)
     */
    boolean isLeader(MemberHandle member)
    {
        synchronized (_lock)
        {
            return _members != null && !_members.isEmpty() && member.matches(_members.get(0));
        }
    }

    /**
     * @return true if the local broker is the group leader, false otherwise
     */
    boolean isLeader()
    {
        return isLeader(_local);
    }

    /**
     * Used when the leader fails and the next broker in the list needs to
     * assume leadership. The action is only valid when the local broker is the
     * second entry in the member list; in that case the first member and the
     * first peer are dropped and listeners are notified. When the action is not
     * valid nothing is changed and no notification is sent.
     *
     * @return true if the action succeeds
     */
    boolean assumeLeadership()
    {
        boolean valid;
        synchronized (_lock)
        {
            valid = _members.size() > 1 && _local.matches(_members.get(1));
            if (valid)
            {
                _members.remove(0);
                _peers.remove(0);
            }
        }
        if (valid)
        {
            fireChange();
        }
        return valid;
    }

    /**
     * Called in response to a Cluster.Synch message being received during the join
     * process. This indicates that the member mentioned has replayed all necessary
     * messages to the local broker.
     * <p>
     * A synch from the leader moves the state to INDUCTION. Once no members
     * remain to synchronise with, the peer list is rebuilt from the member list
     * and the state becomes JOINED.
     *
     * @param member the member from whom the synch messages was received
     */
    void synched(MemberHandle member)
    {
        _logger.info(new LogMessage("Synchronised with {0}", member));
        synchronized (_lock)
        {
            if (isLeader(member))
            {
                setState(JoinState.INDUCTION);
            }
            _synch.remove(member);
            if (_synch.isEmpty())
            {
                _peers = getBrokers(_members);
                setState(JoinState.JOINED);
            }
        }
    }


    /**
     * @return the state of the group
     */
    JoinState getState()
    {
        synchronized (_lock)
        {
            return _state;
        }
    }

    void addMemberhipChangeListener(MembershipChangeListener l)
    {
        _changeListeners.addListener(l);
    }

    void removeMemberhipChangeListener(MembershipChangeListener l)
    {
        _changeListeners.removeListener(l);
    }



    /**
     * Logs the transition and records the new state. Every caller in this class
     * holds _lock when invoking this method.
     */
    private void setState(JoinState state)
    {
        _logger.info(new LogMessage("Changed state from {0} to {1}", _state, state));
        _state = state;
    }

    private boolean isJoined()
    {
        return inState(JoinState.JOINED);
    }

    private boolean inState(JoinState state)
    {
        // _state is written under _lock, so it is read under _lock as well.
        synchronized (_lock)
        {
            return _state.equals(state);
        }
    }

    /**
     * Maps handles to brokers, skipping any handle that matches the local
     * broker. Brokers that are not already known peers are created through
     * findBroker().
     */
    private List<Broker> getBrokers(List<MemberHandle> handles)
    {
        List<Broker> brokers = new ArrayList<Broker>();
        for (MemberHandle handle : handles)
        {
            if (!_local.matches(handle))
            {
                brokers.add(findBroker(handle, true));
            }
        }
        return brokers;
    }

    /**
     * Passes an unmodifiable snapshot of the member list to the listener proxy.
     * The snapshot is taken under _lock, the same lock that guards every update
     * of the member list; the proxy itself is invoked outside the lock.
     */
    private void fireChange()
    {
        List<MemberHandle> members;
        synchronized (_lock)
        {
            members = new ArrayList<MemberHandle>(_members);
        }
        _changeListeners.getProxy().changed(Collections.unmodifiableList(members));
    }
}