summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp14
-rw-r--r--cpp/src/qpid/broker/Bridge.h3
-rw-r--r--cpp/src/qpid/broker/Connection.cpp15
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp1
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h9
6 files changed, 30 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index e629a20e87..d5a36d2bc8 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -21,11 +21,11 @@
#include "Bridge.h"
#include "ConnectionState.h"
#include "Connection.h"
+#include "Link.h"
#include "LinkRegistry.h"
#include "SessionState.h"
#include "qpid/management/ManagementAgent.h"
-#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <iostream>
@@ -84,6 +84,7 @@ Bridge::~Bridge()
void Bridge::create(Connection& c)
{
connState = &c;
+ conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
SessionHandler& sessionHandler = c.getChannel(id);
@@ -288,7 +289,8 @@ void Bridge::propagateBinding(const string& key, const string& tagList,
else
bindArgs.setString(qpidFedOrigin, origin);
- peer->getExchange().bind(queueName, args.i_src, key, bindArgs);
+ conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+ queueName, args.i_src, key, bindArgs));
}
}
@@ -299,7 +301,13 @@ void Bridge::sendReorigin()
bindArgs.setString(qpidFedOp, fedOpReorigin);
bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
- peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs);
+ conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+ queueName, args.i_src, args.i_key, bindArgs));
+}
+
+void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args)
+{
+ peer->getExchange().bind(queue, exchange, key, args);
}
bool Bridge::containsLocalTag(const string& tagList) const
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index dae28ddeaa..9578c60f4a 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -26,6 +26,7 @@
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "Exchange.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
@@ -74,6 +75,7 @@ public:
// Exchange::DynamicBridge methods
void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin);
void sendReorigin();
+ void ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, framing::FieldTable args);
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
@@ -98,6 +100,7 @@ private:
std::string queueName;
mutable uint64_t persistenceId;
ConnectionState* connState;
+ Connection* conn;
};
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index c4e8c4335c..a54bcc6db9 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -21,6 +21,7 @@
#include "Connection.h"
#include "SessionState.h"
#include "Bridge.h"
+#include "Broker.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -268,13 +269,15 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
bool Connection::doOutput() {
- try{
+ try {
{
- ScopedLock<Mutex> l(ioCallbackLock);
- while (!ioCallbacks.empty()) {
- ioCallbacks.front()(); // Lend the IO thread for management processing
- ioCallbacks.pop();
- }
+ ScopedLock<Mutex> l(ioCallbackLock);
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
+ }
}
if (mgmtClosing)
close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 540f64a8ed..17bc8f0970 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -29,7 +29,6 @@
#include <boost/ptr_container/ptr_map.hpp>
-#include "Broker.h"
#include "ConnectionHandler.h"
#include "ConnectionState.h"
#include "SessionHandler.h"
@@ -58,6 +57,7 @@
namespace qpid {
namespace broker {
+class Broker;
class LinkRegistry;
class SecureConnection;
struct ConnectionTimeoutTask;
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 7d4ab7548e..c6e10f0f8c 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -19,6 +19,7 @@
*
*/
#include "LinkRegistry.h"
+#include "Link.h"
#include "Connection.h"
#include "qpid/log/Statement.h"
#include <iostream>
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 2397dbc6f3..07fff5b979 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -23,17 +23,18 @@
*/
#include <map>
-#include "Link.h"
#include "Bridge.h"
#include "MessageStore.h"
#include "Timer.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+ class Link;
class Broker;
class Connection;
class LinkRegistry {
@@ -49,7 +50,7 @@ namespace broker {
void fire();
};
- typedef std::map<std::string, Link::shared_ptr> LinkMap;
+ typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
typedef std::map<std::string, TcpAddress> AddressMap;
@@ -70,12 +71,12 @@ namespace broker {
void periodicMaintenance ();
bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
- Link::shared_ptr findLink(const std::string& key);
+ boost::shared_ptr<Link> findLink(const std::string& key);
static std::string createKey(const TcpAddress& address);
public:
LinkRegistry (Broker* _broker);
- std::pair<Link::shared_ptr, bool>
+ std::pair<boost::shared_ptr<Link>, bool>
declare(std::string& host,
uint16_t port,
std::string& transport,