summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.h')
-rw-r--r--cpp/src/qpid/broker/Link.h47
1 files changed, 35 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index a97fa48664..f0cb90e73b 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -25,7 +25,6 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
@@ -52,8 +51,8 @@ class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
mutable sys::Mutex lock;
+ const std::string name;
LinkRegistry* links;
- MessageStore* store;
// these remain constant across failover - used to identify this link
const std::string configuredTransport;
@@ -64,7 +63,8 @@ class Link : public PersistableConfig, public management::Manageable {
uint16_t port;
std::string transport;
- bool durable;
+ bool durable;
+
std::string authMechanism;
std::string username;
std::string password;
@@ -85,8 +85,10 @@ class Link : public PersistableConfig, public management::Manageable {
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
+ boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
+ bool failover; // Do we subscribe to a failover exchange?
uint failoverChannel;
std::string failoverSession;
@@ -101,33 +103,39 @@ class Link : public PersistableConfig, public management::Manageable {
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
- void destroy(); // Called when mgmt deletes this link
+ void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
bool hideManagement() const;
+ void reconnectLH(const Address&); //called by LinkRegistry
- void established(Connection*); // Called when connection is create
+ // connection management (called by LinkRegistry)
+ void established(Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
- void reconnectLH(const Address&); //called by LinkRegistry
+ void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
+ bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote?
friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;
+ typedef boost::function<void(Link*)> DestroyedListener;
- Link(LinkRegistry* links,
- MessageStore* store,
+ Link(const std::string& name,
+ LinkRegistry* links,
const std::string& host,
uint16_t port,
const std::string& transport,
+ DestroyedListener l,
bool durable,
const std::string& authMechanism,
const std::string& username,
const std::string& password,
Broker* broker,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0,
+ bool failover=true);
virtual ~Link();
/** these return the *configured* transport/host/port, which does not change over the
@@ -139,7 +147,7 @@ class Link : public PersistableConfig, public management::Manageable {
/** returns the current address of the remote, which may be different from the
configured transport/host/port due to failover. Returns true if connection is
active */
- bool getRemoteAddress(qpid::Address& addr) const;
+ QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const;
bool isDurable() { return durable; }
void maintenanceVisit ();
@@ -148,15 +156,17 @@ class Link : public PersistableConfig, public management::Manageable {
void cancel(Bridge::shared_ptr);
QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
- QPID_BROKER_EXTERN void close(); // Close the link from within the broker.
+
+ // Close the link.
+ QPID_BROKER_EXTERN void close();
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
- void notifyConnectionForced(const std::string text);
void setPassive(bool p);
+ bool isConnecting() const { return state == STATE_CONNECTING; }
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
@@ -165,7 +175,10 @@ class Link : public PersistableConfig, public management::Manageable {
void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedLink(const std::string& key);
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
@@ -178,6 +191,16 @@ class Link : public PersistableConfig, public management::Manageable {
// replicate internal state of this Link for clustering
void getState(framing::FieldTable& state) const;
void setState(const framing::FieldTable& state);
+
+ /** create a name for a link (if none supplied by user config) */
+ static std::string createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port);
+
+ /** The current connction for this link. Note returns 0 if the link is not
+ * presently connected.
+ */
+ Connection* getConnection() { return connection; }
};
}
}