summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:05:13 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:05:13 +0000
commitd563fd77c419b9518caebd8d7baf3c7865c1890c (patch)
tree3b639729a3b9644752e4db5c68a18ff5cb652fbb
parent6cdac3d8cae283df9c4a023c0c1e2e51250dbb02 (diff)
downloadqpid-python-d563fd77c419b9518caebd8d7baf3c7865c1890c.tar.gz
QPID-3603: Integrate ReplicatingSubscription into the HA code.
HaBroker registers the ConsumerFactory, QueueReplicator sets appropriate arguments in consume command. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245482 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp10
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h3
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp48
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h13
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h12
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.h15
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py2
12 files changed, 80 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index de2b09660c..775c4cd862 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -118,7 +118,7 @@ void SemanticState::consume(const string& tag,
const ConsumerFactories::Factories& cf(
session.getBroker().getConsumerFactories().get());
ConsumerImpl::shared_ptr c;
- for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end(); !c)
+ for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
resumeId, resumeTtl, arguments);
if (!c) // Create plain consumer
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index d3a2e71ff6..39ddc527b0 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -43,8 +43,8 @@ using types::Variant;
using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
- // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address.
- if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
+ // FIXME aconway 2011-11-24: identifying the primary.
+ if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
Url url(s.brokerUrl);
QPID_LOG(info, "HA: Acting as backup to " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
@@ -59,12 +59,6 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
link = result.first;
boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
broker.getExchanges().registerExchange(wr);
-
- // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the tests
- // The tests pass with a plain subscription if we dont add the factory.
-// broker.getConsumerFactories().add(
-// boost::shared_ptr<ReplicatingSubscription::Factory>(
-// new ReplicatingSubscription::Factory()));
}
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index cee626379b..b4183a4dba 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -37,6 +37,9 @@ class Settings;
/**
* State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE: trivially because currently it only has a constructor.
+ * May need locking as the functionality grows.
*/
class Backup
{
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 38ae19a11e..af82909ba3 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -21,6 +21,7 @@
#include "Backup.h"
#include "HaBroker.h"
#include "Settings.h"
+#include "ReplicatingSubscription.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
@@ -61,6 +62,10 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
<< ", broker-url=" << brokerUrl);
backup.reset(new Backup(broker, s));
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory()));
}
HaBroker::~HaBroker() {}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index f09b2acaaf..bf9de4cc0c 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -20,6 +20,7 @@
*/
#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -63,14 +64,25 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
QueueReplicator::~QueueReplicator() {}
+// NB: This is called back ina broker connection thread when the
+// bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ // No lock needed, no mutable member variables are used.
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
- peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, framing::FieldTable());
+ framing::FieldTable settings;
+ // FIXME aconway 2011-11-28: string constants.
+ settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest))
+ settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
+
+ peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
-
}
@@ -117,39 +129,13 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
}
}
-bool QueueReplicator::isReplicatingLink(const std::string& name)
-{
- return name.find(REPLICATOR) == 0;
-}
-
-bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings)
-{
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (queue) {
- settings.setInt("qpid.replicating-subscription", 1);
- settings.setInt("qpid.high_sequence_number", queue->getPosition());
- qpid::framing::SequenceNumber oldest;
- if (queue->getOldest(oldest)) {
- settings.setInt("qpid.low_sequence_number", oldest);
- }
- }
- return true;
- } else {
- return false;
- }
-}
-
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-const std::string QueueReplicator::typeName("queue-replicator");
+// FIXME aconway 2011-11-28: rationalise string constants.
+static const std::string TYPE_NAME("qpid.queue-replicator");
-std::string QueueReplicator::getType() const
-{
- return typeName;
-}
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 13fbc6e86c..8085c11b82 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -38,7 +38,13 @@ class Deliverable;
namespace ha {
/**
- * Dummy exchange for processing replication messages
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE.
*/
class QueueReplicator : public broker::Exchange
{
@@ -50,12 +56,11 @@ class QueueReplicator : public broker::Exchange
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
- static bool isReplicatingLink(const std::string&);
- static bool initReplicationSettings(const std::string&, broker::QueueRegistry&, framing::FieldTable&);
- static const std::string typeName;
+
private:
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+ sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
framing::SequenceNumber current;
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 1f9a8730b3..1a2e55755e 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -30,9 +30,16 @@ namespace ha {
using namespace framing;
using namespace broker;
+using namespace std;
-const std::string DOLLAR("$");
-const std::string INTERNAL("_internal");
+// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
+// Do we want a common HA prefix?
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
+const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+
+const string DOLLAR("$");
+const string INTERNAL("_internal");
class ReplicationStateInitialiser
{
@@ -61,7 +68,7 @@ class ReplicationStateInitialiser
const qpid::framing::SequenceNumber end;
};
-std::string mask(const std::string& in)
+string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
}
@@ -69,29 +76,30 @@ std::string mask(const std::string& in)
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
SemanticState* _parent,
- const std::string& _name,
+ const string& _name,
Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
- const std::string& _tag,
- const std::string& _resumeId,
+ const string& _tag,
+ const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) {
+
return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments));
}
ReplicatingSubscription::ReplicatingSubscription(
SemanticState* _parent,
- const std::string& _name,
+ const string& _name,
Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
- const std::string& _tag,
- const std::string& _resumeId,
+ const string& _tag,
+ const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
@@ -158,7 +166,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m)
void ReplicatingSubscription::generateDequeueEvent()
{
- std::string buf(range.encodedSize(),'\0');
+ string buf(range.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
range.encode(buffer);
range.clear();
@@ -166,7 +174,7 @@ void ReplicatingSubscription::generateDequeueEvent()
//generate event message
boost::intrusive_ptr<Message> event = new Message();
- AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0)));
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 0ba2b2f0de..07ec5ef513 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -38,7 +38,12 @@ class OwnershipToken;
namespace ha {
/**
- * Subscriber to a remote queue that replicates to a local queue.
+ * A susbcription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD UNSAFE: used only in broker connection thread.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
public broker::QueueObserver
@@ -53,6 +58,11 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
const framing::FieldTable& arguments);
};
+ // Argument names for consume command.
+ static const std::string QPID_REPLICATING_SUBSCRIPTION;
+ static const std::string QPID_HIGH_SEQUENCE_NUMBER;
+ static const std::string QPID_LOW_SEQUENCE_NUMBER;
+
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 04d0f9d9ee..e621052fea 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -446,6 +446,7 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
}
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+ // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed.
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
broker.getExchanges().registerExchange(qr);
@@ -456,11 +457,6 @@ bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const frami
bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
-const string WiringReplicator::typeName(QPID_WIRING_REPLICATOR);
-
-string WiringReplicator::getType() const
-{
- return typeName;
-}
+string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h
index 6a5edb114c..32109d8368 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.h
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h
@@ -38,8 +38,15 @@ class SessionHandler;
namespace ha {
/**
- * Pseudo-exchange for recreating local queues and/or exchanges on
- * receipt of QMF events indicating their creation on another node
+ * Replicate wiring on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
*/
class WiringReplicator : public broker::Exchange
{
@@ -54,8 +61,6 @@ class WiringReplicator : public broker::Exchange
void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
- static const std::string typeName;
-
private:
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
@@ -66,8 +71,6 @@ class WiringReplicator : public broker::Exchange
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
void doResponseBind(types::Variant::Map& values);
-
- private:
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index cfffd6c2e5..bb06e77a69 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -21,7 +21,7 @@
<!-- Monitor and control HA status of a broker. -->
<class name="HaBroker">
- <property name="status" type="sstr" desc="HA statu: PRIMARY, BACKUP, SOLO"/>
+ <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, SOLO"/>
<method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO">
<arg name="status" type="sstr" dir="I"/>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 79123058b7..021401bb08 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -89,7 +89,7 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
# Create config, send messages before starting the backup, to test catch-up replication.
- primary = self.ha_broker(name="primary")
+ primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
setup(p, "1")
# Start the backup