summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-26 21:26:37 +0000
committerAlan Conway <aconway@apache.org>2012-07-26 21:26:37 +0000
commitbf08e0296d434464bcc6f4af9c4a32a2397089cb (patch)
treeef00bb098d351aa69308120cb0868cbd4a986c1e
parentf5a4940ed0d4b54edac1bb768ec7ac0595118d0c (diff)
downloadqpid-python-bf08e0296d434464bcc6f4af9c4a32a2397089cb.tar.gz
QPID-4107 HA does not replicate alternate-exchange
Set alternate exchange on replicated queues and exchanges. If the exchange is available, set it immediately. Otherwise remember what needs to be set so it can be set when the exchange becomes available. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1366206 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/ha.mk1
-rw-r--r--qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h73
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp156
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py51
6 files changed, 244 insertions, 55 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index e27aca0630..bb8a1a7227 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -626,6 +626,7 @@ set (ha_default ON)
option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
if (BUILD_HA)
set (ha_SOURCES
+ qpid/ha/AltExchangeSetter.h
qpid/ha/BackupConnectionExcluder.h
qpid/ha/BrokerInfo.cpp
qpid/ha/BrokerInfo.h
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 906dd775bd..9ec46a5156 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -23,6 +23,7 @@
dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
+ qpid/ha/AltExchangeSetter.h \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
qpid/ha/BackupConnectionExcluder.h \
diff --git a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
new file mode 100644
index 0000000000..08690e68bc
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
@@ -0,0 +1,73 @@
+#ifndef QPID_HA_ALTERNATEEXCHANGESETTER_H
+#define QPID_HA_ALTERNATEEXCHANGESETTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "boost/function.hpp"
+#include <map>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Sets the alternate exchange on queues and exchanges.
+ * Holds onto queues/exchanges if necessary till the alternate exchange is available.
+ * THREAD UNSAFE
+ */
+class AlternateExchangeSetter
+{
+ public:
+ typedef boost::function<void(boost::shared_ptr<broker::Exchange>)> SetFunction;
+
+ AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {}
+
+ void setAlternate(const std::string& altEx, const SetFunction& setter) {
+ broker::Exchange::shared_ptr ex = exchanges.find(altEx);
+ if (ex) setter(ex); // Set immediately.
+ else setters.insert(Setters::value_type(altEx, setter)); // Save for later.
+ }
+
+ void addExchange(boost::shared_ptr<broker::Exchange> exchange) {
+ // Update the setters for this exchange
+ std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName());
+ for (Setters::iterator i = range.first; i != range.second; ++i)
+ i->second(exchange);
+ setters.erase(range.first, range.second);
+ }
+
+ void clear() {
+ if (!setters.empty())
+ QPID_LOG(warning, "Some alternate exchanges were not resolved.");
+ setters.clear();
+ }
+
+ private:
+ typedef std::multimap<std::string, SetFunction> Setters;
+ broker::ExchangeRegistry& exchanges;
+ Setters setters;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ALTERNATEEXCHANGESETTER_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 1a1cad1e42..9214bc2f87 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -41,6 +41,7 @@
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include <algorithm>
#include <sstream>
+#include <iostream>
#include <assert.h>
namespace qpid {
@@ -56,6 +57,7 @@ using qmf::org::apache::qpid::broker::EventSubscribe;
using qmf::org::apache::qpid::ha::EventMembersUpdate;
using namespace framing;
using std::string;
+using std::ostream;
using types::Variant;
using namespace broker;
@@ -72,6 +74,7 @@ const string SCHEMA_ID("_schema_id");
const string VALUES("_values");
const string ALTEX("altEx");
+const string ALTEXCHANGE("altExchange");
const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
@@ -93,6 +96,7 @@ const string QNAME("qName");
const string QUEUE("queue");
const string TYPE("type");
const string HA_BROKER("habroker");
+const string PARTIAL("partial");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -122,7 +126,9 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const string& queueName,
+ SessionHandler& sessionHandler)
+{
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[_WHAT] = OBJECT;
@@ -142,6 +148,7 @@ void sendQuery(const string& packageName, const string& className, const string&
props->setAppId(QMF2);
props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ headerBody.get<qpid::framing::MessageProperties>(true)->setCorrelationId(className);
AMQFrame header(headerBody);
header.setBof(false);
header.setEof(false);
@@ -164,14 +171,14 @@ Variant::Map asMapVoid(const Variant& value) {
if (!value.isVoid()) return value.asMap();
else return Variant::Map();
}
-
} // namespace
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l),
- initialized(false)
+ initialized(false),
+ alternates(hb.getBroker().getExchanges())
{}
void BrokerReplicator::initialize() {
@@ -249,13 +256,15 @@ void BrokerReplicator::route(Deliverable& msg) {
if (haBroker.getStatus() == JOINING) haBroker.setStatus(CATCHUP);
const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+ const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>();
Variant::List list;
try {
- if (!isQMFv2(msg.getMessage()) || !headers)
+ if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties)
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
string content = msg.getMessage().getFrames().getContent();
amqp_0_10::ListCodec::decode(content, list);
+ QPID_LOG(trace, "Broker replicator received: " << *messageProperties);
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
@@ -283,6 +292,10 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
+ if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) {
+ // We have received all of the exchange response.
+ alternates.clear();
+ }
}
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
@@ -309,19 +322,10 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
broker.getQueues().destroy(name);
stopQueueReplicator(name);
}
- std::pair<boost::shared_ptr<Queue>, bool> result =
- broker.createQueue(
- name,
- values[DURABLE].asBool(),
- autoDel,
- 0, // no owner regardless of exclusivity on primary
- // FIXME aconway 2012-07-06: handle alternate exchange
- values[ALTEX].asString(),
- args,
- userId,
- remoteHost);
- assert(result.second); // Should be true since we destroyed existing queue above
- startQueueReplicator(result.first);
+ boost::shared_ptr<Queue> queue = createQueue(
+ name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
+ assert(queue); // Should be created since we destroed the previous queue above.
+ if (queue) startQueueReplicator(queue);
}
}
@@ -359,17 +363,9 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
broker.getExchanges().destroy(name);
QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
}
- std::pair<boost::shared_ptr<Exchange>, bool> result =
- broker.createExchange(
- name,
- values[EXTYPE].asString(),
- values[DURABLE].asBool(),
- // FIXME aconway 2012-07-06: handle alternate exchanges
- values[ALTEX].asString(),
- args,
- userId,
- remoteHost);
- assert(result.second);
+ boost::shared_ptr<Exchange> exchange =
+ createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
+ assert(exchange);
}
}
@@ -431,6 +427,22 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
haBroker.setMembership(members);
}
+namespace {
+
+// Get the alternate exchange from the exchange field of a queue or exchange response.
+static const string EXCHANGE_KEY_PREFIX("org.apache.qpid.broker:exchange:");
+
+string getAltExchange(const types::Variant& var) {
+ if (!var.isVoid()) {
+ management::ObjectId oid(var);
+ string key = oid.getV2Key();
+ if (key.find(EXCHANGE_KEY_PREFIX) != 0) throw Exception("Invalid exchange reference: "+key);
+ return key.substr(EXCHANGE_KEY_PREFIX.size());
+ }
+ else return string();
+}
+}
+
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.isReplicated(
@@ -443,22 +455,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Queue response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- std::pair<boost::shared_ptr<Queue>, bool> result =
- broker.createQueue(
- name,
- values[DURABLE].asBool(),
- values[AUTODELETE].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- ""/*TODO: need to include alternate-exchange*/,
- args,
- userId,
- remoteHost);
-
+ boost::shared_ptr<Queue> queue =
+ createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
// It is normal for the queue to already exist if we are failing over.
- if (result.second)
- startQueueReplicator(result.first);
- else
- QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+ if (queue) startQueueReplicator(queue);
+ else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -468,16 +470,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- bool created = broker.createExchange(
- name,
- values[TYPE].asString(),
- values[DURABLE].asBool(),
- "", // FIXME aconway 2012-07-09: need to include alternate-exchange
- args,
- userId,
- remoteHost
- ).second;
- QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
+ boost::shared_ptr<Exchange> exchange = createExchange(
+ name, values[TYPE].asString(), values[DURABLE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
+ QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
}
namespace {
@@ -564,6 +560,60 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) {
}
}
+boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& alternateExchange)
+{
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ durable,
+ autodelete,
+ 0, // no owner regardless of exclusivity on primary
+ string(), // Set alternate exchange below
+ arguments,
+ userId,
+ remoteHost);
+ if (result.second) {
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ }
+ return result.first;
+ }
+ else return boost::shared_ptr<Queue>();
+}
+
+boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const qpid::framing::FieldTable& args,
+ const std::string& alternateExchange)
+{
+ std::pair<boost::shared_ptr<Exchange>, bool> result =
+ broker.createExchange(
+ name,
+ type,
+ durable,
+ string(), // Set alternate exchange below
+ args,
+ userId,
+ remoteHost);
+ if (result.second) {
+ alternates.addExchange(result.first);
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
+ }
+ return result.first;
+ }
+ else return boost::shared_ptr<Exchange>();
+}
+
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index e2ca8f9e14..dbe4822d74 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -24,8 +24,10 @@
#include "types.h"
#include "ReplicationTest.h"
+#include "AlternateExchangeSetter.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
+#include "qpid/management/ManagementObject.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -95,6 +97,20 @@ class BrokerReplicator : public broker::Exchange,
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
void stopQueueReplicator(const std::string& name);
+ boost::shared_ptr<broker::Queue> createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& alternateExchange);
+
+ boost::shared_ptr<broker::Exchange> createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const qpid::framing::FieldTable& args,
+ const std::string& alternateExchange);
+
std::string logPrefix;
std::string userId, remoteHost;
ReplicationTest replicationTest;
@@ -102,6 +118,7 @@ class BrokerReplicator : public broker::Exchange,
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
bool initialized;
+ AlternateExchangeSetter alternates;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index cf2ab3508f..414ede7cca 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
@@ -63,6 +63,7 @@ class HaBroker(Broker):
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
"--log-enable=debug+:ha::",
+ "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12:
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
@@ -188,7 +189,7 @@ class HaCluster(object):
self.broker_id += 1
return name
- def start(self, update_urls=True):
+ def start(self, update_urls=True, args=[]):
"""Start a new broker in the cluster"""
b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
self._brokers.append(b)
@@ -760,6 +761,52 @@ acl deny all all
s1.sender("ex").send("foo");
self.assertEqual(s1.receiver("q").fetch().content, "foo")
+ def test_alterante_exchange(self):
+ """Verify that alternate-exchange on exchanges and queues is propagated
+ to new members of a cluster. """
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session()
+ # altex exchange: acts as alternate exchange
+ s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ # altq queue bound to altex, collect re-routed messages.
+ s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ # 0ex exchange with alternate-exchange altex and no queues bound
+ s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # create queue q with alternate-exchange altex
+ s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+ # create a bunch of exchanges to ensure we don't clean up prematurely if the
+ # response comes in multiple fragments.
+ for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+
+ def verify(broker):
+ s = broker.connect().session()
+ # Verify unmatched message goes to ex's alternate.
+ s.sender("0ex").send("foo")
+ altq = s.receiver("altq")
+ self.assertEqual("foo", altq.fetch(timeout=0).content)
+ s.acknowledge()
+ # Verify rejected message goes to q's alternate.
+ s.sender("q").send("bar")
+ msg = s.receiver("q").fetch(timeout=0)
+ self.assertEqual("bar", msg.content)
+ s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ self.assertEqual("bar", altq.fetch(timeout=0).content)
+ s.acknowledge()
+
+ # Sanity check: alternate exchanges on original broker
+ verify(cluster[0])
+ # Check backup that was connected during setup.
+ cluster[1].wait_backup("0ex")
+ cluster[1].wait_backup("q")
+ cluster.bounce(0)
+ verify(cluster[1])
+ # Check a newly started backup.
+ cluster.start()
+ cluster[2].wait_backup("0ex")
+ cluster[2].wait_backup("q")
+ cluster.bounce(1)
+ verify(cluster[2])
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit