summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp48
1 files changed, 33 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index 04a76b9758..1b3286792f 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -22,7 +22,8 @@
#include "Core.h"
#include "WiringHandler.h"
#include "EventHandler.h"
-#include "BrokerHandler.h"
+#include "QueueHandler.h"
+#include "BrokerContext.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
@@ -32,18 +33,20 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
-#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace cluster {
using namespace broker;
using framing::FieldTable;
-WiringHandler::WiringHandler(EventHandler& e) :
+WiringHandler::WiringHandler(EventHandler& e,
+ const boost::intrusive_ptr<QueueHandler>& qh) :
HandlerBase(e),
broker(e.getCore().getBroker()),
recovery(broker.getQueues(), broker.getExchanges(),
- broker.getLinks(), broker.getDtxManager())
+ broker.getLinks(), broker.getDtxManager()),
+ queueHandler(qh)
{}
bool WiringHandler::invoke(const framing::AMQBody& body) {
@@ -51,24 +54,39 @@ bool WiringHandler::invoke(const framing::AMQBody& body) {
}
void WiringHandler::createQueue(const std::string& data) {
- if (sender() == self()) return;
- BrokerHandler::ScopedSuppressReplication ssr;
- framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
- // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
- RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf);
- QPID_LOG(debug, "cluster: create queue " << queue->getName());
+ // FIXME aconway 2011-05-25: Needs async completion.
+ std::string name;
+ if (sender() != self()) { // Created by another member, need to create locally.
+ BrokerContext::ScopedSuppressReplication ssr;
+ framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
+ // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
+ RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf);
+ name = rq->getName();
+ }
+ else { // Created locally, Queue and QueueContext already exist.
+ framing::Buffer buffer(const_cast<char*>(&data[0]), data.size());
+ // FIXME aconway 2011-05-10: implicit knowledge of queue encoding.
+ buffer.getShortString(name);
+ }
+ boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
+ assert(q); // FIXME aconway 2011-05-10: error handling.
+ // TODO aconway 2011-05-10: if we implement multi-group for queues then
+ // this call is a problem: comes from wiring delivery thread, not queues.
+ // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers..
+ queueHandler->add(q);
+ QPID_LOG(debug, "cluster: create queue " << q->getName());
}
void WiringHandler::destroyQueue(const std::string& name) {
if (sender() == self()) return;
QPID_LOG(debug, "cluster: destroy queue " << name);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.deleteQueue(name, std::string(), std::string());
}
void WiringHandler::createExchange(const std::string& data) {
if (sender() == self()) return;
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
// TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf);
@@ -78,7 +96,7 @@ void WiringHandler::createExchange(const std::string& data) {
void WiringHandler::destroyExchange(const std::string& name) {
if (sender() == self()) return;
QPID_LOG(debug, "cluster: destroy exchange " << name);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.getExchanges().destroy(name);
}
@@ -91,7 +109,7 @@ void WiringHandler::bind(
<< " exchange=" << exchangeName
<< " key=" << routingKey
<< " arguments=" << arguments);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string());
}
@@ -104,7 +122,7 @@ void WiringHandler::unbind(
<< " exchange=" << exchangeName
<< " key=" << routingKey
<< " arguments=" << arguments);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string());
}