summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-11 19:23:30 +0000
committerAlan Conway <aconway@apache.org>2012-10-11 19:23:30 +0000
commitf75678b1f215af028b25802c46e274e2d10551b5 (patch)
tree89267ad6025ae3884d8de9a2d783051014db2472 /qpid/cpp/src
parent12eb9e230a50eee63378d1a84f62e57bce4e2e92 (diff)
downloadqpid-python-f75678b1f215af028b25802c46e274e2d10551b5.tar.gz
Bug 860701 - QPID-4350: HA handle auto-delete queues
Subscribed auto-delete queues are deleted by the backup. Timed auto-delete queues are deleted after the timeout. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1397243 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueObserver.h1
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp226
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h28
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp38
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h11
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp9
-rw-r--r--qpid/cpp/src/tests/brokertest.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py21
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py72
10 files changed, 326 insertions, 86 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 95cba7bd8e..9cf2f541ce 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1072,6 +1072,8 @@ void Queue::destroyed()
notifyDeleted();
{
Mutex::ScopedLock lock(messageLock);
+ for_each(observers.begin(), observers.end(),
+ boost::bind(&QueueObserver::destroy, _1));
observers.clear();
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h
index 29e867253e..2ba98f6945 100644
--- a/qpid/cpp/src/qpid/broker/QueueObserver.h
+++ b/qpid/cpp/src/qpid/broker/QueueObserver.h
@@ -69,6 +69,7 @@ class QueueObserver
virtual void requeued(const Message&) = 0;
virtual void consumerAdded( const Consumer& ) {};
virtual void consumerRemoved( const Consumer& ) {};
+ virtual void destroy() {};
private:
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 14e6e1a5d1..18d2af76c2 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -23,6 +23,7 @@
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
@@ -84,25 +85,27 @@ const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
-const string EXCL("excl");
-const string EXCLUSIVE("exclusive");
const string BIND("bind");
-const string UNBIND("unbind");
const string BINDING("binding");
+const string BINDING_KEY("bindingKey");
const string CREATED("created");
const string DISP("disp");
+const string DEST("dest");
const string DURABLE("durable");
const string EXCHANGE("exchange");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
const string EXNAME("exName");
const string EXTYPE("exType");
+const string HA_BROKER("habroker");
const string KEY("key");
-const string BINDING_KEY("bindingKey");
const string NAME("name");
+const string PARTIAL("partial");
const string QNAME("qName");
const string QUEUE("queue");
const string TYPE("type");
-const string HA_BROKER("habroker");
-const string PARTIAL("partial");
+const string UNBIND("unbind");
+const string CONSUMER_COUNT("consumerCount");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -122,10 +125,7 @@ const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
const string MEMBERS("members");
-
-template <class T> bool match(Variant::Map& schema) {
- return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
-}
+const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
@@ -174,22 +174,78 @@ Variant::Map asMapVoid(const Variant& value) {
}
} // namespace
+// Listens for errors on the bridge session.
+class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& lp, BrokerReplicator& br) :
+ logPrefix(lp), brokerReplicator(br) {}
+
+ void connectionException(framing::connection::CloseCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ }
+ void channelException(framing::session::DetachCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ }
+ void executionException(framing::execution::ErrorCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+ BrokerReplicator& brokerReplicator;
+};
+
+class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
+{
+ public:
+ ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
+ virtual void connection(Connection&) {}
+ virtual void opened(Connection&) {}
+
+ virtual void closed(Connection& c) {
+ if (brokerReplicator.link && &c == brokerReplicator.connection)
+ brokerReplicator.disconnected();
+ }
+ virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); }
+ private:
+ BrokerReplicator& brokerReplicator;
+};
+
+template <class E> BrokerReplicator::EventKey eventKey() {
+ return make_pair(E::PACKAGE_NAME, E::EVENT_NAME);
+}
+
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l),
initialized(false),
alternates(hb.getBroker().getExchanges()),
- cleaner(*this)
+ cleaner(*this),
+ connection(0)
{
+ broker.getConnectionObservers().add(
+ boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this)));
getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+
+ dispatch[eventKey<EventQueueDeclare>()] = &BrokerReplicator::doEventQueueDeclare;
+ dispatch[eventKey<EventQueueDelete>()] = &BrokerReplicator::doEventQueueDelete;
+ dispatch[eventKey<EventExchangeDeclare>()] = &BrokerReplicator::doEventExchangeDeclare;
+ dispatch[eventKey<EventExchangeDelete>()] = &BrokerReplicator::doEventExchangeDelete;
+ dispatch[eventKey<EventBind>()] = &BrokerReplicator::doEventBind;
+ dispatch[eventKey<EventUnbind>()] = &BrokerReplicator::doEventUnbind;
+ dispatch[eventKey<EventMembersUpdate>()] = &BrokerReplicator::doEventMembersUpdate;
+ dispatch[eventKey<EventSubscribe>()] = &BrokerReplicator::doEventSubscribe;
}
void BrokerReplicator::initialize() {
// Can't do this in the constructor because we need a shared_ptr to this.
types::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
- broker.getLinks().declare(
+ std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare(
name, // name for bridge
*link, // parent
false, // durable
@@ -206,13 +262,28 @@ void BrokerReplicator::initialize() {
// calls are run.
boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2)
);
+ result.first->setErrorListener(
+ boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
}
BrokerReplicator::~BrokerReplicator() { shutdown(); }
+namespace {
+void collectQueueReplicators(
+ const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+{
+ boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+ if (qr) collect.insert(qr);
+}
+} // namespace
+
void BrokerReplicator::shutdown() {
QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
- broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, this, _1));
+ set<boost::shared_ptr<QueueReplicator> > collect;
+ broker.getExchanges().eachExchange(
+ boost::bind(&collectQueueReplicators, _1, boost::ref(collect)));
+ for_each(collect.begin(), collect.end(),
+ boost::bind(&QueueReplicator::deactivate, _1));
}
// This is called in the connection IO thread when the bridge is started.
@@ -221,7 +292,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connections thread context.
//
- assert(link->getConnection());
+ connection = link->getConnection();
+ assert(connection);
userId = link->getConnection()->getUserId();
remoteHost = link->getConnection()->getUrl();
@@ -280,13 +352,9 @@ void BrokerReplicator::route(Deliverable& msg) {
QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
- else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
- else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
- else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
- else if (match<EventBind>(schema)) doEventBind(values);
- else if (match<EventUnbind>(schema)) doEventUnbind(values);
- else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
+ EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+ EventDispatchMap::iterator i = dispatch.find(key);
+ if (i != dispatch.end()) (this->*(i->second))(values);
}
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -337,10 +405,8 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
deleteQueue(name);
}
- settings.populate(args, settings.storeSettings);
- CreateQueueResult result = createQueue(
- name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
- assert(result.second); // Should be created since we destroed the previous queue above.
+ replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
+ values[ALTEX].asString());
}
}
@@ -447,6 +513,14 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
haBroker.setMembership(members);
}
+void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
+ // Ignore queue replicator subscriptions.
+ if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
+ QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
+ if (qr) qr->setSubscribed();
+}
+
namespace {
// Get the alternate exchange from the exchange field of a queue or exchange response.
@@ -484,12 +558,15 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
}
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- CreateQueueResult result =
- createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
- getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the queue to already exist if we are failing over.
- if (!result.second)
- QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+ boost::shared_ptr<QueueReplicator> qr = replicateQueue(
+ name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
+ if (qr) {
+ Variant::Map::const_iterator i = values.find(CONSUMER_COUNT);
+ if (i != values.end() && isIntegerType(i->second.getType())) {
+ if (i->second.asInt64()) qr->setSubscribed();
+ }
+ }
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -580,7 +657,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
}
}
-void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
+boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
+ const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
@@ -588,30 +666,22 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
+ return qr;
}
-}
-
-void BrokerReplicator::deactivateQueue(const std::string& queueName) {
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName);
- if (qr) {
- qr->deactivate();
- // QueueReplicator's bridge is now queued for destruction but may not
- // actually be destroyed.
- broker.getExchanges().destroy(qr->getName());
- }
-}
-
-void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) {
- deactivateQueue(q->getName());
-}
-
-void BrokerReplicator::deleteQueue(const std::string& name) {
- deactivateQueue(name);
- try {
+ return boost::shared_ptr<QueueReplicator>();
+}
+
+void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
+ boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name));
+ if (qr) qr->deactivate();
+ Queue::shared_ptr queue = broker.getQueues().find(name);
+ if (queue) {
+ // Purge before deleting to ensure that we don't reroute any
+ // messages. Any reroutes will be done at the primary and
+ // replicated as normal.
+ if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
broker.deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
- } catch (const framing::NotFoundException&) {
- QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name);
}
}
@@ -624,7 +694,7 @@ void BrokerReplicator::deleteExchange(const std::string& name) {
}
}
-BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
+boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
@@ -641,14 +711,15 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
string(), // Set alternate exchange below
userId,
remoteHost);
- boost::shared_ptr<Queue> queue = result.first;
- if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue);
- if (result.second && !alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange,
- boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ boost::shared_ptr<QueueReplicator> qr;
+ if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
+ if (result.second) {
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ }
}
- return result;
+ return qr;
}
BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
@@ -720,8 +791,39 @@ void BrokerReplicator::Cleaner::cleanExchanges() {
void BrokerReplicator::Cleaner::cleanQueues() {
for_each(queues.begin(), queues.end(),
- boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1));
+ boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1, true));
queues.clear();
}
+void BrokerReplicator::autoDeleteCheck(
+ boost::shared_ptr<Exchange> ex, set<string>& result)
+{
+ boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+ if (!qr) return;
+ assert(qr);
+ if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+ if (qr->getQueue()->getSettings().autoDeleteDelay) {
+ // Start the auto-delete timer
+ Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
+ }
+ else {
+ // Mark for immediate deletion.
+ result.insert(qr->getQueue()->getName());
+ }
+ }
+}
+
+void BrokerReplicator::disconnected() {
+ QPID_LOG(info, logPrefix << "Disconnected");
+ connection = 0;
+ // Clean up auto-delete queues
+ set<string> deleteQueues;
+ broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
+ this, _1, boost::ref(deleteQueues)));
+ // Don't purge before deleting, the primary is gone so we need to
+ // reroute the deleted messages.
+ for_each(deleteQueues.begin(), deleteQueues.end(),
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, false));
+}
+
}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index bbdf3e2c0e..4845360631 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -40,6 +40,7 @@ class Broker;
class Link;
class Bridge;
class SessionHandler;
+class Connection;
}
namespace framing {
@@ -83,6 +84,13 @@ class BrokerReplicator : public broker::Exchange,
typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
+ typedef std::pair<std::string,std::string> EventKey;
+ typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
+ typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+
+ typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+
+
/** Keep track of queues and exchanges that need to be cleaned up. */
class Cleaner {
public:
@@ -94,7 +102,7 @@ class BrokerReplicator : public broker::Exchange,
// Forget a queue/exchange that does not need cleaning
void forgetExchange(const std::string& name);
void forgetQueue(const std::string& name);
-
+ // Clean up queues/exchange that are no longer on primary
void cleanExchanges();
void cleanQueues();
@@ -110,6 +118,9 @@ class BrokerReplicator : public broker::Exchange,
};
friend class Cleaner;
+ class ErrorListener;
+ class ConnectionObserver;
+
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
@@ -119,6 +130,7 @@ class BrokerReplicator : public broker::Exchange,
void doEventBind(types::Variant::Map&);
void doEventUnbind(types::Variant::Map&);
void doEventMembersUpdate(types::Variant::Map&);
+ void doEventSubscribe(types::Variant::Map&);
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
@@ -126,9 +138,9 @@ class BrokerReplicator : public broker::Exchange,
void doResponseHaBroker(types::Variant::Map& values);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
- void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- CreateQueueResult createQueue(
+ QueueReplicatorPtr replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
@@ -142,11 +154,13 @@ class BrokerReplicator : public broker::Exchange,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
- void deactivateQueue(const std::string& name);
- void deactivate(boost::shared_ptr<broker::Queue> q);
- void deleteQueue(const std::string& name);
+ bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy);
+ void deleteQueue(const std::string& name, bool purge=true);
void deleteExchange(const std::string& name);
+ void autoDeleteCheck(boost::shared_ptr<broker::Exchange>, std::set<std::string>&);
+ void disconnected();
+
std::string logPrefix;
std::string userId, remoteHost;
ReplicationTest replicationTest;
@@ -159,6 +173,8 @@ class BrokerReplicator : public broker::Exchange,
typedef std::set<std::string> StringSet;
StringSet replicatedExchanges; // exchanges that have been replicated.
Cleaner cleaner;
+ broker::Connection* connection;
+ EventDispatchMap dispatch;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 69c8a56873..5b9993bd90 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
@@ -55,6 +56,10 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
}
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+ return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+}
+
bool QueueReplicator::isEventKey(const std::string key) {
const std::string& prefix = QPID_HA_EVENT_PREFIX;
bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
@@ -74,19 +79,33 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(error, logPrefix << "Execution error: " << msg);
}
void detach() {
- QPID_LOG(error, logPrefix << "Unexpectedly detached.");
+ QPID_LOG(debug, logPrefix << "Session detached");
}
private:
std::string logPrefix;
};
+class QueueReplicator::QueueObserver : public broker::QueueObserver {
+ public:
+ QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {}
+ void enqueued(const Message&) {}
+ void dequeued(const Message&) {}
+ void acquired(const Message&) {}
+ void requeued(const Message&) {}
+ void consumerAdded( const Consumer& ) {}
+ void consumerRemoved( const Consumer& ) {}
+ void destroy() { queueReplicator->deactivate(); }
+ private:
+ boost::shared_ptr<QueueReplicator> queueReplicator;
+};
+
QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo())
+ queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false)
{
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
@@ -118,18 +137,21 @@ void QueueReplicator::activate() {
bridge = result.first;
bridge->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
+ boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this()));
+ queue->addObserver(observer);
}
QueueReplicator::~QueueReplicator() { deactivate(); }
void QueueReplicator::deactivate() {
- // destroy the route
+ QPID_LOG(debug, logPrefix << "Deactivated");
sys::Mutex::ScopedLock l(lock);
- if (bridge) {
- bridge->close();
- bridge.reset();
- QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
- }
+ if (bridge) bridge->close();
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ link.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
}
// Called in a broker connection thread when the bridge is created.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index c9eb318aa1..b302162286 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -58,6 +58,8 @@ class QueueReplicator : public broker::Exchange,
static const std::string DEQUEUE_EVENT_KEY;
static const std::string POSITION_EVENT_KEY;
static std::string replicatorName(const std::string& queueName);
+ static bool isReplicatorName(const std::string&);
+
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
@@ -77,8 +79,16 @@ class QueueReplicator : public broker::Exchange,
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ // Set if the queue has ever been subscribed to, used for auto-delete cleanup.
+ void setSubscribed() { subscribed = true; }
+ bool isSubscribed() { return subscribed; }
+
+ boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
+
private:
class ErrorListener;
+ class QueueObserver;
+
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
@@ -90,6 +100,7 @@ class QueueReplicator : public broker::Exchange,
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
BrokerInfo brokerInfo;
+ bool subscribed;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index ae6e7181d1..e48db44716 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -377,7 +377,14 @@ bool ReplicatingSubscription::doDispatch()
Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
- return ConsumerImpl::doDispatch();
+ try {
+ return ConsumerImpl::doDispatch();
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2012-10-05: detect queue deletion, no warning.
+ QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
+ return false;
+ }
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 0ab0d13424..0597a933a3 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -203,8 +203,8 @@ class Popen(subprocess.Popen):
self.wait()
def kill(self):
- try:
- subprocess.Popen.kill(self)
+ self.expect = EXPECT_EXIT_FAIL
+ try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGKILL)
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 462c90bfb3..5cf28f6ef9 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -129,6 +129,16 @@ class HaBroker(Broker):
assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
self, status, self._status)
+ def wait_queue(self, queue, timeout=1):
+ """ Wait for queue to be visible via QMF"""
+ agent = self.agent()
+ assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout)
+
+ def wait_no_queue(self, queue, timeout=1):
+ """ Wait for queue to be invisible via QMF"""
+ agent = self.agent()
+ assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
+
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
assert subprocess.call(
@@ -185,6 +195,9 @@ class HaBroker(Broker):
def ready(self):
return Broker.ready(self, client_properties={"qpid.ha-admin":1})
+ def kill(self):
+ self._agent = None
+ return Broker.kill(self)
class HaCluster(object):
_cluster_count = 0
@@ -225,7 +238,6 @@ class HaCluster(object):
def kill(self, i, promote_next=True):
"""Kill broker i, promote broker i+1"""
- self[i].expect = EXPECT_EXIT_FAIL
self[i].kill()
if promote_next: self[(i+1) % len(self)].promote()
@@ -254,12 +266,11 @@ class HaCluster(object):
def __getitem__(self,index): return self._brokers[index]
def __iter__(self): return self._brokers.__iter__()
+
def wait_address(session, address):
"""Wait for an address to become valid."""
def check():
- try:
- session.sender(address)
- return True
+ try: session.sender(address); return True
except NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
@@ -269,3 +280,5 @@ def valid_address(session, address):
session.receiver(address)
return True
except NotFound: return False
+
+
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 7ce0d1701a..63fd48b66c 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -652,9 +652,9 @@ acl deny all all
self.assertRaises(NotFound, s.receiver, ("e2"));
- def test_auto_delete_qpid_4285(self):
- """Regression test for QPID-4285: an auto delete queue gets stuck in
- a partially deleted state and causes replication errors."""
+ def test_delete_qpid_4285(self):
+ """Regression test for QPID-4285: on deleting a queue it gets stuck in a
+ partially deleted state and causes replication errors."""
cluster = HaCluster(self,2)
cluster[1].wait_status("ready")
s = cluster[0].connect().session()
@@ -669,6 +669,72 @@ acl deny all all
except NotFound: pass
assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+ def alt_setup(self, session, suffix):
+ # Create exchange to use as alternate and a queue bound to it.
+ # altex exchange: acts as alternate exchange
+ session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+ # altq queue bound to altex, collect re-routed messages.
+ session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+ def test_auto_delete_close(self):
+ """Verify auto-delete queues are deleted on backup if auto-deleted
+ on primary"""
+ cluster=HaCluster(self, 2)
+ p = cluster[0].connect().session()
+ self.alt_setup(p, "1")
+ r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+ s = p.sender("adq1")
+ for m in ["aa","bb","cc"]: s.send(m)
+ p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+ cluster[1].wait_queue("adq1")
+ cluster[1].wait_queue("adq2")
+ r.close() # trigger auto-delete of adq1
+ cluster[1].wait_no_queue("adq1")
+ cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+ cluster[1].wait_queue("adq2")
+
+ def test_auto_delete_crash(self):
+ """Verify auto-delete queues are deleted on backup if the primary crashes"""
+ cluster=HaCluster(self, 2)
+ p = cluster[0].connect().session()
+ self.alt_setup(p,"1")
+
+ # adq1 is subscribed so will be auto-deleted.
+ r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+ s = p.sender("adq1")
+ for m in ["aa","bb","cc"]: s.send(m)
+ # adq2 is subscribed after cluster[2] starts.
+ p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+ # adq3 is never subscribed.
+ p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}")
+
+ cluster.start()
+ cluster[2].wait_status("ready")
+
+ p.receiver("adq2") # Subscribed after cluster[2] joined
+
+ for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q)
+ for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q)
+ cluster[0].kill()
+
+ cluster[1].wait_no_queue("adq1")
+ cluster[1].wait_no_queue("adq2")
+ cluster[1].wait_queue("adq3")
+
+ cluster[2].wait_no_queue("adq1")
+ cluster[2].wait_no_queue("adq2")
+ cluster[2].wait_queue("adq3")
+
+ cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+ cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"])
+
+ def test_auto_delete_timeout(self):
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session().receiver("q;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ cluster[1].wait_queue("q")
+ cluster[0].kill()
+ cluster[1].wait_queue("q") # Not timed out yet
+ cluster[1].wait_no_queue("q") # Wait for timeout
def fairshare(msgs, limit, levels):
"""