summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:04:53 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:04:53 +0000
commitbd2bcfb2c986f8cae5de240ef19fbb900b6ffd70 (patch)
treea0c8bcc0dfdc0546ebfe26aaae78dd70f1539dd0
parentdb74d09d2c7e1d7756a931d531941aa0aaf37455 (diff)
downloadqpid-python-bd2bcfb2c986f8cae5de240ef19fbb900b6ffd70.tar.gz
QPID-3603: In progress - integrate ReplicatingSubscription.
The code to use ReplicatingSubscription is there but it is disabled by commenting out getConsumerFactories().add in Backup.cpp because it hangs the test. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245480 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp20
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h11
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp35
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py3
7 files changed, 54 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 55c52cc508..d3a2e71ff6 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -21,6 +21,7 @@
#include "Backup.h"
#include "Settings.h"
#include "WiringReplicator.h"
+#include "ReplicatingSubscription.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -58,6 +59,12 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
link = result.first;
boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
broker.getExchanges().registerExchange(wr);
+
+ // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the tests
+ // The tests pass with a plain subscription if we dont add the factory.
+// broker.getConsumerFactories().add(
+// boost::shared_ptr<ReplicatingSubscription::Factory>(
+// new ReplicatingSubscription::Factory()));
}
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index d4edd273dc..38ae19a11e 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -58,7 +58,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
mgmtObject->set_status("solo");
ma->addObject(mgmtObject);
}
- QPID_LOG(notice, "HA broker initialized, client-url=" << clientUrl
+ QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
<< ", broker-url=" << brokerUrl);
backup.reset(new Backup(broker, s));
}
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 23f501328d..798dbc2bfd 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -56,10 +56,10 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && settings.enabled) {
- QPID_LOG(info, "HA plugin enabled");
+ QPID_LOG(info, "HA: Enabled");
haBroker.reset(new ha::HaBroker(*broker, settings));
} else
- QPID_LOG(info, "HA plugin disabled");
+ QPID_LOG(info, "HA: Disabled");
}
};
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 873d3784a7..1f9a8730b3 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -66,6 +66,23 @@ std::string mask(const std::string& in)
return DOLLAR + in + INTERNAL;
}
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+ SemanticState* _parent,
+ const std::string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _acquire,
+ bool _exclusive,
+ const std::string& _tag,
+ const std::string& _resumeId,
+ uint64_t _resumeTtl,
+ const framing::FieldTable& _arguments
+) {
+ return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
+ new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments));
+}
+
ReplicatingSubscription::ReplicatingSubscription(
SemanticState* _parent,
const std::string& _name,
@@ -81,7 +98,8 @@ ReplicatingSubscription::ReplicatingSubscription(
events(new Queue(mask(_name))),
consumer(new DelegatingConsumer(*this))
{
-
+ // FIXME aconway 2011-11-25: string constants.
+ QPID_LOG(debug, "HA: replicating subscription " << _name << " to " << _queue->getName());
if (_arguments.isSet("qpid.high_sequence_number")) {
qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
qpid::framing::SequenceNumber lwm;
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index a1c10a7641..0ba2b2f0de 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -24,6 +24,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
namespace qpid {
@@ -43,11 +44,21 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
public broker::QueueObserver
{
public:
+ struct Factory : public broker::ConsumerFactory {
+ boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ };
+
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
const framing::FieldTable& arguments);
+
~ReplicatingSubscription();
void init();
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 8b10765492..04d0f9d9ee 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -117,22 +117,18 @@ const string S_ALL="all";
ReplicateLevel replicateLevel(const string& str) {
// FIXME aconway 2011-11-24: case insenstive comparison.
- QPID_LOG(critical, "FIXME replicateLevel " << str);
ReplicateLevel rl = RL_NONE;
if (str == S_WIRING) rl = RL_WIRING;
else if (str == S_ALL) rl = RL_ALL;
- QPID_LOG(critical, "FIXME replicateLevel " << str << " = " << rl);
return rl;
}
ReplicateLevel replicateLevel(const framing::FieldTable& f) {
- QPID_LOG(critical, "FIXME replicateLevel " << f);
if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
else return RL_NONE;
}
ReplicateLevel replicateLevel(const Variant::Map& m) {
- QPID_LOG(critical, "FIXME replicateLevel " << m);
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end()) return replicateLevel(i->second.asString());
else return RL_NONE;
@@ -234,6 +230,7 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& map = list.front().asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
+ QPID_LOG(trace, "HA: Configuration event from primary: " << values);
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
@@ -249,29 +246,30 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& values = i->asMap()[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ QPID_LOG(trace, "HA: Configuration response from primary: " << values);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
- else throw Exception(QPID_MSG("Ignoring unexpected class: " << type));
+ else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
}
} else {
- QPID_LOG(warning, QPID_MSG("Replicator: Ignoring QMFv2 message with headers: " << *headers));
+ QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers));
}
} catch (const std::exception& e) {
- QPID_LOG(warning, "Replicator: Error replicating configuration: " << e.what());
- QPID_LOG(debug, "Replicator: Error processing: " << list);
+ QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
+ QPID_LOG(debug, "HA: Error processing configuration message: " << list);
}
}
void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
- QPID_LOG(critical, "FIXME doEventQueueDeclare " << values);
string name = values[QNAME].asString();
Variant::Map argsMap = values[ARGS].asMap();
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
- QPID_LOG(debug, "HA: Creating replicated queue " << name);
- framing::FieldTable args;
+ framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- std::pair<boost::shared_ptr<Queue>, bool> result =
+
+ QPID_LOG(debug, "HA: Creating queue from event " << name);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
values[DURABLE].asBool(),
@@ -288,7 +286,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
// out of date.
startQueueReplicator(result.first);
} else {
- QPID_LOG(warning, "Replicated queue " << name << " already exists");
+ QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
}
}
}
@@ -297,7 +295,7 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicateLevel(queue->getSettings())) {
- QPID_LOG(debug, "Deleting replicated queue " << name);
+ QPID_LOG(debug, "HA: Deleting queue from event: " << name);
broker.deleteQueue(
name,
values[USER].asString(),
@@ -311,6 +309,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
+ QPID_LOG(debug, "HA: Creating exchange from event " << name);
if (!broker.createExchange(
name,
values[EXTYPE].asString(),
@@ -331,7 +330,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
try {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
if (exchange && replicateLevel(exchange->getArgs())) {
- QPID_LOG(debug, "Deleting replicated exchange " << name);
+ QPID_LOG(debug, "HA: Deleting exchange:" << name);
broker.deleteExchange(
name,
values[USER].asString(),
@@ -358,10 +357,8 @@ void WiringReplicator::doEventBind(Variant::Map& values) {
}
void WiringReplicator::doResponseQueue(Variant::Map& values) {
- QPID_LOG(critical, "FIXME doResponseQueue " << values);
// FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
Variant::Map argsMap(values[ARGUMENTS].asMap());
- QPID_LOG(critical, "FIXME doResponseQueue replevel " << replicateLevel(argsMap));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
@@ -429,10 +426,8 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
try {
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
- QPID_LOG(critical, "FIXME doResponseBind " << qName << " to " << exName);
boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- QPID_LOG(critical, "FIXME doResponseBind ptrs " << queue.get() << " to " << exchange.get());
// FIXME aconway 2011-11-24: more flexible configuration for binding replication.
// Automatically replicate exchange if queue and exchange are replicated
@@ -451,9 +446,7 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
}
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
- QPID_LOG(critical, "FIXME startQueueReplicator " << queue->getName() << " " << queue->getSettings());
if (replicateLevel(queue->getSettings()) == RL_ALL) {
- QPID_LOG(critical, "FIXME startQueueReplicator starting");
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
broker.getExchanges().registerExchange(qr);
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 2b52d202ca..79123058b7 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -43,9 +43,6 @@ class ShortTests(BrokerTest):
cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
self.assertEqual(0, os.system(cmd))
- def setup_replication(self, primary, backup, queue):
- self.assertEqual(0,os.system("qpid-route --ack 1 queue add %s %s qpid.replicator-%s %s"%(backup, primary, queue, queue)))
-
# FIXME aconway 2011-11-15: work around async replication.
def wait(self, session, address):
def check():