summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:12:17 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:12:17 +0000
commitd4bfe561c411267e66dc051521b6f7889f7900ac (patch)
tree6bfc9bab8209056f5d24db459f3de80ba60bd1c2
parentb506dd2444b1dd0be4b799233da6efe57e6d49d6 (diff)
downloadqpid-python-d4bfe561c411267e66dc051521b6f7889f7900ac.tar.gz
QPID-3603: Replicate unbind events.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245522 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp49
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py20
3 files changed, 59 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index a496ab7583..58602b5511 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -41,6 +42,7 @@ namespace qpid {
namespace ha {
using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventUnbind;
using qmf::org::apache::qpid::broker::EventExchangeDeclare;
using qmf::org::apache::qpid::broker::EventExchangeDelete;
using qmf::org::apache::qpid::broker::EventQueueDeclare;
@@ -70,6 +72,7 @@ const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
const string BIND("bind");
+const string UNBIND("unbind");
const string BINDING("binding");
const string CREATED("created");
const string DISP("disp");
@@ -171,6 +174,12 @@ void sendQuery(const string className, const string& queueName, SessionHandler&
sessionHandler.out->handle(header);
sessionHandler.out->handle(content);
}
+
+void translate(const Variant& value, framing::FieldTable& outArgs) {
+ if (!value.isVoid())
+ amqp_0_10::translate(value.asMap(), outArgs);
+}
+
} // namespace
BrokerReplicator::~BrokerReplicator() {}
@@ -237,21 +246,19 @@ void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const fram
else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
else if (match<EventBind>(schema)) doEventBind(values);
- // FIXME aconway 2011-11-21: handle unbind & all other relevant events.
+ else if (match<EventUnbind>(schema)) doEventUnbind(values);
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
Variant::Map& values = i->asMap()[VALUES].asMap();
framing::FieldTable args;
- amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ translate(values[ARGUMENTS].asMap(), args);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
- else QPID_LOG(error, "HA: Backup received unknown response: type=" << type
+ else QPID_LOG(error, "HA: Backup received unknown response type=" << type
<< " values=" << values);
-
- // FIXME aconway 2011-12-06: handle all relevant response types.
}
} else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
} catch (const std::exception& e) {
@@ -264,7 +271,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = values[ARGS].asMap();
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ translate(argsMap, args);
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
@@ -312,7 +319,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ translate(argsMap, args);
if (broker.createExchange(
name,
values[EXTYPE].asString(),
@@ -356,7 +363,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
queue && replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
- amqp_0_10::translate(values[ARGS].asMap(), args);
+ translate(values[ARGS].asMap(), args);
string key = values[KEY].asString();
QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
<< " queue=" << queue->getName()
@@ -365,12 +372,32 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
}
}
+void BrokerReplicator::doEventUnbind(Variant::Map& values) {
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate unbinds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ translate(values[ARGS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->unbind(queue, key, &args);
+ }
+}
+
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
// FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
Variant::Map argsMap(values[ARGUMENTS].asMap());
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ translate(argsMap, args);
string name(values[NAME].asString());
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
@@ -396,7 +423,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(values[ARGUMENTS].asMap());
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ translate(argsMap, args);
if (broker.createExchange(
values[NAME].asString(),
values[TYPE].asString(),
@@ -445,7 +472,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
queue && replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
- amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ translate(values[ARGUMENTS].asMap(), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 154dd340ac..f0b9e0b599 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -63,14 +63,18 @@ class BrokerReplicator : public broker::Exchange
private:
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
void doEventExchangeDeclare(types::Variant::Map& values);
void doEventExchangeDelete(types::Variant::Map& values);
void doEventBind(types::Variant::Map&);
+ void doEventUnbind(types::Variant::Map&);
+
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
void doResponseBind(types::Variant::Map& values);
+
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
broker::Broker& broker;
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index ed9786674a..38f243a5c1 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -55,12 +55,15 @@ class ShortTests(BrokerTest):
except NotFound: pass
def test_replication(self):
+ """Test basic replication of wiring and messages before and
+ after backup has connected"""
+
def queue(name, replicate):
return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
def exchange(name, replicate, bindq):
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
- def setup(p, prefix):
+ def setup(p, prefix, primary):
"""Create config, send messages on the primary p"""
s = p.sender(queue(prefix+"q1", "all"))
for m in ["a", "b", "1"]: s.send(Message(m))
@@ -71,6 +74,14 @@ class ShortTests(BrokerTest):
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+ # Test unbind
+ p.sender(queue(prefix+"q4", "all")).send(Message("6"))
+ s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+ s3.send(Message("7"))
+ # Use old connection to unbind
+ us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
+ us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
+ p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
# FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
p.sender(queue(prefix+"x", "wiring"))
@@ -93,13 +104,16 @@ class ShortTests(BrokerTest):
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
+ b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
+ self.assert_browse_retry(b, prefix+"q4", ["6","7"])
+
primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
# Create config, send messages before starting the backup, to test catch-up replication.
- setup(p, "1")
+ setup(p, "1", primary)
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
# Create config, send messages after starting the backup, to test steady-state replication.
- setup(p, "2")
+ setup(p, "2", primary)
# Verify the data on the backup
b = backup.connect().session()