summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.h
blob: f962f4c72ff4d0070f452fefe4f6f02e062e6e24 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#ifndef QPID_CLUSTER_CLUSTER_H
#define QPID_CLUSTER_CLUSTER_H

/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "Cpg.h"
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
#include "ConnectionMap.h"
#include "FailoverExchange.h"
#include "Quorum.h"
#include "Multicaster.h"

#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/LockPtr.h"
#include "qpid/management/Manageable.h"
#include "qpid/Url.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"

#include <boost/intrusive_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>

#include <algorithm>
#include <vector>
#include <map>

namespace qpid {

// Forward declarations of types from the framing namespace.
// Uuid is referenced by the Cluster class in this header; AMQBody is not
// referenced anywhere in the visible part of this header.
// NOTE(review): Cluster also holds a framing::Uuid by value (clusterId),
// which needs the complete type, so the full definition presumably arrives
// through one of the includes above -- confirm.
namespace framing {
class AMQBody;
class Uuid;
}

namespace cluster {

// Forward declaration: in this header Connection is only referenced through
// boost::intrusive_ptr<Connection>.
class Connection;

/**
 * Connection to the cluster.
 *
 * Receives CPG callbacks (privately, via Cpg::Handler) and is exposed to
 * management as a management::Manageable.
 *
 * Threading notes: 3 thread categories: connection, deliver, dump.
 * The comment on each group of members below names the thread(s) it is
 * expected to be called from.
 *
 * Locking convention: private functions whose last parameter is a Lock&
 * must only be called by a caller that already holds `lock`.
 */
class Cluster : private Cpg::Handler, public management::Manageable {
  public:
    typedef boost::intrusive_ptr<Connection> ConnectionPtr;
    typedef std::vector<ConnectionPtr> Connections;

    /**
     * Join a cluster.
     *
     * @param name      cluster name.
     * @param url       URL associated with this member.
     * @param useQuorum whether to use quorum checking.
     * @param readMax   value later reported by getReadMax().
     */
    Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax);

    virtual ~Cluster();

    // Connection map - called in connection threads.
    void insert(const ConnectionPtr&);
    void erase(ConnectionId);

    // URLs of current cluster members - called in connection threads.
    std::vector<Url> getUrls() const;
    boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }

    // Leave the cluster - called in any thread.
    void leave();

    // Dump completed - called in dump thread.
    void dumpInDone(const ClusterMap&);

    MemberId getId() const;
    broker::Broker& getBroker() const;
    Multicaster& getMulticast() { return mcast; }

    // Callable returning bool.
    // NOTE(review): presumably reports whether the cluster currently has
    // quorum; it is assigned outside this header -- confirm.
    boost::function<bool ()> isQuorate;
    void checkQuorum();         // called in connection threads.

    // Read-only accessor for the immutable readMax member; const so that it
    // can be used through a const Cluster& like the other getters.
    size_t getReadMax() const { return readMax; }

  private:
    typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
    typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
    typedef sys::Monitor::ScopedLock Lock;

    typedef sys::PollableQueue<Event> PollableEventQueue;
    typedef std::deque<Event> PlainEventQueue;

    // NB: The final Lock& parameter on functions below is used to mark functions
    // that should only be called by a function that already holds the lock.
    // The parameter makes it hard to forget since you have to have an instance of
    // a Lock to call the unlocked functions.

    void leave(Lock&);
    std::vector<Url> getUrls(Lock&) const;

    // Make an offer if we can - called in deliver thread.
    void tryMakeOffer(const MemberId&, Lock&);

    // Called in main thread in ~Broker.
    void brokerShutdown();

    // Cluster controls implement XML methods from cluster.xml.
    // Called in deliver thread.
    void dumpRequest(const MemberId&, const std::string&, Lock&);
    void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
    void ready(const MemberId&, const std::string&, Lock&);
    void configChange(const MemberId&, const std::string& addresses, Lock& l);
    void shutdown(const MemberId&, Lock&);
    void delivered(PollableEventQueue::Queue&); // deliverQueue callback
    void deliveredEvent(const Event&);

    // Helper, called in deliver thread.
    void dumpStart(const MemberId& dumpee, const Url& url, Lock&);

    // CPG callbacks, called in CPG IO thread.
    void dispatch(sys::DispatchHandle&);   // Dispatch CPG events.
    void disconnect(sys::DispatchHandle&); // CPG was disconnected.

    void deliver( // CPG deliver callback.
        cpg_handle_t /*handle*/,
        struct cpg_name *group,
        uint32_t /*nodeid*/,
        uint32_t /*pid*/,
        void* /*msg*/,
        int /*msg_len*/);

    void deliver(const Event& e, Lock&);

    void configChange( // CPG config change callback.
        cpg_handle_t /*handle*/,
        struct cpg_name* /*group*/,
        struct cpg_address* /*members*/, int /*nMembers*/,
        struct cpg_address* /*left*/, int /*nLeft*/,
        struct cpg_address* /*joined*/, int /*nJoined*/
    );

    boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);

    // management::Manageable interface.
    virtual qpid::management::ManagementObject* GetManagementObject() const;
    virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);

    void stopClusterNode(Lock&);
    void stopFullCluster(Lock&);
    void memberUpdate(Lock&);

    // Called in connection IO threads.
    void checkDumpIn(Lock&);

    // Called in DumpClient thread.
    void dumpOutDone();
    void dumpOutError(const std::exception&);
    void dumpOutDone(Lock&);

    void setClusterId(const framing::Uuid&);

    // NOTE: data members below must stay in this order; constructor
    // initialization follows declaration order.

    // Immutable members set on construction, never changed.
    // NOTE(review): clusterId is not const and setClusterId() exists, so it
    // does not look immutable in the same sense as the const members --
    // confirm how it is synchronized.
    broker::Broker& broker;
    boost::shared_ptr<sys::Poller> poller;
    Cpg cpg;
    const std::string name;
    const Url myUrl;
    const MemberId myId;
    const size_t readMax;
    framing::Uuid clusterId;
    NoOpConnectionOutputHandler shadowOut;
    sys::DispatchHandle cpgDispatchHandle;


    // Thread safe members
    Multicaster mcast;
    qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgmt owns lifecycle
    PollableEventQueue deliverQueue;
    ConnectionMap connections;
    boost::shared_ptr<FailoverExchange> failoverExchange;
    Quorum quorum;

    // Remaining members are protected by lock.
    mutable sys::Monitor lock;

    //    Local cluster state, cluster map
    enum {
        INIT,                   ///< Initial state, no CPG messages received.
        NEWBIE,                 ///< Sent dump request, waiting for dump offer.
        DUMPEE,                 ///< Stalled receive queue at dump offer, waiting for dump to complete.
        CATCHUP,                ///< Dump complete, unstalled but has not yet seen own "ready" event.
        READY,                  ///< Fully operational
        OFFER,                  ///< Sent an offer, waiting for accept/reject.
        DUMPER,                 ///< Offer accepted, sending a state dump.
        LEFT                    ///< Final state, left the cluster.
    } state;
    ClusterMap map;
    size_t lastSize;
    bool lastBroker;

    //     Dump related
    sys::Thread dumpThread;
    boost::optional<ClusterMap> dumpedMap;

  friend std::ostream& operator<<(std::ostream&, const Cluster&);
  friend class ClusterDispatcher;
};

}} // namespace qpid::cluster



#endif  /*!QPID_CLUSTER_CLUSTER_H*/