diff options
author | Ted Ross <tross@apache.org> | 2009-07-02 18:09:12 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-07-02 18:09:12 +0000 |
commit | f907a67e437482ab37db092f19962a53a0502819 (patch) | |
tree | 4b8042c0ee00db6297c8b1d09b2b4556bbf31d0b /cpp/src | |
parent | 577c1e9b2e12483223efe95cd83eb0f57e9f3448 (diff) | |
download | qpid-python-f907a67e437482ab37db092f19962a53a0502819.tar.gz |
Federation: Propagation of dynamic bindings is now done on the thread servicing the
federation link (connection).
Also, some minor cleanup of unneeded recursive includes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@790698 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Bridge.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 9 |
6 files changed, 30 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index e629a20e87..d5a36d2bc8 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -21,11 +21,11 @@ #include "Bridge.h" #include "ConnectionState.h" #include "Connection.h" +#include "Link.h" #include "LinkRegistry.h" #include "SessionState.h" #include "qpid/management/ManagementAgent.h" -#include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <iostream> @@ -84,6 +84,7 @@ Bridge::~Bridge() void Bridge::create(Connection& c) { connState = &c; + conn = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); SessionHandler& sessionHandler = c.getChannel(id); @@ -288,7 +289,8 @@ void Bridge::propagateBinding(const string& key, const string& tagList, else bindArgs.setString(qpidFedOrigin, origin); - peer->getExchange().bind(queueName, args.i_src, key, bindArgs); + conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, + queueName, args.i_src, key, bindArgs)); } } @@ -299,7 +301,13 @@ void Bridge::sendReorigin() bindArgs.setString(qpidFedOp, fedOpReorigin); bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag()); - peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs); + conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, + queueName, args.i_src, args.i_key, bindArgs)); +} + +void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args) +{ + peer->getExchange().bind(queue, exchange, key, args); } bool Bridge::containsLocalTag(const string& tagList) const diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index dae28ddeaa..9578c60f4a 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -26,6 +26,7 @@ #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" #include "Exchange.h" #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h" @@ -74,6 +75,7 @@ public: // Exchange::DynamicBridge methods void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin); void sendReorigin(); + void ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, framing::FieldTable args); bool containsLocalTag(const std::string& tagList) const; const std::string& getLocalTag() const; @@ -98,6 +100,7 @@ private: std::string queueName; mutable uint64_t persistenceId; ConnectionState* connState; + Connection* conn; }; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index c4e8c4335c..a54bcc6db9 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -21,6 +21,7 @@ #include "Connection.h" #include "SessionState.h" #include "Bridge.h" +#include "Broker.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -268,13 +269,15 @@ void Connection::closed(){ // Physically closed, suspend open sessions. bool Connection::hasOutput() { return outputTasks.hasOutput(); } bool Connection::doOutput() { - try{ + try { { - ScopedLock<Mutex> l(ioCallbackLock); - while (!ioCallbacks.empty()) { - ioCallbacks.front()(); // Lend the IO thread for management processing - ioCallbacks.pop(); - } + ScopedLock<Mutex> l(ioCallbackLock); + while (!ioCallbacks.empty()) { + boost::function0<void> cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock<Mutex> ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing + } } if (mgmtClosing) close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request"); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 540f64a8ed..17bc8f0970 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -29,7 +29,6 @@ #include <boost/ptr_container/ptr_map.hpp> -#include "Broker.h" #include "ConnectionHandler.h" #include "ConnectionState.h" #include "SessionHandler.h" @@ -58,6 +57,7 @@ namespace qpid { namespace broker { +class Broker; class LinkRegistry; class SecureConnection; struct ConnectionTimeoutTask; diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 7d4ab7548e..c6e10f0f8c 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -19,6 +19,7 @@ * */ #include "LinkRegistry.h" +#include "Link.h" #include "Connection.h" #include "qpid/log/Statement.h" #include <iostream> diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 2397dbc6f3..07fff5b979 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -23,17 +23,18 @@ */ #include <map> -#include "Link.h" #include "Bridge.h" #include "MessageStore.h" #include "Timer.h" #include "qpid/Address.h" #include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { + class Link; class Broker; class Connection; class LinkRegistry { @@ -49,7 +50,7 @@ namespace broker { void fire(); }; - typedef std::map<std::string, Link::shared_ptr> LinkMap; + typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap; typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; typedef std::map<std::string, TcpAddress> AddressMap; @@ -70,12 +71,12 @@ namespace broker { void periodicMaintenance (); bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress); - Link::shared_ptr findLink(const std::string& key); + boost::shared_ptr<Link> findLink(const std::string& key); static std::string createKey(const TcpAddress& address); public: LinkRegistry (Broker* _broker); - std::pair<Link::shared_ptr, bool> + std::pair<boost::shared_ptr<Link>, bool> declare(std::string& host, uint16_t port, std::string& transport, |