summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-15 21:06:27 +0000
committerAlan Conway <aconway@apache.org>2012-05-15 21:06:27 +0000
commit85c171a93d07b00c0601ed9347931a759be53cfe (patch)
treeef50a969ed6949521eb54dfc576d75381270a021
parent7f358c5dc2c54e9ba3d076e141704b0b72f1d885 (diff)
downloadqpid-python-85c171a93d07b00c0601ed9347931a759be53cfe.tar.gz
QPID-3603: HA remove backup Replicator code to track unready queues.
Tracking readiness will be done on the primary with qmf notification to backup. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338894 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp33
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h4
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp13
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h8
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h5
9 files changed, 24 insertions, 64 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 33a4ef4080..1041062997 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -165,8 +165,7 @@ Variant::Map asMapVoid(const Variant& value) {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix(hb),
- haBroker(hb), broker(hb.getBroker()), link(l),
- unreadyCount(boost::bind(&BrokerReplicator::ready, this))
+ haBroker(hb), broker(hb.getBroker()), link(l)
{
framing::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
@@ -232,8 +231,6 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
- // Queue ready count - count one for the query in progress.
- ++unreadyCount;
QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName);
}
@@ -246,8 +243,6 @@ void BrokerReplicator::route(Deliverable& msg) {
// decode as list
string content = msg.getMessage().getFrames().getContent();
amqp_0_10::ListCodec::decode(content, list);
-
- string type; // FIXME aconway 2012-04-26: quick hack for end-of query, need to handle multi-message responses
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
@@ -263,7 +258,7 @@ void BrokerReplicator::route(Deliverable& msg) {
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
+ string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
@@ -273,11 +268,6 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == HA_BROKER) doResponseHaBroker(values);
}
}
- // FIXME aconway 2012-04-26: when the queue query is complete
- if (type == QUEUE) {
- // Count 1 for the query, which is now complete.
- --unreadyCount;
- }
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "configuration failed: " << e.what()
<< ": while handling: " << list);
@@ -310,8 +300,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[RHOST].asString());
assert(result.second);
QPID_LOG(debug, logPrefix << "queue declare event: " << name);
- startQueueReplicator(result.first, 0); // No unreadyCount for declare events.
- // FIXME aconway 2012-04-26: but we will need to count them after a failover.
+ startQueueReplicator(result.first);
}
}
@@ -445,13 +434,8 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
- QueueReplicatorPtr qr;
// It is normal for the queue to already exist if we are failing over.
- // FIXME aconway 2012-04-26: not correct, unreadyCount
- if (result.second) qr = startQueueReplicator(result.first, &unreadyCount);
- else qr = findQueueReplicator(name);
- if (qr) ++unreadyCount;
- // existing QR may refcount down before I've gone thru the responses.
+ if (result.second) startQueueReplicator(result.first);
QPID_LOG(debug, logPrefix << "queue response: " << name);
}
@@ -538,19 +522,16 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
}
}
-BrokerReplicator::QueueReplicatorPtr BrokerReplicator::startQueueReplicator(
- const boost::shared_ptr<Queue>& queue, Counter* unready)
+void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
{
- boost::shared_ptr<QueueReplicator> qr;
if (haBroker.replicateLevel(queue->getSettings()) == ALL) {
- qr.reset(new QueueReplicator(
- LogPrefix(haBroker, queue->getName()), queue, link, unready));
+ boost::shared_ptr<QueueReplicator> qr(
+ new QueueReplicator(LogPrefix(haBroker, queue->getName()), queue, link));
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
haBroker.activatedBackup(queue->getName());
}
- return qr;
}
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 8b7987a89d..da9802268a 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -88,15 +88,13 @@ class BrokerReplicator : public broker::Exchange
void doResponseHaBroker(types::Variant::Map& values);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
- QueueReplicatorPtr startQueueReplicator(
- const boost::shared_ptr<broker::Queue>&, Counter*);
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
void ready();
LogPrefix logPrefix;
HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
- Counter unreadyCount;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 178535f1af..bc5e9927c2 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -133,7 +133,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
// FIXME aconway 2012-04-27: don't allow promotion in catch-up
// QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
// throw Exception("Still catching up, cannot be promoted.");
- promoting(l); // FIXME aconway 2012-04-27: disallow
+ promoting(l);
break;
case READY: promoting(l); break;
case PROMOTING: break;
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 27cd4d7111..0a577d08e4 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -63,7 +63,7 @@ Primary::Primary(HaBroker& b) :
}
}
-void Primary::addReplica(const std::string& q) {
+void Primary::readyReplica(const std::string& q) {
sys::Mutex::ScopedLock l(lock);
if (!activated) {
QueueCounts::iterator i = queues.find(q);
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 5a1a61ae75..7e347fdbe2 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -41,7 +41,7 @@ class HaBroker;
* State associated with a primary broker. Tracks replicating
* subscriptions to determine when primary is ready.
*
- * THREAD SAFE: addReplica is called in arbitray threads.
+ * THREAD SAFE: readyReplica is called in arbitray threads.
*/
class Primary
{
@@ -49,7 +49,7 @@ class Primary
static Primary* get() { return instance; }
Primary(HaBroker& b);
- void addReplica(const std::string& q);
+ void readyReplica(const std::string& q);
void removeReplica(const std::string& q);
private:
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index b0bf1ce194..76840ea92e 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -48,7 +48,6 @@ using namespace framing;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:");
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
-const std::string QueueReplicator::READY_EVENT_KEY(QPID_HA_EVENT_PREFIX+"ready");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
@@ -62,11 +61,9 @@ bool QueueReplicator::isEventKey(const std::string key) {
QueueReplicator::QueueReplicator(const LogPrefix& lp,
boost::shared_ptr<Queue> q,
- boost::shared_ptr<Link> l,
- Counter* counter)
+ boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
- logPrefix(lp), queue(q), link(l),
- unreadyCount(counter)
+ logPrefix(lp), queue(q), link(l)
{
framing::Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -116,8 +113,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
- if (unreadyCount) ++(*unreadyCount); // We are unready.
-
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
// during fail-over. This optimization was removed to simplify
@@ -186,10 +181,6 @@ void QueueReplicator::route(Deliverable& msg)
}
queue->setPosition(position);
}
- else if (key == READY_EVENT_KEY) {
- QPID_LOG(info, logPrefix << "caught up at " << queue->getPosition());
- if (unreadyCount) --(*unreadyCount); // We are now ready.
- }
// Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index db4a901274..f583b650fa 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -58,18 +58,13 @@ class QueueReplicator : public broker::Exchange,
public:
static const std::string DEQUEUE_EVENT_KEY;
static const std::string POSITION_EVENT_KEY;
- static const std::string READY_EVENT_KEY;
static std::string replicatorName(const std::string& queueName);
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
- /**
- * @para unreadyCount can be 0 if we don't need a ready count from this queue.
- */
QueueReplicator(const LogPrefix&,
boost::shared_ptr<broker::Queue> q,
- boost::shared_ptr<broker::Link> l,
- Counter* unreadyCount=0);
+ boost::shared_ptr<broker::Link> l);
~QueueReplicator();
@@ -93,7 +88,6 @@ class QueueReplicator : public broker::Exchange,
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
- Counter* unreadyCount;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 62757a9060..9bab20048c 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -94,7 +94,7 @@ ReplicatingSubscription::ReplicatingSubscription(
resumeId, resumeTtl, arguments),
logPrefix(lp, queue->getName()),
dummy(new Queue(mask(name))),
- sentReady(false)
+ ready(false)
{
// Separate the remote part from a "local-remote" address for logging.
string address = parent->getSession().getConnection().getUrl();
@@ -137,7 +137,7 @@ void ReplicatingSubscription::setReadyPosition() {
QPID_LOG(debug, logPrefix << "backup subscribed, no catch up, at "
<< readyPosition << logSuffix);
// Fake lock, only called during creation:
- sendReady(*(sys::Mutex::ScopedLock*)0);
+ setReady(*(sys::Mutex::ScopedLock*)0);
}
else {
QPID_LOG(debug, logPrefix << "backup subscribed, catching up "
@@ -173,7 +173,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
{
sys::Mutex::ScopedLock l(lock);
// If we have advanced to the initial position, the backup is ready.
- if (qm.position >= readyPosition) sendReady(l);
+ if (qm.position >= readyPosition) setReady(l);
}
return delivered;
}
@@ -187,15 +187,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
// Send a ready event to the backup.
-void ReplicatingSubscription::sendReady(const sys::Mutex::ScopedLock&) {
- if (sentReady) return;
- sentReady = true;
- framing::Buffer buffer;
- sendEvent(QueueReplicator::READY_EVENT_KEY, buffer);
+void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) {
+ if (ready) return;
+ ready = true;
QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix);
// Notify Primary that a subscription is ready.
- // FIXME aconway 2012-04-30: rename addReplica->readyReplica
- if (Primary::get()) Primary::get()->addReplica(getQueue()->getName());
+ if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName());
}
// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 66b2651f23..97416249d0 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -111,14 +111,13 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
framing::SequenceSet dequeues;
framing::SequenceNumber backupPosition;
framing::SequenceNumber readyPosition;
- bool sentReady;
+ bool ready;
void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
void sendDequeueEvent(const sys::Mutex::ScopedLock&);
void sendPositionEvent(framing::SequenceNumber);
- void sendReady(const sys::Mutex::ScopedLock&);
- void sendReadyEvent(const sys::Mutex::ScopedLock&);
+ void setReady(const sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&);
/** Dummy consumer used to get the front position on the queue */