diff options
author | Alan Conway <aconway@apache.org> | 2012-05-15 21:06:27 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-05-15 21:06:27 +0000 |
commit | 85c171a93d07b00c0601ed9347931a759be53cfe (patch) | |
tree | ef50a969ed6949521eb54dfc576d75381270a021 | |
parent | 7f358c5dc2c54e9ba3d076e141704b0b72f1d885 (diff) | |
download | qpid-python-85c171a93d07b00c0601ed9347931a759be53cfe.tar.gz |
QPID-3603: HA remove backup Replicator code to track unready queues.
Tracking readiness will be done on the primary with qmf notification to backup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338894 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 5 |
9 files changed, 24 insertions, 64 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 33a4ef4080..1041062997 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -165,8 +165,7 @@ Variant::Map asMapVoid(const Variant& value) { BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), logPrefix(hb), - haBroker(hb), broker(hb.getBroker()), link(l), - unreadyCount(boost::bind(&BrokerReplicator::ready, this)) + haBroker(hb), broker(hb.getBroker()), link(l) { framing::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); @@ -232,8 +231,6 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - // Queue ready count - count one for the query in progress. - ++unreadyCount; QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName); } @@ -246,8 +243,6 @@ void BrokerReplicator::route(Deliverable& msg) { // decode as list string content = msg.getMessage().getFrames().getContent(); amqp_0_10::ListCodec::decode(content, list); - - string type; // FIXME aconway 2012-04-26: quick hack for end-of query, need to handle multi-message responses if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); @@ -263,7 +258,7 @@ void BrokerReplicator::route(Deliverable& msg) { } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); + string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); @@ -273,11 +268,6 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == HA_BROKER) doResponseHaBroker(values); } } - // FIXME aconway 2012-04-26: when the queue query is complete - if (type == QUEUE) { - // Count 1 for the query, which is now complete. - --unreadyCount; - } } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "configuration failed: " << e.what() << ": while handling: " << list); @@ -310,8 +300,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[RHOST].asString()); assert(result.second); QPID_LOG(debug, logPrefix << "queue declare event: " << name); - startQueueReplicator(result.first, 0); // No unreadyCount for declare events. - // FIXME aconway 2012-04-26: but we will need to count them after a failover. + startQueueReplicator(result.first); } } @@ -445,13 +434,8 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); - QueueReplicatorPtr qr; // It is normal for the queue to already exist if we are failing over. - // FIXME aconway 2012-04-26: not correct, unreadyCount - if (result.second) qr = startQueueReplicator(result.first, &unreadyCount); - else qr = findQueueReplicator(name); - if (qr) ++unreadyCount; - // existing QR may refcount down before I've gone thru the responses. + if (result.second) startQueueReplicator(result.first); QPID_LOG(debug, logPrefix << "queue response: " << name); } @@ -538,19 +522,16 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { } } -BrokerReplicator::QueueReplicatorPtr BrokerReplicator::startQueueReplicator( - const boost::shared_ptr<Queue>& queue, Counter* unready) +void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - boost::shared_ptr<QueueReplicator> qr; if (haBroker.replicateLevel(queue->getSettings()) == ALL) { - qr.reset(new QueueReplicator( - LogPrefix(haBroker, queue->getName()), queue, link, unready)); + boost::shared_ptr<QueueReplicator> qr( + new QueueReplicator(LogPrefix(haBroker, queue->getName()), queue, link)); if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); haBroker.activatedBackup(queue->getName()); } - return qr; } bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 8b7987a89d..da9802268a 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -88,15 +88,13 @@ class BrokerReplicator : public broker::Exchange void doResponseHaBroker(types::Variant::Map& values); QueueReplicatorPtr findQueueReplicator(const std::string& qname); - QueueReplicatorPtr startQueueReplicator( - const boost::shared_ptr<broker::Queue>&, Counter*); + void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); void ready(); LogPrefix logPrefix; HaBroker& haBroker; broker::Broker& broker; boost::shared_ptr<broker::Link> link; - Counter unreadyCount; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 178535f1af..bc5e9927c2 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -133,7 +133,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, // FIXME aconway 2012-04-27: don't allow promotion in catch-up // QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); // throw Exception("Still catching up, cannot be promoted."); - promoting(l); // FIXME aconway 2012-04-27: disallow + promoting(l); break; case READY: promoting(l); break; case PROMOTING: break; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 27cd4d7111..0a577d08e4 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -63,7 +63,7 @@ Primary::Primary(HaBroker& b) : } } -void Primary::addReplica(const std::string& q) { +void Primary::readyReplica(const std::string& q) { sys::Mutex::ScopedLock l(lock); if (!activated) { QueueCounts::iterator i = queues.find(q); diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 5a1a61ae75..7e347fdbe2 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -41,7 +41,7 @@ class HaBroker; * State associated with a primary broker. Tracks replicating * subscriptions to determine when primary is ready. * - * THREAD SAFE: addReplica is called in arbitray threads. + * THREAD SAFE: readyReplica is called in arbitray threads. */ class Primary { @@ -49,7 +49,7 @@ class Primary static Primary* get() { return instance; } Primary(HaBroker& b); - void addReplica(const std::string& q); + void readyReplica(const std::string& q); void removeReplica(const std::string& q); private: diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index b0bf1ce194..76840ea92e 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -48,7 +48,6 @@ using namespace framing; const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:"); const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position"); -const std::string QueueReplicator::READY_EVENT_KEY(QPID_HA_EVENT_PREFIX+"ready"); std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; @@ -62,11 +61,9 @@ bool QueueReplicator::isEventKey(const std::string key) { QueueReplicator::QueueReplicator(const LogPrefix& lp, boost::shared_ptr<Queue> q, - boost::shared_ptr<Link> l, - Counter* counter) + boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), - logPrefix(lp), queue(q), link(l), - unreadyCount(counter) + logPrefix(lp), queue(q), link(l) { framing::Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); @@ -116,8 +113,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; - if (unreadyCount) ++(*unreadyCount); // We are unready. - // FIXME aconway 2011-12-09: Failover optimization removed. // There was code here to re-use messages already on the backup // during fail-over. This optimization was removed to simplify @@ -186,10 +181,6 @@ void QueueReplicator::route(Deliverable& msg) } queue->setPosition(position); } - else if (key == READY_EVENT_KEY) { - QPID_LOG(info, logPrefix << "caught up at " << queue->getPosition()); - if (unreadyCount) --(*unreadyCount); // We are now ready. - } // Ignore unknown event keys, may be introduced in later versions. } catch (const std::exception& e) { diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index db4a901274..f583b650fa 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -58,18 +58,13 @@ class QueueReplicator : public broker::Exchange, public: static const std::string DEQUEUE_EVENT_KEY; static const std::string POSITION_EVENT_KEY; - static const std::string READY_EVENT_KEY; static std::string replicatorName(const std::string& queueName); /** Test if a string is an event key */ static bool isEventKey(const std::string key); - /** - * @para unreadyCount can be 0 if we don't need a ready count from this queue. - */ QueueReplicator(const LogPrefix&, boost::shared_ptr<broker::Queue> q, - boost::shared_ptr<broker::Link> l, - Counter* unreadyCount=0); + boost::shared_ptr<broker::Link> l); ~QueueReplicator(); @@ -93,7 +88,6 @@ class QueueReplicator : public broker::Exchange, boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; boost::shared_ptr<broker::Bridge> bridge; - Counter* unreadyCount; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 62757a9060..9bab20048c 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -94,7 +94,7 @@ ReplicatingSubscription::ReplicatingSubscription( resumeId, resumeTtl, arguments), logPrefix(lp, queue->getName()), dummy(new Queue(mask(name))), - sentReady(false) + ready(false) { // Separate the remote part from a "local-remote" address for logging. string address = parent->getSession().getConnection().getUrl(); @@ -137,7 +137,7 @@ void ReplicatingSubscription::setReadyPosition() { QPID_LOG(debug, logPrefix << "backup subscribed, no catch up, at " << readyPosition << logSuffix); // Fake lock, only called during creation: - sendReady(*(sys::Mutex::ScopedLock*)0); + setReady(*(sys::Mutex::ScopedLock*)0); } else { QPID_LOG(debug, logPrefix << "backup subscribed, catching up " @@ -173,7 +173,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { { sys::Mutex::ScopedLock l(lock); // If we have advanced to the initial position, the backup is ready. - if (qm.position >= readyPosition) sendReady(l); + if (qm.position >= readyPosition) setReady(l); } return delivered; } @@ -187,15 +187,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { } // Send a ready event to the backup. -void ReplicatingSubscription::sendReady(const sys::Mutex::ScopedLock&) { - if (sentReady) return; - sentReady = true; - framing::Buffer buffer; - sendEvent(QueueReplicator::READY_EVENT_KEY, buffer); +void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) { + if (ready) return; + ready = true; QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix); // Notify Primary that a subscription is ready. - // FIXME aconway 2012-04-30: rename addReplica->readyReplica - if (Primary::get()) Primary::get()->addReplica(getQueue()->getName()); + if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName()); } // INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 66b2651f23..97416249d0 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -111,14 +111,13 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, framing::SequenceSet dequeues; framing::SequenceNumber backupPosition; framing::SequenceNumber readyPosition; - bool sentReady; + bool ready; void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); void sendDequeueEvent(const sys::Mutex::ScopedLock&); void sendPositionEvent(framing::SequenceNumber); - void sendReady(const sys::Mutex::ScopedLock&); - void sendReadyEvent(const sys::Mutex::ScopedLock&); + void setReady(const sys::Mutex::ScopedLock&); void sendEvent(const std::string& key, framing::Buffer&); /** Dummy consumer used to get the front position on the queue */ |