diff options
author | Alan Conway <aconway@apache.org> | 2012-02-17 14:05:13 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:05:13 +0000 |
commit | d563fd77c419b9518caebd8d7baf3c7865c1890c (patch) | |
tree | 3b639729a3b9644752e4db5c68a18ff5cb652fbb | |
parent | 6cdac3d8cae283df9c4a023c0c1e2e51250dbb02 (diff) | |
download | qpid-python-d563fd77c419b9518caebd8d7baf3c7865c1890c.tar.gz |
QPID-3603: Integrate ReplicatingSubscription into the HA code.
HaBroker registers the ConsumerFactory, QueueReplicator sets
appropriate arguments in consume command.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245482 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/management-schema.xml | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 2 |
12 files changed, 80 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index de2b09660c..775c4cd862 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -118,7 +118,7 @@ void SemanticState::consume(const string& tag, const ConsumerFactories::Factories& cf( session.getBroker().getConsumerFactories().get()); ConsumerImpl::shared_ptr c; - for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end(); !c) + for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i) c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments); if (!c) // Create plain consumer diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index d3a2e71ff6..39ddc527b0 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -43,8 +43,8 @@ using types::Variant; using std::string; Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { - // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address. - if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary. + // FIXME aconway 2011-11-24: identifying the primary. + if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary. Url url(s.brokerUrl); QPID_LOG(info, "HA: Acting as backup to " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; @@ -59,12 +59,6 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { link = result.first; boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link)); broker.getExchanges().registerExchange(wr); - - // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the tests - // The tests pass with a plain subscription if we dont add the factory. -// broker.getConsumerFactories().add( -// boost::shared_ptr<ReplicatingSubscription::Factory>( -// new ReplicatingSubscription::Factory())); } } diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index cee626379b..b4183a4dba 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -37,6 +37,9 @@ class Settings; /** * State associated with a backup broker. Manages connections to primary. + * + * THREAD SAFE: trivially because currently it only has a constructor. + * May need locking as the functionality grows. */ class Backup { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 38ae19a11e..af82909ba3 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -21,6 +21,7 @@ #include "Backup.h" #include "HaBroker.h" #include "Settings.h" +#include "ReplicatingSubscription.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" @@ -61,6 +62,10 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl << ", broker-url=" << brokerUrl); backup.reset(new Backup(broker, s)); + // Register a factory for replicating subscriptions. + broker.getConsumerFactories().add( + boost::shared_ptr<ReplicatingSubscription::Factory>( + new ReplicatingSubscription::Factory())); } HaBroker::~HaBroker() {} diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index f09b2acaaf..bf9de4cc0c 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -20,6 +20,7 @@ */ #include "QueueReplicator.h" +#include "ReplicatingSubscription.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -63,14 +64,25 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L QueueReplicator::~QueueReplicator() {} +// NB: This is called back ina broker connection thread when the +// bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + // No lock needed, no mutable member variables are used. framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, framing::FieldTable()); + framing::FieldTable settings; + // FIXME aconway 2011-11-28: string constants. + settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + // FIXME aconway 2011-11-28: inconsistent use of _ vs. - + settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + qpid::framing::SequenceNumber oldest; + if (queue->getOldest(oldest)) + settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest); + + peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); - } @@ -117,39 +129,13 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid } } -bool QueueReplicator::isReplicatingLink(const std::string& name) -{ - return name.find(REPLICATOR) == 0; -} - -bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings) -{ - if (isReplicatingLink(target)) { - std::string queueName = target.substr(REPLICATOR.size()); - boost::shared_ptr<Queue> queue = queues.find(queueName); - if (queue) { - settings.setInt("qpid.replicating-subscription", 1); - settings.setInt("qpid.high_sequence_number", queue->getPosition()); - qpid::framing::SequenceNumber oldest; - if (queue->getOldest(oldest)) { - settings.setInt("qpid.low_sequence_number", oldest); - } - } - return true; - } else { - return false; - } -} - bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; } -const std::string QueueReplicator::typeName("queue-replicator"); +// FIXME aconway 2011-11-28: rationalise string constants. +static const std::string TYPE_NAME("qpid.queue-replicator"); -std::string QueueReplicator::getType() const -{ - return typeName; -} +std::string QueueReplicator::getType() const { return TYPE_NAME; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 13fbc6e86c..8085c11b82 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -38,7 +38,13 @@ class Deliverable; namespace ha { /** - * Dummy exchange for processing replication messages + * Exchange created on a backup broker to replicate a queue on the primary. + * + * Puts replicated messages on the local queue, handles dequeue events. + * Creates a ReplicatingSubscription on the primary by passing special + * arguments to the consume command. + * + * THREAD SAFE. */ class QueueReplicator : public broker::Exchange { @@ -50,12 +56,11 @@ class QueueReplicator : public broker::Exchange bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); - static bool isReplicatingLink(const std::string&); - static bool initReplicationSettings(const std::string&, broker::QueueRegistry&, framing::FieldTable&); - static const std::string typeName; + private: void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); + sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; framing::SequenceNumber current; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 1f9a8730b3..1a2e55755e 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -30,9 +30,16 @@ namespace ha { using namespace framing; using namespace broker; +using namespace std; -const std::string DOLLAR("$"); -const std::string INTERNAL("_internal"); +// FIXME aconway 2011-11-28: review all arugment names, prefixes etc. +// Do we want a common HA prefix? +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); +const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number"); +const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number"); + +const string DOLLAR("$"); +const string INTERNAL("_internal"); class ReplicationStateInitialiser { @@ -61,7 +68,7 @@ class ReplicationStateInitialiser const qpid::framing::SequenceNumber end; }; -std::string mask(const std::string& in) +string mask(const string& in) { return DOLLAR + in + INTERNAL; } @@ -69,29 +76,30 @@ std::string mask(const std::string& in) boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( SemanticState* _parent, - const std::string& _name, + const string& _name, Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, - const std::string& _tag, - const std::string& _resumeId, + const string& _tag, + const string& _resumeId, uint64_t _resumeTtl, const framing::FieldTable& _arguments ) { + return boost::shared_ptr<broker::SemanticState::ConsumerImpl>( new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments)); } ReplicatingSubscription::ReplicatingSubscription( SemanticState* _parent, - const std::string& _name, + const string& _name, Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, - const std::string& _tag, - const std::string& _resumeId, + const string& _tag, + const string& _resumeId, uint64_t _resumeTtl, const framing::FieldTable& _arguments ) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments), @@ -158,7 +166,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m) void ReplicatingSubscription::generateDequeueEvent() { - std::string buf(range.encodedSize(),'\0'); + string buf(range.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); range.encode(buffer); range.clear(); @@ -166,7 +174,7 @@ void ReplicatingSubscription::generateDequeueEvent() //generate event message boost::intrusive_ptr<Message> event = new Message(); - AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0))); + AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize()); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 0ba2b2f0de..07ec5ef513 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -38,7 +38,12 @@ class OwnershipToken; namespace ha { /** - * Subscriber to a remote queue that replicates to a local queue. + * A susbcription that represents a backup replicating a queue. + * + * Runs on the primary. Delays completion of messages till the backup + * has acknowledged, informs backup of locally dequeued messages. + * + * THREAD UNSAFE: used only in broker connection thread. */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, public broker::QueueObserver @@ -53,6 +58,11 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, const framing::FieldTable& arguments); }; + // Argument names for consume command. + static const std::string QPID_REPLICATING_SUBSCRIPTION; + static const std::string QPID_HIGH_SEQUENCE_NUMBER; + static const std::string QPID_LOW_SEQUENCE_NUMBER; + ReplicatingSubscription(broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 04d0f9d9ee..e621052fea 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -446,6 +446,7 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { } void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { + // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed. if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); broker.getExchanges().registerExchange(qr); @@ -456,11 +457,6 @@ bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const frami bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } -const string WiringReplicator::typeName(QPID_WIRING_REPLICATOR); - -string WiringReplicator::getType() const -{ - return typeName; -} +string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; } }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h index 6a5edb114c..32109d8368 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.h +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h @@ -38,8 +38,15 @@ class SessionHandler; namespace ha { /** - * Pseudo-exchange for recreating local queues and/or exchanges on - * receipt of QMF events indicating their creation on another node + * Replicate wiring on a backup broker. + * + * Implemented as an exchange that subscribes to receive QMF + * configuration events from the primary. It configures local queues + * exchanges and bindings to replicate the primary. + * It also creates QueueReplicators for newly replicated queues. + * + * THREAD SAFE: Has no mutable state. + * */ class WiringReplicator : public broker::Exchange { @@ -54,8 +61,6 @@ class WiringReplicator : public broker::Exchange void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); - static const std::string typeName; - private: void initializeBridge(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); @@ -66,8 +71,6 @@ class WiringReplicator : public broker::Exchange void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); void doResponseBind(types::Variant::Map& values); - - private: void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); broker::Broker& broker; diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index cfffd6c2e5..bb06e77a69 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -21,7 +21,7 @@ <!-- Monitor and control HA status of a broker. --> <class name="HaBroker"> - <property name="status" type="sstr" desc="HA statu: PRIMARY, BACKUP, SOLO"/> + <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, SOLO"/> <method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO"> <arg name="status" type="sstr" dir="I"/> diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 79123058b7..021401bb08 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -89,7 +89,7 @@ class ShortTests(BrokerTest): self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) # Create config, send messages before starting the backup, to test catch-up replication. - primary = self.ha_broker(name="primary") + primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() setup(p, "1") # Start the backup |