diff options
Diffstat (limited to 'cpp/src/qpid/broker/Link.h')
-rw-r--r-- | cpp/src/qpid/broker/Link.h | 47 |
1 files changed, 35 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index a97fa48664..f0cb90e73b 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -25,7 +25,6 @@ #include <boost/shared_ptr.hpp> #include "qpid/Url.h" #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableConfig.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/BrokerImportExport.h" @@ -52,8 +51,8 @@ class LinkExchange; class Link : public PersistableConfig, public management::Manageable { private: mutable sys::Mutex lock; + const std::string name; LinkRegistry* links; - MessageStore* store; // these remain constant across failover - used to identify this link const std::string configuredTransport; @@ -64,7 +63,8 @@ class Link : public PersistableConfig, public management::Manageable { uint16_t port; std::string transport; - bool durable; + bool durable; + std::string authMechanism; std::string username; std::string password; @@ -85,8 +85,10 @@ class Link : public PersistableConfig, public management::Manageable { uint channelCounter; Connection* connection; management::ManagementAgent* agent; + boost::function<void(Link*)> listener; boost::intrusive_ptr<sys::TimerTask> timerTask; boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange + bool failover; // Do we subscribe to a failover exchange? uint failoverChannel; std::string failoverSession; @@ -101,33 +103,39 @@ class Link : public PersistableConfig, public management::Manageable { void setStateLH (int newState); void startConnectionLH(); // Start the IO Connection - void destroy(); // Called when mgmt deletes this link + void destroy(); // Cleanup connection before link goes away void ioThreadProcessing(); // Called on connection's IO thread by request bool tryFailoverLH(); // Called during maintenance visit bool hideManagement() const; + void reconnectLH(const Address&); //called by LinkRegistry - void established(Connection*); // Called when connection is create + // connection management (called by LinkRegistry) + void established(Connection*); // Called when connection is created void opened(); // Called when connection is open (after create) void closed(int, std::string); // Called when connection goes away - void reconnectLH(const Address&); //called by LinkRegistry + void notifyConnectionForced(const std::string text); void closeConnection(const std::string& reason); + bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote? friend class LinkRegistry; // to call established, opened, closed public: typedef boost::shared_ptr<Link> shared_ptr; + typedef boost::function<void(Link*)> DestroyedListener; - Link(LinkRegistry* links, - MessageStore* store, + Link(const std::string& name, + LinkRegistry* links, const std::string& host, uint16_t port, const std::string& transport, + DestroyedListener l, bool durable, const std::string& authMechanism, const std::string& username, const std::string& password, Broker* broker, - management::Manageable* parent = 0); + management::Manageable* parent = 0, + bool failover=true); virtual ~Link(); /** these return the *configured* transport/host/port, which does not change over the @@ -139,7 +147,7 @@ class Link : public PersistableConfig, public management::Manageable { /** returns the current address of the remote, which may be different from the configured transport/host/port due to failover. Returns true if connection is active */ - bool getRemoteAddress(qpid::Address& addr) const; + QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const; bool isDurable() { return durable; } void maintenanceVisit (); @@ -148,15 +156,17 @@ class Link : public PersistableConfig, public management::Manageable { void cancel(Bridge::shared_ptr); QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection. - QPID_BROKER_EXTERN void close(); // Close the link from within the broker. + + // Close the link. + QPID_BROKER_EXTERN void close(); std::string getAuthMechanism() { return authMechanism; } std::string getUsername() { return username; } std::string getPassword() { return password; } Broker* getBroker() { return broker; } - void notifyConnectionForced(const std::string text); void setPassive(bool p); + bool isConnecting() const { return state == STATE_CONNECTING; } // PersistableConfig: void setPersistenceId(uint64_t id) const; @@ -165,7 +175,10 @@ class Link : public PersistableConfig, public management::Manageable { void encode(framing::Buffer& buffer) const; const std::string& getName() const; + static const std::string ENCODED_IDENTIFIER; + static const std::string ENCODED_IDENTIFIER_V1; static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + static bool isEncodedLink(const std::string& key); // Manageable entry points management::ManagementObject* GetManagementObject(void) const; @@ -178,6 +191,16 @@ class Link : public PersistableConfig, public management::Manageable { // replicate internal state of this Link for clustering void getState(framing::FieldTable& state) const; void setState(const framing::FieldTable& state); + + /** create a name for a link (if none supplied by user config) */ + static std::string createName(const std::string& transport, + const std::string& host, + uint16_t port); + + /** The current connction for this link. Note returns 0 if the link is not + * presently connected. + */ + Connection* getConnection() { return connection; } }; } } |