summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp108
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h33
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp3
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py24
7 files changed, 156 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 276e17a8b5..95cba7bd8e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -370,7 +370,7 @@ bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
- checkNotDeleted(c);
+ if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
while (true) {
//TODO: reduce lock scope
@@ -1443,11 +1443,11 @@ QueueListeners& Queue::getListeners() { return listeners; }
Messages& Queue::getMessages() { return *messages; }
const Messages& Queue::getMessages() const { return *messages; }
-void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
+bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
- if (deleted && !c->hideDeletedError()) {
+ if (deleted && !c->hideDeletedError())
throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
- }
+ return !deleted;
}
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index d52afec6b9..b628f17c08 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -188,7 +188,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
int getEventMode();
void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>);
void abandoned(const Message& message);
- void checkNotDeleted(const Consumer::shared_ptr&);
+ bool checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
virtual bool checkDepth(const QueueDepth& increment, const Message&);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 2f0d304686..7572c7e516 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -58,8 +59,9 @@ using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
using qmf::org::apache::qpid::ha::EventMembersUpdate;
+using qpid::broker::amqp_0_10::MessageTransfer;
using namespace framing;
-using std::string;
+using namespace std;
using std::ostream;
using types::Variant;
using namespace broker;
@@ -177,8 +179,11 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l),
initialized(false),
- alternates(hb.getBroker().getExchanges())
-{}
+ alternates(hb.getBroker().getExchanges()),
+ cleaner(*this)
+{
+ getArgs().setString(QPID_REPLICATE, printable(NONE).str());
+}
void BrokerReplicator::initialize() {
// Can't do this in the constructor because we need a shared_ptr to this.
@@ -223,6 +228,10 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
<< " status:" << printable(haBroker.getStatus()));
initialized = true;
+ // Scan for existing replicated queues and exchanges. Any that have not been seen by
+ // the time all reponses are received will be cleaned up.
+ cleaner.start();
+
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -254,7 +263,7 @@ void BrokerReplicator::route(Deliverable& msg) {
}
Variant::List list;
try {
- if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
+ if (!MessageTransfer::isQMFv2(msg.getMessage()))
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
string content = msg.getMessage().getContent();
@@ -287,13 +296,18 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
- if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
- // We have received all of the exchange response.
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
+ QPID_LOG(debug, logPrefix << "Initial exchange configuration complete.");
+ cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary
alternates.clear();
}
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
+ QPID_LOG(debug, logPrefix << "Initial queue configuration complete.");
+ cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary
+ }
}
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
+ QPID_LOG(critical, logPrefix << "Configuration replication failed: " << e.what()
<< ": while handling: " << list);
haBroker.shutdown();
throw;
@@ -308,14 +322,15 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
+ QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+ cleaner.forgetQueue(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
- broker.deleteQueue(name, userId, remoteHost);
- stopQueueReplicator(name);
+ deleteQueue(name);
}
settings.populate(args, settings.storeSettings);
CreateQueueResult result =
@@ -346,8 +361,8 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
- stopQueueReplicator(name);
- broker.deleteQueue(name, userId, remoteHost);
+ cleaner.forgetQueue(name);
+ deleteQueue(name);
}
}
@@ -357,13 +372,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
+ cleaner.forgetExchange(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
- broker.deleteExchange(name, userId, remoteHost);
- QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
+ deleteExchange(name);
+ QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name);
}
CreateExchangeResult result = createExchange(
name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
@@ -377,12 +393,13 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (!exchange) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name);
+ QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
} else if (!replicationTest.replicateLevel(exchange->getArgs())) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
- broker.deleteExchange(name, userId, remoteHost);
+ cleaner.forgetExchange(name);
+ deleteExchange(name);
replicatedExchanges.erase(name);
}
}
@@ -457,6 +474,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
values[EXCLUSIVE].asBool()))
return;
string name(values[NAME].asString());
+ cleaner.forgetQueue(name);
QPID_LOG(debug, logPrefix << "Queue response: " << name);
if (broker.getQueues().find(name)) { // Already exists
if (findQueueReplicator(name))
@@ -478,6 +496,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.replicateLevel(argsMap)) return;
string name = values[NAME].asString();
+ cleaner.forgetExchange(name);
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
if (broker.getExchanges().find(name)) {
if (replicatedExchanges.find(name) != replicatedExchanges.end())
@@ -572,7 +591,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
}
}
-void BrokerReplicator::stopQueueReplicator(const std::string& name) {
+void BrokerReplicator::deleteQueue(const std::string& name) {
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
if (qr) {
qr->deactivate();
@@ -580,6 +599,22 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) {
// actually be destroyed.
broker.getExchanges().destroy(qr->getName());
}
+ qr.reset();
+ try {
+ broker.deleteQueue(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
+ } catch (const framing::NotFoundException&) {
+ QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name);
+ }
+}
+
+void BrokerReplicator::deleteExchange(const std::string& name) {
+ try {
+ broker.deleteExchange(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
+ } catch (const framing::NotFoundException&) {
+ QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
+ }
}
BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
@@ -639,4 +674,45 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+// BrokerReplicator::Cleaner
+
+BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) : brokerReplicator(br) {}
+
+void BrokerReplicator::Cleaner::start() {
+ queues.clear();
+ exchanges.clear();
+ brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange, this, _1));
+ brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue, this, _1));
+}
+
+void BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) {
+ if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex))
+ exchanges.insert(ex->getName());
+}
+
+void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) {
+ if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q))
+ queues.insert(q->getName());
+}
+
+void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) {
+ exchanges.erase(name);
+}
+
+void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) {
+ queues.erase(name);
+}
+
+void BrokerReplicator::Cleaner::cleanExchanges() {
+ for_each(exchanges.begin(), exchanges.end(),
+ boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator, _1));
+ exchanges.clear();
+}
+
+void BrokerReplicator::Cleaner::cleanQueues() {
+ for_each(queues.begin(), queues.end(),
+ boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1));
+ queues.clear();
+}
+
}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 5c8a983d45..11c828d50e 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -31,6 +31,7 @@
#include "qpid/management/ManagementObject.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
+#include <set>
namespace qpid {
@@ -81,6 +82,33 @@ class BrokerReplicator : public broker::Exchange,
typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
+ /** Keep track of queues and exchanges that need to be cleaned up. */
+ class Cleaner {
+ public:
+ Cleaner(BrokerReplicator&);
+
+ /** Scan for existing queues and exchanges. */
+ void start();
+
+ // Forget a queue/exchange that does not need cleaning
+ void forgetExchange(const std::string& name);
+ void forgetQueue(const std::string& name);
+
+ void cleanExchanges();
+ void cleanQueues();
+
+ private:
+ typedef std::set<std::string> Names;
+
+ // add a queue/exchange that may need cleaning.
+ void addExchange(boost::shared_ptr<broker::Exchange>);
+ void addQueue(boost::shared_ptr<broker::Queue>);
+
+ BrokerReplicator& brokerReplicator;
+ Names queues, exchanges;
+ };
+ friend class Cleaner;
+
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
@@ -98,7 +126,6 @@ class BrokerReplicator : public broker::Exchange,
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- void stopQueueReplicator(const std::string& name);
CreateQueueResult createQueue(
const std::string& name,
@@ -114,6 +141,9 @@ class BrokerReplicator : public broker::Exchange,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
+ void deleteQueue(const std::string& name);
+ void deleteExchange(const std::string& name);
+
std::string logPrefix;
std::string userId, remoteHost;
ReplicationTest replicationTest;
@@ -125,6 +155,7 @@ class BrokerReplicator : public broker::Exchange,
qpid::Address primary;
typedef std::set<std::string> StringSet;
StringSet replicatedExchanges; // exchanges that have been replicated.
+ Cleaner cleaner;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 23ece4c2d3..c872e408c5 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -70,6 +70,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
+ getArgs().setString(QPID_REPLICATE, printable(NONE).str());
}
// This must be separate from the constructor so we can call shared_from_this.
@@ -123,8 +124,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
SequenceNumber front, back;
queue->getRange(front, back, broker::REPLICATOR);
if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
- QPID_LOG(debug, logPrefix << " subscribe with settings " << settings);
-
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, settings);
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 2f9d9a1211..79db67e3c8 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -209,6 +209,7 @@ class HaCluster(object):
def start(self, update_urls=True, args=[]):
"""Start a new broker in the cluster"""
b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ b.ready()
self._brokers.append(b)
if update_urls: self.update_urls()
return b
@@ -235,6 +236,7 @@ class HaCluster(object):
self._brokers[i] = HaBroker(
self.test, name=b.name, port=b.port(), brokers_url=self.url,
**self.kwargs)
+ self._brokers[i].ready()
def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 3c43c6a914..86f33d8030 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -624,6 +624,30 @@ acl deny all all
actual = [m.content for m in primary.get_messages("pq", 100)]
self.assertEqual(expect, actual)
+ def test_delete_missing_response(self):
+ """Check that a backup correctly deletes leftover queues and exchanges that are
+ missing from the initial reponse set."""
+ cluster = HaCluster(self,2)
+ s = cluster[0].connect().session()
+ s.sender("q1;{create:always}")
+ s.sender("q2;{create:always}")
+ s.sender("e1;{create:always, node:{type:topic}}")
+ s.sender("e2;{create:always, node:{type:topic}}")
+ cluster.bounce(0, promote_next=False)
+ # Fake a primary that has deleted some queues and exchanges.
+ s = cluster[0].connect_admin().session()
+ s.sender("q2;{create:always}")
+ s.sender("e2;{create:always, node:{type:topic}}")
+ s.sender("x;{create:always}") # A new queue so we can wait for the update.
+ cluster[0].promote()
+ # Verify the backup has deleted the missing queues and exchanges
+ cluster[1].wait_status("ready")
+ s = cluster[1].connect_admin().session()
+ cluster[1].wait_backup("x");
+ self.assertRaises(NotFound, s.receiver, ("q1"));
+ self.assertRaises(NotFound, s.receiver, ("e1"));
+
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit