summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-11 19:23:30 +0000
committerAlan Conway <aconway@apache.org>2012-10-11 19:23:30 +0000
commitf75678b1f215af028b25802c46e274e2d10551b5 (patch)
tree89267ad6025ae3884d8de9a2d783051014db2472 /qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
parent12eb9e230a50eee63378d1a84f62e57bce4e2e92 (diff)
downloadqpid-python-f75678b1f215af028b25802c46e274e2d10551b5.tar.gz
Bug 860701 - QPID-4350: HA handle auto-delete queues
Subscribed auto-delete queues are deleted by the backup. Timed auto-delete queues are deleted after the timeout. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1397243 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp226
1 files changed, 164 insertions, 62 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 14e6e1a5d1..18d2af76c2 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -23,6 +23,7 @@
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
@@ -84,25 +85,27 @@ const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
-const string EXCL("excl");
-const string EXCLUSIVE("exclusive");
const string BIND("bind");
-const string UNBIND("unbind");
const string BINDING("binding");
+const string BINDING_KEY("bindingKey");
const string CREATED("created");
const string DISP("disp");
+const string DEST("dest");
const string DURABLE("durable");
const string EXCHANGE("exchange");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
const string EXNAME("exName");
const string EXTYPE("exType");
+const string HA_BROKER("habroker");
const string KEY("key");
-const string BINDING_KEY("bindingKey");
const string NAME("name");
+const string PARTIAL("partial");
const string QNAME("qName");
const string QUEUE("queue");
const string TYPE("type");
-const string HA_BROKER("habroker");
-const string PARTIAL("partial");
+const string UNBIND("unbind");
+const string CONSUMER_COUNT("consumerCount");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -122,10 +125,7 @@ const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
const string MEMBERS("members");
-
-template <class T> bool match(Variant::Map& schema) {
- return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
-}
+const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
@@ -174,22 +174,78 @@ Variant::Map asMapVoid(const Variant& value) {
}
} // namespace
+// Listens for errors on the bridge session.
+class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& lp, BrokerReplicator& br) :
+ logPrefix(lp), brokerReplicator(br) {}
+
+ void connectionException(framing::connection::CloseCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ }
+ void channelException(framing::session::DetachCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ }
+ void executionException(framing::execution::ErrorCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+ BrokerReplicator& brokerReplicator;
+};
+
+class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
+{
+ public:
+ ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
+ virtual void connection(Connection&) {}
+ virtual void opened(Connection&) {}
+
+ virtual void closed(Connection& c) {
+ if (brokerReplicator.link && &c == brokerReplicator.connection)
+ brokerReplicator.disconnected();
+ }
+ virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); }
+ private:
+ BrokerReplicator& brokerReplicator;
+};
+
+template <class E> BrokerReplicator::EventKey eventKey() {
+ return make_pair(E::PACKAGE_NAME, E::EVENT_NAME);
+}
+
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l),
initialized(false),
alternates(hb.getBroker().getExchanges()),
- cleaner(*this)
+ cleaner(*this),
+ connection(0)
{
+ broker.getConnectionObservers().add(
+ boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this)));
getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+
+ dispatch[eventKey<EventQueueDeclare>()] = &BrokerReplicator::doEventQueueDeclare;
+ dispatch[eventKey<EventQueueDelete>()] = &BrokerReplicator::doEventQueueDelete;
+ dispatch[eventKey<EventExchangeDeclare>()] = &BrokerReplicator::doEventExchangeDeclare;
+ dispatch[eventKey<EventExchangeDelete>()] = &BrokerReplicator::doEventExchangeDelete;
+ dispatch[eventKey<EventBind>()] = &BrokerReplicator::doEventBind;
+ dispatch[eventKey<EventUnbind>()] = &BrokerReplicator::doEventUnbind;
+ dispatch[eventKey<EventMembersUpdate>()] = &BrokerReplicator::doEventMembersUpdate;
+ dispatch[eventKey<EventSubscribe>()] = &BrokerReplicator::doEventSubscribe;
}
void BrokerReplicator::initialize() {
// Can't do this in the constructor because we need a shared_ptr to this.
types::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
- broker.getLinks().declare(
+ std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare(
name, // name for bridge
*link, // parent
false, // durable
@@ -206,13 +262,28 @@ void BrokerReplicator::initialize() {
// calls are run.
boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2)
);
+ result.first->setErrorListener(
+ boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
}
BrokerReplicator::~BrokerReplicator() { shutdown(); }
+namespace {
+void collectQueueReplicators(
+ const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+{
+ boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+ if (qr) collect.insert(qr);
+}
+} // namespace
+
void BrokerReplicator::shutdown() {
QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
- broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, this, _1));
+ set<boost::shared_ptr<QueueReplicator> > collect;
+ broker.getExchanges().eachExchange(
+ boost::bind(&collectQueueReplicators, _1, boost::ref(collect)));
+ for_each(collect.begin(), collect.end(),
+ boost::bind(&QueueReplicator::deactivate, _1));
}
// This is called in the connection IO thread when the bridge is started.
@@ -221,7 +292,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connections thread context.
//
- assert(link->getConnection());
+ connection = link->getConnection();
+ assert(connection);
userId = link->getConnection()->getUserId();
remoteHost = link->getConnection()->getUrl();
@@ -280,13 +352,9 @@ void BrokerReplicator::route(Deliverable& msg) {
QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
- else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
- else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
- else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
- else if (match<EventBind>(schema)) doEventBind(values);
- else if (match<EventUnbind>(schema)) doEventUnbind(values);
- else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
+ EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+ EventDispatchMap::iterator i = dispatch.find(key);
+ if (i != dispatch.end()) (this->*(i->second))(values);
}
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -337,10 +405,8 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
deleteQueue(name);
}
- settings.populate(args, settings.storeSettings);
- CreateQueueResult result = createQueue(
- name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
- assert(result.second); // Should be created since we destroed the previous queue above.
+ replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
+ values[ALTEX].asString());
}
}
@@ -447,6 +513,14 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
haBroker.setMembership(members);
}
+void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
+ // Ignore queue replicator subscriptions.
+ if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
+ QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
+ if (qr) qr->setSubscribed();
+}
+
namespace {
// Get the alternate exchange from the exchange field of a queue or exchange response.
@@ -484,12 +558,15 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
}
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- CreateQueueResult result =
- createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
- getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the queue to already exist if we are failing over.
- if (!result.second)
- QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+ boost::shared_ptr<QueueReplicator> qr = replicateQueue(
+ name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
+ if (qr) {
+ Variant::Map::const_iterator i = values.find(CONSUMER_COUNT);
+ if (i != values.end() && isIntegerType(i->second.getType())) {
+ if (i->second.asInt64()) qr->setSubscribed();
+ }
+ }
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -580,7 +657,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
}
}
-void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
+boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
+ const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
@@ -588,30 +666,22 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
+ return qr;
}
-}
-
-void BrokerReplicator::deactivateQueue(const std::string& queueName) {
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName);
- if (qr) {
- qr->deactivate();
- // QueueReplicator's bridge is now queued for destruction but may not
- // actually be destroyed.
- broker.getExchanges().destroy(qr->getName());
- }
-}
-
-void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) {
- deactivateQueue(q->getName());
-}
-
-void BrokerReplicator::deleteQueue(const std::string& name) {
- deactivateQueue(name);
- try {
+ return boost::shared_ptr<QueueReplicator>();
+}
+
+void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
+ boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name));
+ if (qr) qr->deactivate();
+ Queue::shared_ptr queue = broker.getQueues().find(name);
+ if (queue) {
+ // Purge before deleting to ensure that we don't reroute any
+ // messages. Any reroutes will be done at the primary and
+ // replicated as normal.
+ if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
broker.deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
- } catch (const framing::NotFoundException&) {
- QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name);
}
}
@@ -624,7 +694,7 @@ void BrokerReplicator::deleteExchange(const std::string& name) {
}
}
-BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
+boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
@@ -641,14 +711,15 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
string(), // Set alternate exchange below
userId,
remoteHost);
- boost::shared_ptr<Queue> queue = result.first;
- if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue);
- if (result.second && !alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange,
- boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ boost::shared_ptr<QueueReplicator> qr;
+ if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
+ if (result.second) {
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ }
}
- return result;
+ return qr;
}
BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
@@ -720,8 +791,39 @@ void BrokerReplicator::Cleaner::cleanExchanges() {
void BrokerReplicator::Cleaner::cleanQueues() {
for_each(queues.begin(), queues.end(),
- boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1));
+ boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1, true));
queues.clear();
}
+void BrokerReplicator::autoDeleteCheck(
+ boost::shared_ptr<Exchange> ex, set<string>& result)
+{
+ boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+ if (!qr) return;
+ assert(qr);
+ if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+ if (qr->getQueue()->getSettings().autoDeleteDelay) {
+ // Start the auto-delete timer
+ Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
+ }
+ else {
+ // Mark for immediate deletion.
+ result.insert(qr->getQueue()->getName());
+ }
+ }
+}
+
+void BrokerReplicator::disconnected() {
+ QPID_LOG(info, logPrefix << "Disconnected");
+ connection = 0;
+ // Clean up auto-delete queues
+ set<string> deleteQueues;
+ broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck,
+ this, _1, boost::ref(deleteQueues)));
+ // Don't purge before deleting, the primary is gone so we need to
+ // reroute the deleted messages.
+ for_each(deleteQueues.begin(), deleteQueues.end(),
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, false));
+}
+
}} // namespace broker