diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:00 +0000 |
commit | 0010e075f124b72498419b9431bfac5c83c5eb82 (patch) | |
tree | 396d3deadcd68e7013102ebd92cc666aa165a788 | |
parent | e6d18651468ebbd37bc07786ba22b11960f8df1c (diff) | |
download | qpid-python-0010e075f124b72498419b9431bfac5c83c5eb82.tar.gz |
QPID-3603: Fix replication of dequeues.
- Set acquire=false when creating a ReplicatingSubscription.
- Cleaned up string literals & other cosmetic improvemets.
- Consistent find/get for broker::QueueRegistry and ExchangeRegistry.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233654 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 37 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 73 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 29 |
6 files changed, 90 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 2de9ec5a59..1f582adda7 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -35,12 +35,16 @@ namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); +const std::string TYPE_NAME("qpid.queue-replicator"); } namespace qpid { namespace ha { using namespace broker; +// FIXME aconway 2011-12-02: separate file for string constantS? +const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); + QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? queue(q), link(l), current(queue->getPosition()) @@ -72,9 +76,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; - // FIXME aconway 2011-11-28: string constants. settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - // FIXME aconway 2011-11-28: inconsistent use of _ vs. - settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); qpid::framing::SequenceNumber oldest; if (queue->getOldest(oldest)) @@ -86,15 +88,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest); } - -namespace { -const std::string DEQUEUE_EVENT("dequeue-event"); -const std::string REPLICATOR("qpid.replicator-"); -} - void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) { - if (key == DEQUEUE_EVENT) { + if (key == DEQUEUE_EVENT_KEY) { std::string content; msg.getMessage().getFrames().getContent(content); qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size()); @@ -115,6 +111,7 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message)); } else { // FIXME aconway 2011-11-29: error handling + // Is this an error? Will happen if queue has initial dequeues. QPID_LOG(error, "HA: Unable to dequeue message at " << QueuePos(queue.get(), *i)); } @@ -136,10 +133,6 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; } - -// FIXME aconway 2011-11-28: rationalise string constants. -static const std::string TYPE_NAME("qpid.queue-replicator"); - std::string QueueReplicator::getType() const { return TYPE_NAME; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 8085c11b82..02acf34886 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -49,6 +49,8 @@ namespace ha { class QueueReplicator : public broker::Exchange { public: + static const std::string DEQUEUE_EVENT_KEY; + QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); ~QueueReplicator(); std::string getType() const; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 891f02dbe0..65b1ee65e8 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -36,11 +36,13 @@ using namespace std; // FIXME aconway 2011-11-28: review all arugment names, prefixes etc. // Do we want a common HA prefix? const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); -const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number"); -const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number"); +const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); +const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); +namespace { const string DOLLAR("$"); const string INTERNAL("-internal"); +} // namespace class ReplicationStateInitialiser { @@ -80,15 +82,23 @@ ReplicatingSubscription::Factory::create( const string& name, Queue::shared_ptr queue, bool ack, - bool acquire, + bool /*acquire*/, bool exclusive, const string& tag, const string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments ) { - return boost::shared_ptr<broker::SemanticState::ConsumerImpl>( - new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + boost::shared_ptr<ReplicatingSubscription> rs; + if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { + // FIXME aconway 2011-12-01: ignoring acquire param and setting acquire + // false. Should this be done in the caller? Remove from ctor parameters. + rs.reset(new ReplicatingSubscription( + parent, name, queue, ack, false, exclusive, tag, + resumeId, resumeTtl, arguments)); + queue->addObserver(rs); + } + return rs; } ReplicatingSubscription::ReplicatingSubscription( @@ -108,12 +118,11 @@ ReplicatingSubscription::ReplicatingSubscription( consumer(new DelegatingConsumer(*this)) { QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName()); - // FIXME aconway 2011-11-25: string constants. - if (arguments.isSet("qpid.high_sequence_number")) { - qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number"); + if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) { + qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER); qpid::framing::SequenceNumber lwm; - if (arguments.isSet("qpid.low_sequence_number")) { - lwm = arguments.getAsInt("qpid.low_sequence_number"); + if (arguments.isSet(QPID_LOW_SEQUENCE_NUMBER)) { + lwm = arguments.getAsInt(QPID_LOW_SEQUENCE_NUMBER); } else { lwm = hwm; } @@ -159,6 +168,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m) m.payload->getIngressCompletion().startCompleter(); } +// Called with lock held. void ReplicatingSubscription::generateDequeueEvent() { string buf(range.encodedSize(),'\0'); @@ -186,11 +196,14 @@ void ReplicatingSubscription::generateDequeueEvent() event->getFrames().append(content); DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); - props->setRoutingKey("dequeue-event"); - + props->setRoutingKey(QueueReplicator::DEQUEUE_EVENT_KEY); events->deliver(event); } +// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this? +// If a queue is drained with no new messages coming on +// will the messages be dequeued on the backup? + //called after the message has been removed from the deque and under //the message lock in the queue void ReplicatingSubscription::dequeued(const QueuedMessage& m) diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 2859536b6f..2236aeffcd 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -22,6 +22,7 @@ * */ +#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY #include "qpid/broker/SemanticState.h" #include "qpid/broker/QueueObserver.h" #include "qpid/broker/ConsumerFactory.h" diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 125e2c0ba6..73effd9a7a 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -109,7 +109,6 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -// FIXME aconway 2011-11-24: this should be a class. enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL }; const string S_NONE="none"; const string S_WIRING="wiring"; @@ -216,6 +215,7 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH QPID_LOG(debug, "HA: Activated wiring replicator") } +// FIXME aconway 2011-12-02: error handling in route. Be forging but log warnings? void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { Variant::List list; try { @@ -329,7 +329,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); try { - boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name); + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (exchange && replicateLevel(exchange->getArgs())) { QPID_LOG(debug, "HA: Deleting exchange:" << name); broker.deleteExchange( @@ -341,20 +341,23 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { } void WiringReplicator::doEventBind(Variant::Map& values) { - try { - boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString()); - boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString()); - // We only replicated a binds for a replicated queue to replicated exchange. - if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) { - framing::FieldTable args; - amqp_0_10::translate(values[ARGS].asMap(), args); - string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); - exchange->bind(queue, key, &args); - } - } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange. + boost::shared_ptr<Exchange> exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr<Queue> queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate binds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } } void WiringReplicator::doResponseQueue(Variant::Map& values) { @@ -424,26 +427,24 @@ const std::string QUEUE_REF("queueRef"); } // namespace void WiringReplicator::doResponseBind(Variant::Map& values) { - try { - std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); - std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName); - boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); - // FIXME aconway 2011-11-24: more flexible configuration for binding replication. - - // Automatically replicate exchange if queue and exchange are replicated - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) - { - framing::FieldTable args; - amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); - exchange->bind(queue, key, &args); - } - } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange. + std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); + std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); + boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); + // FIXME aconway 2011-11-24: more flexible configuration for binding replication. + + // Automatically replicate binding if queue and exchange exist and are replicated + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } } void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 9b52c2fca7..51ef786c44 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -62,7 +62,11 @@ class ShortTests(BrokerTest): return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) def setup(p, prefix): """Create config, send messages on the primary p""" - p.sender(queue(prefix+"q1", "all")).send(Message("1")) + s = p.sender(queue(prefix+"q1", "all")) + for m in ["a", "b", "1"]: s.send(Message(m)) + # Test replication of dequeue + self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") + p.acknowledge() p.sender(queue(prefix+"q2", "wiring")).send(Message("2")) p.sender(queue(prefix+"q3", "none")).send(Message("3")) p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) @@ -70,13 +74,18 @@ class ShortTests(BrokerTest): # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done. p.sender(queue(prefix+"x", "wiring")) - def verify(b, prefix): + def verify(b, prefix, p): """Verify setup was replicated to backup b""" + # FIXME aconway 2011-11-21: wait for wiring to replicate. self.wait(b, prefix+"x"); - # Verify backup # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication. - self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) + self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) + + # FIXME aconway 2011-12-02: + self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") + p.acknowledge() + self.assert_browse_retry(b, prefix+"q2", []) # wiring only self.assert_missing(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all @@ -84,18 +93,18 @@ class ShortTests(BrokerTest): b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) - # Create config, send messages before starting the backup, to test catch-up replication. primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() + # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1") - # Start the backup backup = self.ha_broker(name="backup", broker_url=primary.host_port()) - b = backup.connect().session() - verify(b, "1") - # Create config, send messages after starting the backup, to test steady-state replication. setup(p, "2") - verify(b, "2") + + # Verify the data on the backup + b = backup.connect().session() + verify(b, "1", p) + verify(b, "2", p) if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |