summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:05:00 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:05:00 +0000
commit0010e075f124b72498419b9431bfac5c83c5eb82 (patch)
tree396d3deadcd68e7013102ebd92cc666aa165a788
parente6d18651468ebbd37bc07786ba22b11960f8df1c (diff)
downloadqpid-python-0010e075f124b72498419b9431bfac5c83c5eb82.tar.gz
QPID-3603: Fix replication of dequeues.
- Set acquire=false when creating a ReplicatingSubscription. - Cleaned up string literals & other cosmetic improvemets. - Consistent find/get for broker::QueueRegistry and ExchangeRegistry. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233654 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp19
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp37
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h1
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp73
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py29
6 files changed, 90 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 2de9ec5a59..1f582adda7 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -35,12 +35,16 @@
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
+const std::string TYPE_NAME("qpid.queue-replicator");
}
namespace qpid {
namespace ha {
using namespace broker;
+// FIXME aconway 2011-12-02: separate file for string constantS?
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
+
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
queue(q), link(l), current(queue->getPosition())
@@ -72,9 +76,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
- // FIXME aconway 2011-11-28: string constants.
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
qpid::framing::SequenceNumber oldest;
if (queue->getOldest(oldest))
@@ -86,15 +88,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
}
-
-namespace {
-const std::string DEQUEUE_EVENT("dequeue-event");
-const std::string REPLICATOR("qpid.replicator-");
-}
-
void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
{
- if (key == DEQUEUE_EVENT) {
+ if (key == DEQUEUE_EVENT_KEY) {
std::string content;
msg.getMessage().getFrames().getContent(content);
qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
@@ -115,6 +111,7 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
} else {
// FIXME aconway 2011-11-29: error handling
+ // Is this an error? Will happen if queue has initial dequeues.
QPID_LOG(error, "HA: Unable to dequeue message at "
<< QueuePos(queue.get(), *i));
}
@@ -136,10 +133,6 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-// FIXME aconway 2011-11-28: rationalise string constants.
-static const std::string TYPE_NAME("qpid.queue-replicator");
-
std::string QueueReplicator::getType() const { return TYPE_NAME; }
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 8085c11b82..02acf34886 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -49,6 +49,8 @@ namespace ha {
class QueueReplicator : public broker::Exchange
{
public:
+ static const std::string DEQUEUE_EVENT_KEY;
+
QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
std::string getType() const;
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 891f02dbe0..65b1ee65e8 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -36,11 +36,13 @@ using namespace std;
// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
// Do we want a common HA prefix?
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
-const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
-const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
+const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
+namespace {
const string DOLLAR("$");
const string INTERNAL("-internal");
+} // namespace
class ReplicationStateInitialiser
{
@@ -80,15 +82,23 @@ ReplicatingSubscription::Factory::create(
const string& name,
Queue::shared_ptr queue,
bool ack,
- bool acquire,
+ bool /*acquire*/,
bool exclusive,
const string& tag,
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
) {
- return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
- new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ boost::shared_ptr<ReplicatingSubscription> rs;
+ if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
+ // FIXME aconway 2011-12-01: ignoring acquire param and setting acquire
+ // false. Should this be done in the caller? Remove from ctor parameters.
+ rs.reset(new ReplicatingSubscription(
+ parent, name, queue, ack, false, exclusive, tag,
+ resumeId, resumeTtl, arguments));
+ queue->addObserver(rs);
+ }
+ return rs;
}
ReplicatingSubscription::ReplicatingSubscription(
@@ -108,12 +118,11 @@ ReplicatingSubscription::ReplicatingSubscription(
consumer(new DelegatingConsumer(*this))
{
QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
- // FIXME aconway 2011-11-25: string constants.
- if (arguments.isSet("qpid.high_sequence_number")) {
- qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number");
+ if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) {
+ qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER);
qpid::framing::SequenceNumber lwm;
- if (arguments.isSet("qpid.low_sequence_number")) {
- lwm = arguments.getAsInt("qpid.low_sequence_number");
+ if (arguments.isSet(QPID_LOW_SEQUENCE_NUMBER)) {
+ lwm = arguments.getAsInt(QPID_LOW_SEQUENCE_NUMBER);
} else {
lwm = hwm;
}
@@ -159,6 +168,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m)
m.payload->getIngressCompletion().startCompleter();
}
+// Called with lock held.
void ReplicatingSubscription::generateDequeueEvent()
{
string buf(range.encodedSize(),'\0');
@@ -186,11 +196,14 @@ void ReplicatingSubscription::generateDequeueEvent()
event->getFrames().append(content);
DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey("dequeue-event");
-
+ props->setRoutingKey(QueueReplicator::DEQUEUE_EVENT_KEY);
events->deliver(event);
}
+// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this?
+// If a queue is drained with no new messages coming on
+// will the messages be dequeued on the backup?
+
//called after the message has been removed from the deque and under
//the message lock in the queue
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 2859536b6f..2236aeffcd 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -22,6 +22,7 @@
*
*/
+#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/ConsumerFactory.h"
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 125e2c0ba6..73effd9a7a 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -109,7 +109,6 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-// FIXME aconway 2011-11-24: this should be a class.
enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
const string S_NONE="none";
const string S_WIRING="wiring";
@@ -216,6 +215,7 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
QPID_LOG(debug, "HA: Activated wiring replicator")
}
+// FIXME aconway 2011-12-02: error handling in route. Be forging but log warnings?
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
Variant::List list;
try {
@@ -329,7 +329,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
try {
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (exchange && replicateLevel(exchange->getArgs())) {
QPID_LOG(debug, "HA: Deleting exchange:" << name);
broker.deleteExchange(
@@ -341,20 +341,23 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
}
void WiringReplicator::doEventBind(Variant::Map& values) {
- try {
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString());
- boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString());
- // We only replicated a binds for a replicated queue to replicated exchange.
- if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) {
- framing::FieldTable args;
- amqp_0_10::translate(values[ARGS].asMap(), args);
- string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
- << " queue=" << queue->getName()
- << " key=" << key);
- exchange->bind(queue, key, &args);
- }
- } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange.
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate binds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
}
void WiringReplicator::doResponseQueue(Variant::Map& values) {
@@ -424,26 +427,24 @@ const std::string QUEUE_REF("queueRef");
} // namespace
void WiringReplicator::doResponseBind(Variant::Map& values) {
- try {
- std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
- std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
-
- // Automatically replicate exchange if queue and exchange are replicated
- if (exchange && replicateLevel(exchange->getArgs()) &&
- queue && replicateLevel(queue->getSettings()))
- {
- framing::FieldTable args;
- amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
- string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
- << " queue=" << queue->getName()
- << " key=" << key);
- exchange->bind(queue, key, &args);
- }
- } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
+ std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+ std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+ // Automatically replicate binding if queue and exchange exist and are replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
}
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 9b52c2fca7..51ef786c44 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -62,7 +62,11 @@ class ShortTests(BrokerTest):
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
def setup(p, prefix):
"""Create config, send messages on the primary p"""
- p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+ s = p.sender(queue(prefix+"q1", "all"))
+ for m in ["a", "b", "1"]: s.send(Message(m))
+ # Test replication of dequeue
+ self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
+ p.acknowledge()
p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
@@ -70,13 +74,18 @@ class ShortTests(BrokerTest):
# FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
p.sender(queue(prefix+"x", "wiring"))
- def verify(b, prefix):
+ def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
+
# FIXME aconway 2011-11-21: wait for wiring to replicate.
self.wait(b, prefix+"x");
- # Verify backup
# FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
- self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+ self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
+
+ # FIXME aconway 2011-12-02:
+ self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
+ p.acknowledge()
+
self.assert_browse_retry(b, prefix+"q2", []) # wiring only
self.assert_missing(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
@@ -84,18 +93,18 @@ class ShortTests(BrokerTest):
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
- # Create config, send messages before starting the backup, to test catch-up replication.
primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
+ # Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1")
- # Start the backup
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
- b = backup.connect().session()
- verify(b, "1")
-
# Create config, send messages after starting the backup, to test steady-state replication.
setup(p, "2")
- verify(b, "2")
+
+ # Verify the data on the backup
+ b = backup.connect().session()
+ verify(b, "1", p)
+ verify(b, "2", p)
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)