1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
#ifndef _broker_Link_h
#define _broker_Link_h
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Link.h"
#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
// Forward declaration only: in this header sys::TimerTask is named solely as
// the pointee of a boost::intrusive_ptr member of broker::Link (timerTask).
namespace sys {
class TimerTask;
}
namespace broker {
// Forward declarations: within this header these types are only named through
// raw pointers, references or boost::shared_ptr (see the members and method
// signatures of class Link below).
class LinkRegistry;
class Broker;
class Connection;
class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
mutable sys::Mutex lock;
const std::string name;
LinkRegistry* links;
// these remain constant across failover - used to identify this link
const std::string configuredTransport;
const std::string configuredHost;
const uint16_t configuredPort;
// these reflect the current address of remote - will change during failover
std::string host;
uint16_t port;
std::string transport;
bool durable;
std::string authMechanism;
std::string username;
std::string password;
mutable uint64_t persistenceId;
qmf::org::apache::qpid::broker::Link::shared_ptr mgmtObject;
Broker* broker;
int state;
uint32_t visitCount;
uint32_t currentInterval;
Url url; // URL can contain many addresses.
size_t reconnectNext; // Index for next re-connect attempt
typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
Bridges cancellations; // Bridges pending cancellation
framing::ChannelId nextFreeChannel;
RangeSet<framing::ChannelId> freeChannels;
Connection* connection;
management::ManagementAgent* agent;
boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
bool failover; // Do we subscribe to a failover exchange?
uint failoverChannel;
std::string failoverSession;
static const int STATE_WAITING = 1;
static const int STATE_CONNECTING = 2;
static const int STATE_OPERATIONAL = 3;
static const int STATE_FAILED = 4;
static const int STATE_CLOSED = 5;
static const int STATE_CLOSING = 6; // Waiting for outstanding connect to complete first
static const uint32_t MAX_INTERVAL = 32;
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
void reconnectLH(const Address&); //called by LinkRegistry
// connection management (called by LinkRegistry)
void established(Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;
typedef boost::function<void(Link*)> DestroyedListener;
Link(const std::string& name,
LinkRegistry* links,
const std::string& host,
uint16_t port,
const std::string& transport,
DestroyedListener l,
bool durable,
const std::string& authMechanism,
const std::string& username,
const std::string& password,
Broker* broker,
management::Manageable* parent = 0,
bool failover=true);
virtual ~Link();
/** these return the *configured* transport/host/port, which does not change over the
lifetime of the Link */
std::string getHost() const { return configuredHost; }
uint16_t getPort() const { return configuredPort; }
std::string getTransport() const { return configuredTransport; }
/** returns the current address of the remote, which may be different from the
configured transport/host/port due to failover. Returns true if connection is
active */
QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const;
bool isDurable() { return durable; }
void maintenanceVisit ();
framing::ChannelId nextChannel(); // allocate channel from link free pool
void returnChannel(framing::ChannelId); // return channel to link free pool
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
// Close the link.
QPID_BROKER_EXTERN void close();
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
bool isConnecting() const { return state == STATE_CONNECTING; }
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
static const std::string ENCODED_IDENTIFIER;
static const std::string ENCODED_IDENTIFIER_V1;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
static bool isEncodedLink(const std::string& key);
// Manageable entry points
management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
// manage the exchange owned by this link
static const std::string exchangeTypeName;
static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
/** create a name for a link (if none supplied by user config) */
static std::string createName(const std::string& transport,
const std::string& host,
uint16_t port);
/** The current connction for this link. Note returns 0 if the link is not
* presently connected.
*/
Connection* getConnection() { return connection; }
};
}
}
#endif /*!_broker_Link_h*/
|