diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 101 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Bridge.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 135 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Link.h | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 95 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 39 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 3 | ||||
-rwxr-xr-x | cpp/src/tests/federation.py | 5 |
10 files changed, 370 insertions, 87 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index a8e7b3c368..337992992f 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -20,29 +20,35 @@ */ #include "Bridge.h" #include "ConnectionState.h" +#include "LinkRegistry.h" #include "qpid/management/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" using qpid::framing::FieldTable; using qpid::framing::Uuid; +using qpid::framing::Buffer; +using qpid::management::ManagementAgent; namespace qpid { namespace broker { -Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l, +Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : - id(_id), args(_args), - mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest, - args.i_key, args.i_src_is_queue, args.i_src_is_local)), - listener(l), name(Uuid(true).str()) + link(_link), id(_id), args(_args), + mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, + args.i_key, args.i_src_is_queue, args.i_src_is_local, + args.i_tag, args.i_excludes)), + listener(l), name(Uuid(true).str()), persistenceId(0) { - management::ManagementAgent::getAgent()->addObject(mgmtObject); + if (!args.i_durable) + management::ManagementAgent::getAgent()->addObject(mgmtObject); } Bridge::~Bridge() -{ +{ mgmtObject->resourceDestroy(); } @@ -65,8 +71,8 @@ void Bridge::create(ConnectionState& c) string queue = "bridge_queue_"; queue += Uuid(true).str(); FieldTable queueSettings; - if (args.i_id.size()) { - queueSettings.setString("qpid.trace.id", args.i_id); + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); } if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); @@ -89,6 +95,81 @@ void Bridge::cancel() peer->getSession().detach(name); } +void Bridge::destroy() +{ + listener(this); +} + +void Bridge::setPersistenceId(uint64_t id) const +{ + if (mgmtObject != 0 && persistenceId == 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject, id); + } + persistenceId = id; +} + +const string& Bridge::getName() const +{ + return name; +} + +Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) +{ + string host; + uint16_t port; + string src; + string dest; + string key; + string id; + string excludes; + + buffer.getShortString(host); + port = buffer.getShort(); + bool durable(buffer.getOctet()); + buffer.getShortString(src); + buffer.getShortString(dest); + buffer.getShortString(key); + bool is_queue(buffer.getOctet()); + bool is_local(buffer.getOctet()); + buffer.getShortString(id); + buffer.getShortString(excludes); + + return links.declare(host, port, durable, src, dest, key, + is_queue, is_local, id, excludes).first; +} + +void Bridge::encode(Buffer& buffer) const +{ + buffer.putShortString(string("bridge")); + buffer.putShortString(link->getHost()); + buffer.putShort(link->getPort()); + buffer.putOctet(args.i_durable ? 1 : 0); + buffer.putShortString(args.i_src); + buffer.putShortString(args.i_dest); + buffer.putShortString(args.i_key); + buffer.putOctet(args.i_src_is_queue ? 1 : 0); + buffer.putOctet(args.i_src_is_local ? 1 : 0); + buffer.putShortString(args.i_tag); + buffer.putShortString(args.i_excludes); +} + +uint32_t Bridge::encodedSize() const +{ + return link->getHost().size() + 1 // short-string (host) + + 7 // short-string ("bridge") + + 2 // port + + 1 // durable + + args.i_src.size() + 1 + + args.i_dest.size() + 1 + + args.i_key.size() + 1 + + 1 // src_is_queue + + 1 // src_is_local + + args.i_tag.size() + 1 + + args.i_excludes.size() + 1; +} + management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const { return dynamic_pointer_cast<management::ManagementObject>(mgmtObject); @@ -98,7 +179,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man { if (methodId == management::Bridge::METHOD_CLOSE) { //notify that we are closed - listener(this); + destroy(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index 15efcc6482..594a0ef508 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -21,8 +21,10 @@ #ifndef _Bridge_ #define _Bridge_ +#include "PersistableConfig.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/Buffer.h" #include "qpid/management/Manageable.h" #include "qpid/management/ArgsLinkBridge.h" #include "qpid/management/Bridge.h" @@ -35,10 +37,12 @@ namespace broker { class ConnectionState; class Link; +class LinkRegistry; -class Bridge : public management::Manageable +class Bridge : public PersistableConfig, public management::Manageable { public: + typedef boost::shared_ptr<Bridge> shared_ptr; typedef boost::function<void(Bridge*)> CancellationListener; Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args); @@ -46,20 +50,32 @@ public: void create(ConnectionState& c); void cancel(); + void destroy(); + bool isDurable() { return args.i_durable; } management::ManagementObject::shared_ptr GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args); + // PersistableConfig: + void setPersistenceId(uint64_t id) const; + uint64_t getPersistenceId() const { return persistenceId; } + uint32_t encodedSize() const; + void encode(framing::Buffer& buffer) const; + const std::string& getName() const; + static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + private: std::auto_ptr<framing::ChannelHandler> channelHandler; std::auto_ptr<framing::AMQP_ServerProxy::Session> session; std::auto_ptr<framing::AMQP_ServerProxy> peer; + Link* link; framing::ChannelId id; management::ArgsLinkBridge args; management::Bridge::shared_ptr mgmtObject; CancellationListener listener; std::string name; + mutable uint64_t persistenceId; }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d80c13f12a..bc862214b7 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -329,7 +329,8 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED; std::pair<Link::shared_ptr, bool> response = - links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable); + links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable, + hp.i_authMechanism, hp.i_username, hp.i_password); if (hp.i_durable && response.second) store->create(*response.first); diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 83c9a2a62e..cd032495e2 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -37,14 +37,19 @@ using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::Mutex; -Link::Link(LinkRegistry* _links, - string& _host, - uint16_t _port, - bool _useSsl, - bool _durable, - Broker* _broker, +Link::Link(LinkRegistry* _links, + MessageStore* _store, + string& _host, + uint16_t _port, + bool _useSsl, + bool _durable, + string& _authMechanism, + string& _username, + string& _password, + Broker* _broker, management::Manageable* parent) - : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), broker(_broker), state(0), access(boost::bind(&Link::established, this), boost::bind(&Link::closed, this, _1, _2), @@ -65,7 +70,7 @@ Link::Link(LinkRegistry* _links, agent->addObject(mgmtObject); } } - setState(STATE_WAITING); + setStateLH(STATE_WAITING); } Link::~Link () @@ -76,7 +81,7 @@ Link::~Link () mgmtObject->resourceDestroy (); } -void Link::setState (int newState) +void Link::setStateLH (int newState) { if (newState == state) return; @@ -93,13 +98,13 @@ void Link::setState (int newState) } } -void Link::startConnection () +void Link::startConnectionLH () { try { broker->connect (host, port, useSsl, 0, &access); - setState(STATE_CONNECTING); + setStateLH(STATE_CONNECTING); } catch(std::exception& e) { - setState(STATE_WAITING); + setStateLH(STATE_WAITING); mgmtObject->set_lastError (e.what()); } } @@ -109,7 +114,7 @@ void Link::established () Mutex::ScopedLock mutex(lock); QPID_LOG (info, "Inter-broker link established to " << host << ":" << port); - setState(STATE_OPERATIONAL); + setStateLH(STATE_OPERATIONAL); currentInterval = 1; visitCount = 0; if (closing) @@ -124,8 +129,11 @@ void Link::closed (int, std::string text) QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); connection.reset(); - created.transfer(created.end(), active.begin(), active.end(), active); - setState(STATE_WAITING); + for (Bridges::iterator i = active.begin(); i != active.end(); i++) + created.push_back(*i); + active.clear(); + + setStateLH(STATE_WAITING); mgmtObject->set_lastError (text); if (closing) destroy(); @@ -133,25 +141,56 @@ void Link::closed (int, std::string text) void Link::destroy () { + Mutex::ScopedLock mutex(lock); + Bridges toDelete; + QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); connection.reset(); + + // Move the bridges to be deleted into a local vector so there is no + // corruption of the iterator caused by bridge deletion. + for (Bridges::iterator i = active.begin(); i != active.end(); i++) + toDelete.push_back(*i); + active.clear(); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) + toDelete.push_back(*i); + created.clear(); + + // Now delete all bridges on this link. + for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) + (*i)->destroy(); + toDelete.clear(); + links->destroy (host, port); } -void Link::cancel(Bridge* bridge) +void Link::add(Bridge::shared_ptr bridge) { Mutex::ScopedLock mutex(lock); - //need to take this out of the active map and add it to the cancelled map + created.push_back (bridge); + if (state == STATE_OPERATIONAL && connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::cancel(Bridge::shared_ptr bridge) +{ + Mutex::ScopedLock mutex(lock); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; + } + } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if (&(*i) == bridge) { - cancelled.transfer(cancelled.end(), i, active); + if ((*i).get() == bridge.get()) { + bridge->cancel(); + active.erase(i); break; } } - - if (connection.get() != 0) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::ioThreadProcessing() @@ -161,21 +200,17 @@ void Link::ioThreadProcessing() //process any pending creates if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { - i->create(*connection); - } - active.transfer(active.end(), created.begin(), created.end(), created); - } - if (!cancelled.empty()) { - //process any pending cancellations - for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { - i->cancel(); + active.push_back(*i); + (*i)->create(*connection); } - cancelled.clear(); + created.clear(); } } void Link::setConnection(Connection::shared_ptr c) { + Mutex::ScopedLock mutex(lock); + connection = c; connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -193,11 +228,18 @@ void Link::maintenanceVisit () currentInterval *= 2; if (currentInterval > MAX_INTERVAL) currentInterval = MAX_INTERVAL; - startConnection(); + startConnectionLH(); } } } +uint Link::nextChannel() +{ + Mutex::ScopedLock mutex(lock); + + return channelCounter++; +} + void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) @@ -217,13 +259,19 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { string host; uint16_t port; + string authMechanism; + string username; + string password; buffer.getShortString(host); port = buffer.getShort(); bool useSsl(buffer.getOctet()); bool durable(buffer.getOctet()); + buffer.getShortString(authMechanism); + buffer.getShortString(username); + buffer.getShortString(password); - return links.declare(host, port, useSsl, durable).first; + return links.declare(host, port, useSsl, durable, authMechanism, username, password).first; } void Link::encode(Buffer& buffer) const @@ -233,6 +281,9 @@ void Link::encode(Buffer& buffer) const buffer.putShort(port); buffer.putOctet(useSsl ? 1 : 0); buffer.putOctet(durable ? 1 : 0); + buffer.putShortString(authMechanism); + buffer.putShortString(username); + buffer.putShortString(password); } uint32_t Link::encodedSize() const @@ -241,7 +292,10 @@ uint32_t Link::encodedSize() const + 5 // short-string ("link") + 2 // port + 1 // useSsl - + 1; // durable + + 1 // durable + + authMechanism.size() + 1 + + username.size() + 1 + + password.size() + 1; } ManagementObject::shared_ptr Link::GetManagementObject (void) const @@ -251,8 +305,6 @@ ManagementObject::shared_ptr Link::GetManagementObject (void) const Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) { - Mutex::ScopedLock mutex(lock); - switch (op) { case management::Link::METHOD_CLOSE : @@ -269,11 +321,14 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args if (iargs.i_durable && !durable) return Manageable::STATUS_INVALID_PARAMETER; - created.push_back(new Bridge(this, channelCounter++, - boost::bind(&Link::cancel, this, _1), iargs)); + std::pair<Bridge::shared_ptr, bool> result = + links->declare (host, port, iargs.i_durable, iargs.i_src, + iargs.i_dest, iargs.i_key, iargs.i_src_is_queue, + iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes); + + if (result.second && iargs.i_durable) + store->create(*result.first); - if (state == STATE_OPERATIONAL && connection.get() != 0) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); return Manageable::STATUS_OK; } diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index 838c3bf696..c4eca86c19 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -45,10 +45,14 @@ namespace qpid { private: sys::Mutex lock; LinkRegistry* links; - const string host; - const uint16_t port; - const bool useSsl; - const bool durable; + MessageStore* store; + string host; + uint16_t port; + bool useSsl; + bool durable; + string authMechanism; + string username; + string password; mutable uint64_t persistenceId; management::Link::shared_ptr mgmtObject; Broker* broker; @@ -58,10 +62,9 @@ namespace qpid { uint32_t currentInterval; bool closing; - typedef boost::ptr_vector<Bridge> Bridges; + typedef std::vector<Bridge::shared_ptr> Bridges; Bridges created; // Bridges pending creation Bridges active; // Bridges active - Bridges cancelled; // Bridges pending deletion uint channelCounter; boost::shared_ptr<Connection> connection; @@ -71,29 +74,37 @@ namespace qpid { static const uint32_t MAX_INTERVAL = 16; - void setState (int newState); - void startConnection(); // Start the IO Connection + void setStateLH (int newState); + void startConnectionLH(); // Start the IO Connection void established(); // Called when connection is created void closed(int, std::string); // Called when connection goes away void destroy(); // Called when mgmt deletes this link - void cancel(Bridge*); // Called by self-cancelling bridge void ioThreadProcessing(); // Called on connection's IO thread by request void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection public: typedef boost::shared_ptr<Link> shared_ptr; - Link(LinkRegistry* links, - string& host, - uint16_t port, - bool useSsl, - bool durable, - Broker* broker, + Link(LinkRegistry* links, + MessageStore* store, + string& host, + uint16_t port, + bool useSsl, + bool durable, + string& authMechanism, + string& username, + string& password, + Broker* broker, management::Manageable* parent = 0); virtual ~Link(); + std::string getHost() { return host; } + uint16_t getPort() { return port; } bool isDurable() { return durable; } void maintenanceVisit (); + uint nextChannel(); + void add(Bridge::shared_ptr); + void cancel(Bridge::shared_ptr); // PersistableConfig: void setPersistenceId(uint64_t id) const; diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 6e20a3f7ce..be3c67077e 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -46,15 +46,21 @@ void LinkRegistry::Periodic::fire () void LinkRegistry::periodicMaintenance () { Mutex::ScopedLock locker(lock); + linksToDestroy.clear(); + bridgesToDestroy.clear(); for (LinkMap::iterator i = links.begin(); i != links.end(); i++) i->second->maintenanceVisit(); } -pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host, - uint16_t port, - bool useSsl, - bool durable) +pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, + uint16_t port, + bool useSsl, + bool durable, + string& authMechanism, + string& username, + string& password) + { Mutex::ScopedLock locker(lock); stringstream keystream; @@ -66,13 +72,64 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host, { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent)); + link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable, + authMechanism, username, password, + broker, parent)); links[key] = link; return std::pair<Link::shared_ptr, bool>(link, true); } return std::pair<Link::shared_ptr, bool>(i->second, false); } +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, + uint16_t port, + bool durable, + std::string& src, + std::string& dest, + std::string& key, + bool is_queue, + bool is_local, + std::string& tag, + std::string& excludes) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string linkKey = string(keystream.str()); + + keystream << "!" << src << "!" << dest << "!" << key; + string bridgeKey = string(keystream.str()); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + { + management::ArgsLinkBridge args; + Bridge::shared_ptr bridge; + + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_src_is_queue = is_queue; + args.i_src_is_local = is_local; + args.i_tag = tag; + args.i_excludes = excludes; + + bridge = Bridge::shared_ptr + (new Bridge (l->second.get(), l->second->nextChannel(), + boost::bind(&LinkRegistry::destroy, this, + host, port, src, dest, key), args)); + bridges[bridgeKey] = bridge; + l->second->add(bridge); + return std::pair<Bridge::shared_ptr, bool>(bridge, true); + } + return std::pair<Bridge::shared_ptr, bool>(b->second, false); +} + void LinkRegistry::destroy(const string& host, const uint16_t port) { Mutex::ScopedLock locker(lock); @@ -90,6 +147,34 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) } } +void LinkRegistry::destroy(const std::string& host, + const uint16_t port, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string linkKey = string(keystream.str()); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return; + + keystream << "!" << src << "!" << dest << "!" << key; + string bridgeKey = string(keystream.str()); + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + return; + + l->second->cancel(b->second); + if (b->second->isDurable()) + store->destroy(*(b->second)); + bridgesToDestroy[bridgeKey] = b->second; + bridges.erase(b); +} + void LinkRegistry::setStore (MessageStore* _store) { assert (store == 0 && _store != 0); diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 86d8c3d2f9..3c47954141 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -24,6 +24,7 @@ #include <map> #include "Link.h" +#include "Bridge.h" #include "MessageStore.h" #include "Timer.h" #include "qpid/sys/Mutex.h" @@ -47,8 +48,13 @@ namespace broker { }; typedef std::map<std::string, Link::shared_ptr> LinkMap; - LinkMap links; - LinkMap linksToDestroy; + typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; + + LinkMap links; + LinkMap linksToDestroy; + BridgeMap bridges; + BridgeMap bridgesToDestroy; + qpid::sys::Mutex lock; Broker* broker; Timer timer; @@ -59,11 +65,32 @@ namespace broker { public: LinkRegistry (Broker* _broker); - std::pair<Link::shared_ptr, bool> declare(std::string& host, - uint16_t port, - bool useSsl, - bool durable); + std::pair<Link::shared_ptr, bool> + declare(std::string& host, + uint16_t port, + bool useSsl, + bool durable, + std::string& authMechanism, + std::string& username, + std::string& password); + std::pair<Bridge::shared_ptr, bool> + declare(std::string& host, + uint16_t port, + bool durable, + std::string& src, + std::string& dest, + std::string& key, + bool is_queue, + bool is_local, + std::string& id, + std::string& excludes); + void destroy(const std::string& host, const uint16_t port); + void destroy(const std::string& host, + const uint16_t port, + const std::string& src, + const std::string& dest, + const std::string& key); /** * Register the manageable parent for declared queues diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 0f8c29f3b9..7fc2b6c4f3 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -23,6 +23,7 @@ #include "Message.h" #include "Queue.h" #include "Link.h" +#include "Bridge.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" #include "qpid/framing/reply_exceptions.h" @@ -85,10 +86,11 @@ public: class RecoverableConfigImpl : public RecoverableConfig { - // TODO: Add links for other config types, consider using super class (PersistableConfig?) - Link::shared_ptr link; + Link::shared_ptr link; + Bridge::shared_ptr bridge; public: - RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} + RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} + RecoverableConfigImpl(Bridge::shared_ptr _bridge) : bridge(_bridge) {} void setPersistenceId(uint64_t id); }; @@ -140,10 +142,10 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer string kind; buffer.getShortString (kind); - if (kind == "link") - { + if (kind == "link") return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); - } + else if (kind == "bridge") + return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead } @@ -212,7 +214,8 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id) { if (link.get()) link->setPersistenceId(id); - // TODO: add calls to other types. Consider using a parent class. + else if (bridge.get()) + bridge->setPersistenceId(id); } void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index e59a79f711..2ec0988fc0 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -65,6 +65,9 @@ void SessionHandler::handleIn(AMQFrame& f) { } else if (session.get()) { //we are attached and frame was not a session control so it is for upper layers session->handle(f); + } else if (m && m->isA<SessionDetachedBody>()) { + handleDetach(); + connection.closeChannel(channel.get()); } else { throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); } diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 33da19b1b8..98e34be0e9 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -171,7 +171,8 @@ class FederationTests(TestBase010): mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) link = mgmt.get_object("link") - mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1}) + mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", + "key":"", "tag":"", "excludes":"", "src_is_queue":1}) sleep(6) bridge = mgmt.get_object("bridge") @@ -210,7 +211,7 @@ class FederationTests(TestBase010): link = mgmt.get_object("link") mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key", - "id":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"}) + "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"}) sleep(6) bridge = mgmt.get_object("bridge") |