summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:03:52 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:03:52 +0000
commit7305cf39aff572bca77d89c12cc6d403e20ab20d (patch)
tree0ad560b8a400e0f5d7a6059f6f307f4cdc16a3bc
parent82dccc664ada89b73288c06eb7dc425d8359defc (diff)
downloadqpid-python-7305cf39aff572bca77d89c12cc6d403e20ab20d.tar.gz
QPID-3603: Automatic wiring and message replication.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233646 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/ha.mk2
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp (renamed from qpid/cpp/src/qpid/broker/QueueReplicator.cpp)67
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h (renamed from qpid/cpp/src/qpid/broker/QueueReplicator.h)42
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp101
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.h2
-rw-r--r--qpid/cpp/src/tests/brokertest.py6
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py74
14 files changed, 201 insertions, 118 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 50d2c1eb86..ec8ff98a54 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -627,8 +627,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueuedMessage.h \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
- qpid/broker/QueueReplicator.h \
- qpid/broker/QueueReplicator.cpp \
qpid/broker/ReplicatingSubscription.h \
qpid/broker/ReplicatingSubscription.cpp \
qpid/broker/RateFlowcontrol.h \
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index ca6415d8dd..3d465d235e 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -29,6 +29,8 @@ ha_la_SOURCES = \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
qpid/ha/Settings.h \
+ qpid/ha/QueueReplicator.h \
+ qpid/ha/QueueReplicator.cpp \
qpid/ha/WiringReplicator.cpp \
qpid/ha/WiringReplicator.h
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index b3a742c170..02034811bf 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -25,7 +25,6 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/ha/WiringReplicator.h"
-#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
@@ -112,10 +111,7 @@ void Bridge::create(Connection& c)
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
if (initialize) initialize(*this, sessionHandler);
else if (args.i_srcIsQueue) {
- //TODO: something other than this which is nasty...
- bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options);
-
- peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options);
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index b067da3702..acaa9d1cbd 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -105,6 +105,8 @@ namespace qpid {
std::string getHost() { return host; }
uint16_t getPort() { return port; }
+ std::string getTransport() { return transport; }
+
bool isDurable() { return durable; }
void maintenanceVisit ();
uint nextChannel();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index b34bc65ec5..eb6ab8ba15 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1434,7 +1434,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index b66600ef43..a53916ffbc 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -403,7 +403,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void flush();
- const Broker* getBroker();
+ Broker* getBroker();
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index fbf9cd228e..333b707308 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -27,7 +27,6 @@
#include "qpid/broker/Message.h"
#include "qpid/ha/WiringReplicator.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/ReplicatingSubscription.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
@@ -478,9 +477,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
- cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
- if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+ if (!cacheExchange || cacheExchange->getName() != exchangeName
+ || cacheExchange->isDestroyed())
+ {
+ cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 300621860b..55c52cc508 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -42,9 +42,10 @@ using types::Variant;
using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+ // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address.
if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
Url url(s.brokerUrl);
- QPID_LOG(info, "HA backup broker connecting to: " << url);
+ QPID_LOG(info, "HA: Acting as backup to " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 01c0c8e272..f09b2acaaf 100644
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -18,18 +18,62 @@
* under the License.
*
*/
-#include "qpid/broker/QueueReplicator.h"
+
+#include "QueueReplicator.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+}
namespace qpid {
-namespace broker {
+namespace ha {
+using namespace broker;
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
+ : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
+ queue(q), link(l), current(queue->getPosition())
+{
+ // FIXME aconway 2011-11-24: consistent logging.
+ QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
+ queue->getBroker()->getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ queue->getName(), // src
+ getName(), // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
+ );
+}
-QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {}
QueueReplicator::~QueueReplicator() {}
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+ peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, framing::FieldTable());
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
+
+}
+
+
namespace {
const std::string DEQUEUE_EVENT("dequeue-event");
const std::string REPLICATOR("qpid.replicator-");
@@ -78,23 +122,6 @@ bool QueueReplicator::isReplicatingLink(const std::string& name)
return name.find(REPLICATOR) == 0;
}
-boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target, QueueRegistry& queues)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (!queue) {
- QPID_LOG(warning, "Unable to create replicator, can't find " << queueName);
- } else {
- //TODO: need to cache the replicator
- QPID_LOG(info, "Creating replicator for " << queueName);
- exchange.reset(new QueueReplicator(target, queue));
- }
- }
- return exchange;
-}
-
bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings)
{
if (isReplicatingLink(target)) {
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 679aa9240d..13fbc6e86c 100644
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_QUEUEREPLICATOR_H
-#define QPID_BROKER_QUEUEREPLICATOR_H
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
/*
*
@@ -25,33 +25,43 @@
#include "qpid/framing/SequenceSet.h"
namespace qpid {
-namespace broker {
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
/**
* Dummy exchange for processing replication messages
*/
-class QueueReplicator : public Exchange
+class QueueReplicator : public broker::Exchange
{
public:
- QueueReplicator(const std::string& name, boost::shared_ptr<Queue>);
+ QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
std::string getType() const;
- bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
- bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
static bool isReplicatingLink(const std::string&);
- static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&);
- static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&);
+ static bool initReplicationSettings(const std::string&, broker::QueueRegistry&, framing::FieldTable&);
static const std::string typeName;
private:
- boost::shared_ptr<Queue> queue;
- qpid::framing::SequenceNumber current;
- qpid::framing::SequenceSet dequeued;
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+
+ boost::shared_ptr<broker::Queue> queue;
+ boost::shared_ptr<broker::Link> link;
+ framing::SequenceNumber current;
+ framing::SequenceSet dequeued;
};
-}} // namespace qpid::broker
+}} // namespace qpid::ha
-#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/
+#endif /*!QPID_HA_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index c89df85503..8b10765492 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
#include "WiringReplicator.h"
+#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
@@ -62,7 +63,6 @@ const string QUERY_RESPONSE("_query_response");
const string SCHEMA_ID("_schema_id");
const string VALUES("_values");
-const string ALL("all");
const string ALTEX("altEx");
const string ARGS("args");
const string ARGUMENTS("arguments");
@@ -83,7 +83,6 @@ const string QUEUE("queue");
const string RHOST("rhost");
const string TYPE("type");
const string USER("user");
-const string WIRING("wiring");
const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
@@ -110,15 +109,33 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-bool isReplicated(const string& value) {
- return value == ALL || value == WIRING;
+// FIXME aconway 2011-11-24: this should be a class.
+enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+const string S_NONE="none";
+const string S_WIRING="wiring";
+const string S_ALL="all";
+
+ReplicateLevel replicateLevel(const string& str) {
+ // FIXME aconway 2011-11-24: case insenstive comparison.
+ QPID_LOG(critical, "FIXME replicateLevel " << str);
+ ReplicateLevel rl = RL_NONE;
+ if (str == S_WIRING) rl = RL_WIRING;
+ else if (str == S_ALL) rl = RL_ALL;
+ QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl);
+ return rl;
}
-bool isReplicated(const framing::FieldTable& f) {
- return f.isSet(QPID_REPLICATE) && isReplicated(f.getAsString(QPID_REPLICATE));
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+ QPID_LOG(critical, "FIXME replicateLevel " << f);
+ if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else return RL_NONE;
}
-bool isReplicated(const Variant::Map& m) {
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+ QPID_LOG(critical, "FIXME replicateLevel " << m);
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- return i != m.end() && isReplicated(i->second.asString());
+ if (i != m.end()) return replicateLevel(i->second.asString());
+ else return RL_NONE;
}
void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
@@ -164,6 +181,8 @@ WiringReplicator::~WiringReplicator() {}
WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
: Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
{
+ QPID_LOG(debug, "HA: Starting replication from " <<
+ link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
broker.getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -198,6 +217,7 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(QUEUE, queueName, sessionHandler);
sendQuery(EXCHANGE, queueName, sessionHandler);
sendQuery(BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "Activated wiring replicator")
}
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
@@ -244,13 +264,15 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
}
void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
+ QPID_LOG(critical, "FIXME doEventQueueDeclare " << values);
string name = values[QNAME].asString();
Variant::Map argsMap = values[ARGS].asMap();
- if (values[DISP] == CREATED && isReplicated(argsMap)) {
- QPID_LOG(debug, "Creating replicated queue " << name);
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ QPID_LOG(debug, "HA: Creating replicated queue " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- if (!broker.createQueue(
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
name,
values[DURABLE].asBool(),
values[AUTODEL].asBool(),
@@ -258,11 +280,14 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
values[ALTEX].asString(),
args,
values[USER].asString(),
- values[RHOST].asString()).second) {
+ values[RHOST].asString());
+ if (result.second) {
// FIXME aconway 2011-11-22: should delete old queue and
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
+ startQueueReplicator(result.first);
+ } else {
QPID_LOG(warning, "Replicated queue " << name << " already exists");
}
}
@@ -271,7 +296,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && isReplicated(queue->getSettings())) {
+ if (queue && replicateLevel(queue->getSettings())) {
QPID_LOG(debug, "Deleting replicated queue " << name);
broker.deleteQueue(
name,
@@ -282,7 +307,7 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(values[ARGS].asMap());
- if (values[DISP] == CREATED && isReplicated(argsMap)) {
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
@@ -305,7 +330,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
try {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
- if (exchange && isReplicated(exchange->getArgs())) {
+ if (exchange && replicateLevel(exchange->getArgs())) {
QPID_LOG(debug, "Deleting replicated exchange " << name);
broker.deleteExchange(
name,
@@ -320,7 +345,7 @@ void WiringReplicator::doEventBind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString());
boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString());
// We only replicated a binds for a replicated queue to replicated exchange.
- if (isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings())) {
+ if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) {
framing::FieldTable args;
amqp_0_10::translate(values[ARGS].asMap(), args);
string key = values[KEY].asString();
@@ -333,21 +358,28 @@ void WiringReplicator::doEventBind(Variant::Map& values) {
}
void WiringReplicator::doResponseQueue(Variant::Map& values) {
+ QPID_LOG(critical, "FIXME doResponseQueue " << values);
// FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
Variant::Map argsMap(values[ARGUMENTS].asMap());
- if (!isReplicated(argsMap)) return;
+ QPID_LOG(critical, "FIXME doResponseQueue replevel " << replicateLevel(argsMap));
+ if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
+ string name(values[NAME].asString());
QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() << " (in catch-up)");
- if (!broker.createQueue(
- values[NAME].asString(),
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
values[DURABLE].asBool(),
values[AUTODELETE].asBool(),
0 /*i.e. no owner regardless of exclusivity on master*/,
""/*TODO: need to include alternate-exchange*/,
args,
""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
+ ""/*TODO: what should we use as connection id?*/);
+ if (result.second) {
+ startQueueReplicator(result.first);
+ } else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
QPID_LOG(warning, "Replicated queue " << values[NAME] << " already exists (in catch-up)");
@@ -356,7 +388,7 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) {
void WiringReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(values[ARGUMENTS].asMap());
- if (!isReplicated(argsMap)) return;
+ if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() << " (in catch-up)");
@@ -396,23 +428,21 @@ const std::string QUEUE_REF("queueRef");
void WiringReplicator::doResponseBind(Variant::Map& values) {
try {
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
- if (!exchange) return;
-
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " << exName);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- if (!queue) return;
+ QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to " << exchange.get());
+ // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
- // We only replicated a bind for a replicated queue to replicated exchange.
- // FIXME aconway 2011-11-22: do we always log binds between replicated ex/q
- // or do we consider the bind arguments as well?
- if (exchange && queue &&
- isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings()))
+ // Automatically replicate exchange if queue and exchange are replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName()
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
exchange->bind(queue, key, &args);
@@ -420,6 +450,15 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
} catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
}
+void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+ QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() << " " << queue->getSettings());
+ if (replicateLevel(queue->getSettings()) == RL_ALL) {
+ QPID_LOG(critical, "FIXME startQueueReplicator starting");
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ }
+}
+
bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h
index 55d6130cb8..6a5edb114c 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.h
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h
@@ -68,7 +68,7 @@ class WiringReplicator : public broker::Exchange
void doResponseBind(types::Variant::Map& values);
private:
- void startQueueReplicator(const std::string& name);
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index aa83989649..8dcde5a863 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -519,6 +519,12 @@ class BrokerTest(TestCase):
actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+ """Wait up to timeout for contents of queue to match expect_contents"""
+ def test(): return self.browse(session, queue, 0) == expect_contents
+ retry(test, timeout, delay)
+ self.assertEqual(expect_contents, self.browse(session, queue, 0))
+
def join(thread, timeout=10):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index bd5b847510..2b52d202ca 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -57,50 +57,52 @@ class ShortTests(BrokerTest):
def assert_missing(self,session, address):
try:
- session.receiver(a)
+ session.receiver(address)
self.fail("Should not have been replicated: %s"%(address))
except NotFound: pass
- def test_replicate_wiring(self):
- queue="%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"
- exchange="%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"
-
- # Create some wiring before starting the backup, to test catch-up
+ def test_replication(self):
+ def queue(name, replicate):
+ return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+
+ def exchange(name, replicate, bindq):
+ return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+ def setup(p, prefix):
+ """Create config, send messages on the primary p"""
+ p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+ p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+ p.sender(queue(prefix+"q3", "none")).send(Message("3"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+ # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
+ p.sender(queue(prefix+"x", "wiring"))
+
+ def verify(b, prefix):
+ """Verify setup was replicated to backup b"""
+ # FIXME aconway 2011-11-21: wait for wiring to replicate.
+ self.wait(b, prefix+"x");
+ # Verify backup
+ # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+ self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+ self.assert_missing(b, prefix+"q3")
+ b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
+ b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
+ self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
+
+ # Create config, send messages before starting the backup, to test catch-up replication.
primary = self.ha_broker(name="primary")
p = primary.connect().session()
- p.sender(queue%("q1", "all")).send(Message("1"))
- p.sender(queue%("q2", "wiring")).send(Message("2"))
- p.sender(queue%("q3", "none")).send(Message("3"))
- p.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
-
- # Create some after starting backup, test steady-state replication
+ setup(p, "1")
+ # Start the backup
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
b = backup.connect().session()
- # FIXME aconway 2011-11-21: need to wait for backup to be ready to test event replication
- for a in ["q1", "q2", "e1"]: self.wait(b,a)
- p.sender(queue%("q11", "all")).send(Message("11"))
- p.sender(queue%("q12", "wiring")).send(Message("12"))
- p.sender(queue%("q13", "none")).send(Message("13"))
- p.sender(exchange%("e11", "all", "e11", "q12")).send(Message("14"))
-
- # Verify replication
- # FIXME aconway 2011-11-18: We should kill primary here and fail over.
- for a in ["q11", "q12", "e11"]: self.wait(b,a)
- # FIXME aconway 2011-11-18: replicate messages
-# self.assert_browse(b, "q11", ["11", "14", "e11"])
-# self.assert_browse(b, "q12", []) # wiring only
-# self.assert_missing(b,"q13")
- b.sender("e11").send(Message("e11")) # Verify bind
- self.assert_browse(b, "q12", ["e11"])
-
- for a in ["q1", "q2", "e1"]: self.wait(b,a)
- # FIXME aconway 2011-11-18: replicate messages
-# self.assert_browse(b, "q1", ["1", "4", "e1"])
-# self.assert_browse(b, "q2", []) # wiring only
-# self.assert_missing(b,"q3")
- b.sender("e1").send(Message("e1")) # Verify bind
- self.assert_browse(b, "q2", ["e1"])
+ verify(b, "1")
+ # Create config, send messages after starting the backup, to test steady-state replication.
+ setup(p, "2")
+ verify(b, "2")
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)