summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-14 16:00:52 +0000
committerAlan Conway <aconway@apache.org>2012-02-14 16:00:52 +0000
commitb44b95ab1b1e0550d23f2b1d5c4a31680d1a8cf2 (patch)
tree874cb75ef75b06235f8ce25d41aeeda9c424a9e1
parent4f818b6166b6909f58f2203ee19d52c2e4ddf222 (diff)
downloadqpid-python-b44b95ab1b1e0550d23f2b1d5c4a31680d1a8cf2.tar.gz
QPID-3603: Move init code for WiringReplicator out of Bridge into ha::Backup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244029 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp80
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h19
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h6
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp76
6 files changed, 114 insertions, 75 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index e4d173af9c..d053106309 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -59,9 +59,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
}
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args) :
+ const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
+ initialize(init)
{
std::stringstream title;
title << id << "_" << name;
@@ -77,9 +79,9 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
-Bridge::~Bridge()
+Bridge::~Bridge()
{
- mgmtObject->resourceDestroy();
+ mgmtObject->resourceDestroy();
}
void Bridge::create(Connection& c)
@@ -98,7 +100,7 @@ void Bridge::create(Connection& c)
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
+
session->attach(name, false);
session->commandPoint(0,0);
} else {
@@ -108,7 +110,8 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (args.i_srcIsQueue) {
+ if (initialize) initialize(*this, sessionHandler);
+ else if (args.i_srcIsQueue) {
//TODO: something other than this which is nasty...
bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options);
@@ -116,52 +119,6 @@ void Bridge::create(Connection& c)
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
- } else if (ha::WiringReplicator::isWiringReplicatorDestination(args.i_dest)) {
- //declare and bind an event queue
- peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable());
- peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable());
- //subscribe to the queue
- peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
- Variant::Map request;
- request["_what"] = "OBJECT";
- Variant::Map schema;
- schema["_class_name"] = (i == 0 ? "queue" : "exchange");
- schema["_package_name"] = "org.apache.qpid.broker";
- request["_schema_id"] = schema;
-
- AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0)));
- method.setBof(true);
- method.setEof(false);
- method.setBos(true);
- method.setEos(true);
- AMQHeaderBody headerBody;
- MessageProperties* props = headerBody.get<MessageProperties>(true);
- props->setReplyTo(qpid::framing::ReplyTo("", queueName));
- props->setAppId("qmf2");
- props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
- headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
- AMQFrame header(headerBody);
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- AMQContentBody data;
- qpid::amqp_0_10::MapCodec::encode(request, data.getData());
- AMQFrame content(data);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- sessionHandler.out->handle(method);
- sessionHandler.out->handle(header);
- sessionHandler.out->handle(content);
- }
-
} else {
FieldTable queueSettings;
@@ -236,11 +193,6 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
-const string& Bridge::getName() const
-{
- return name;
-}
-
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
@@ -268,7 +220,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
is_queue, is_local, id, excludes, dynamic, sync).first;
}
-void Bridge::encode(Buffer& buffer) const
+void Bridge::encode(Buffer& buffer) const
{
buffer.putShortString(string("bridge"));
buffer.putShortString(link->getHost());
@@ -285,8 +237,8 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShort(args.i_sync);
}
-uint32_t Bridge::encodedSize() const
-{
+uint32_t Bridge::encodedSize() const
+{
return link->getHost().size() + 1 // short-string (host)
+ 7 // short-string ("bridge")
+ 2 // port
@@ -311,7 +263,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
management::Args& /*args*/,
string&)
{
- if (methodId == _qmf::Bridge::METHOD_CLOSE) {
+ if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
destroy();
return management::Manageable::STATUS_OK;
@@ -358,7 +310,7 @@ void Bridge::sendReorigin()
conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
queueName, args.i_src, args.i_key, bindArgs));
}
-bool Bridge::resetProxy()
+bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(id);
if (!sessionHandler.getSession()) peer.reset();
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 8b4559a871..b849b11ba8 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,15 +42,19 @@ class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
+class SessionHandler;
class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
+ typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
Bridge(Link* link, framing::ChannelId id, CancellationListener l,
- const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+ InitializeCallback init
+ );
~Bridge();
void create(Connection& c);
@@ -70,8 +74,8 @@ public:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
- const std::string& getName() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const { return name; }
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
// Exchange::DynamicBridge methods
@@ -81,6 +85,10 @@ public:
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
+ // Methods needed by initialization functions
+ std::string getQueueName() const { return queueName; }
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+
private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
@@ -103,6 +111,7 @@ private:
mutable uint64_t persistenceId;
ConnectionState* connState;
Connection* conn;
+ InitializeCallback initialize;
bool resetProxy();
};
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index cf6cb83968..31b4f1b490 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -162,7 +162,9 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
const std::string& tag,
const std::string& excludes,
bool dynamic,
- uint16_t sync)
+ uint16_t sync,
+ Bridge::InitializeCallback init
+)
{
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
@@ -196,7 +198,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key), args));
+ host, port, src, dest, key),
+ args, init));
bridges[bridgeKey] = bridge;
l->second->add(bridge);
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index f4bc9aab40..7e5b39f223 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -91,6 +91,7 @@ namespace broker {
const std::string& authMechanism,
const std::string& username,
const std::string& password);
+
std::pair<Bridge::shared_ptr, bool>
declare(const std::string& host,
uint16_t port,
@@ -103,9 +104,12 @@ namespace broker {
const std::string& id,
const std::string& excludes,
bool dynamic,
- uint16_t sync);
+ uint16_t sync,
+ Bridge::InitializeCallback=0
+ );
void destroy(const std::string& host, const uint16_t port);
+
void destroy(const std::string& host,
const uint16_t port,
const std::string& src,
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ca6d6bb193..8cd5072574 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -23,6 +23,7 @@
*/
#include "qpid/amqp_0_10/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 6b99ca74c3..ef581c1a04 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -21,11 +21,79 @@
#include "Backup.h"
#include "Settings.h"
#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
namespace qpid {
namespace ha {
+using namespace framing;
+using namespace broker;
+using types::Variant;
+
+namespace {
+const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+}
+
+// Initialize a bridge as a wiring replicator.
+void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ std::string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
+ Variant::Map request;
+ request["_what"] = "OBJECT";
+ Variant::Map schema;
+ schema["_class_name"] = (i == 0 ? "queue" : "exchange");
+ schema["_package_name"] = "org.apache.qpid.broker";
+ request["_schema_id"] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), "qmf.default.direct", 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId("qmf2");
+ props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+ }
+}
+
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
// Create a link to replicate wiring
if (s.brokerUrl != "dummy") {
@@ -41,15 +109,17 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
broker.getLinks().declare( // Declare the bridge
url[0].host, url[0].port,
false, // durable
- "qpid.wiring-replicator", // src
- "qpid.wiring-replicator", // dest
+ QPID_WIRING_REPLICATOR, // src
+ QPID_WIRING_REPLICATOR, // dest
"x", // key
false, // isQueue
false, // isLocal
"", // id/tag
"", // excludes
false, // dynamic
- 0); // sync?
+ 0, // sync?
+ bridgeInitWiringReplicator
+ );
}
// FIXME aconway 2011-11-17: need to enhance the link code to
// handle discovery of the primary broker and fail-over correctly.