summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-11-30 16:14:02 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-11-30 16:14:02 +0000
commit08659efdc833d924caf8c188cbe624a8bd552c9c (patch)
tree60772e01935060e0db7c0684d53134881b7a63da
parentb41fd08fc751cbc288ed96cbdce7ba3ca074f458 (diff)
downloadqpid-python-08659efdc833d924caf8c188cbe624a8bd552c9c.tar.gz
QPID-3603: resync -kgiusti tracking branch with gsim's work.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-kgiusti@1208483 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/design_docs/hot-standby-design.txt239
-rw-r--r--qpid/cpp/design_docs/new-ha-design.txt (renamed from qpid/cpp/design_docs/replicating-browser-design.txt)185
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/schema.py5
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.h4
-rw-r--r--qpid/cpp/src/Makefile.am9
-rw-r--r--qpid/cpp/src/ha.mk42
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp88
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h19
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h3
-rw-r--r--qpid/cpp/src/qpid/broker/ConsumerFactory.h70
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp45
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h12
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h28
-rw-r--r--qpid/cpp/src/qpid/broker/NodeClone.cpp219
-rw-r--r--qpid/cpp/src/qpid/broker/NodeClone.h54
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueReplicator.cpp128
-rw-r--r--qpid/cpp/src/qpid/broker/QueueReplicator.h57
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp281
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h9
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp65
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h56
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp87
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h62
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp67
-rw-r--r--qpid/cpp/src/qpid/ha/Logging.cpp36
-rw-r--r--qpid/cpp/src/qpid/ha/Logging.h55
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp145
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h72
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp235
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h109
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h47
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp463
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.h81
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml34
-rw-r--r--qpid/cpp/src/tests/brokertest.py7
-rw-r--r--qpid/cpp/src/tests/cluster.mk28
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py102
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in1
-rwxr-xr-xqpid/python/qpid-python-test2
-rwxr-xr-xqpid/tools/setup.py3
-rwxr-xr-xqpid/tools/src/py/qpid-ha-status80
51 files changed, 2258 insertions, 1154 deletions
diff --git a/qpid/cpp/design_docs/hot-standby-design.txt b/qpid/cpp/design_docs/hot-standby-design.txt
deleted file mode 100644
index 99a5dc0199..0000000000
--- a/qpid/cpp/design_docs/hot-standby-design.txt
+++ /dev/null
@@ -1,239 +0,0 @@
--*-org-*-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-* Another new design for Qpid clustering.
-
-For background see [[./new-cluster-design.txt]] which describes the issues
-with the old design and a new active-active design that could replace it.
-
-This document describes an alternative hot-standby approach.
-
-** Delivery guarantee
-
-We guarantee N-way redundant, at least once delivey. Once a message
-from a client has been acknowledged by the broker, it will be
-delivered even if N-1 brokers subsequently fail. There may be
-duplicates in the event of a failure. We don't make duplicates
-during normal operation (i.e when no brokers have failed)
-
-This is the same guarantee as the old cluster and the alternative
-active-active design.
-
-** Active-active vs. hot standby (aka primary-backup)
-
-An active-active cluster allows clients to connect to any broker in
-the cluster. If a broker fails, clients can fail-over to any other
-live broker.
-
-A hot-standby cluster has only one active broker at a time (the
-"primary") and one or more brokers on standby (the "backups"). Clients
-are only served by the leader, clients that connect to a backup are
-redirected to the leader. The backpus are kept up-to-date in real time
-by the primary, if the primary fails a backup is elected to be the new
-primary.
-
-Aside: A cold-standby cluster is possible using a standalone broker,
-CMAN and shared storage. In this scenario only one broker runs at a
-time writing to a shared store. If it fails, another broker is started
-(by CMAN) and recovers from the store. This bears investigation but
-the store recovery time is probably too long for failover.
-
-** Why hot standby?
-
-Active-active has some advantages:
-- Finding a broker on startup or failover is simple, just pick any live broker.
-- All brokers are always running in active mode, there's no
-- Distributing clients across brokers gives better performance, but see [1].
-- A broker failure affects only clients connected to that broker.
-
-The main problem with active-active is co-ordinating consumers of the
-same queue on multiple brokers such that there are no duplicates in
-normal operation. There are 2 approaches:
-
-Predictive: each broker predicts which messages others will take. This
-the main weakness of the old design so not appealing.
-
-Locking: brokers "lock" a queue in order to take messages. This is
-complex to implement, its not straighforward to determine the most
-performant strategie for passing the lock.
-
-Hot-standby removes this problem. Only the primary can modify queues
-so it just has to tell the backups what it is doing, there's no
-locking.
-
-The primary can enqueue messages and replicate asynchronously -
-exactly like the store does, but it "writes" to the replicas over the
-network rather than writing to disk.
-
-** Failover in a hot-standby cluster.
-
-Hot-standby has some potential performance issues around failover:
-
-- Failover "spike": when the primary fails every client will fail over
- at the same time, putting strain on the system.
-
-- Until a new primary is elected, cluster cannot serve any clients or
- redirect clients to the primary.
-
-We want to minimize the number of re-connect attempts that clients
-have to make. The cluster can use a well-known algorithm to choose the
-new primary (e.g. round robin on a known sequence of brokers) so that
-clients can guess the new primary correctly in most cases.
-
-Even if clients do guess correctly it may be that the new primary is
-not yet aware of the death of the old primary, which is may to cause
-multiple failed connect attempts before clients eventually get
-connected. We will need to prototype to see how much this happens in
-reality and how we can best get clients redirected.
-
-** Threading and performance.
-
-The primary-backup cluster operates analogously to the way the disk store does now:
-- use the same MessageStore interface as the store to interact with the broker
-- use the same asynchronous-completion model for replicating messages.
-- use the same recovery interfaces (?) for new backups joining.
-
-Re-using the well-established store design gives credibility to the new cluster design.
-
-The single CPG dispatch thread was a severe performance bottleneck for the old cluster.
-
-The primary has the same threading model as a a standalone broker with
-a store, which we know that this performs well.
-
-If we use CPG for replication of messages, the backups will receive
-messages in the CPG dispatch thread. To get more concurency, the CPG
-thread can dump work onto internal PollableQueues to be processed in
-parallel.
-
-Messages from the same broker queue need to go onto the same
-PollableQueue. There could be a separate PollableQueue for each broker
-queue. If that's too resource intensive we can use a fixed set of
-PollableQueues and assign broker queues to PollableQueues via hashing
-or round robin.
-
-Another possible optimization is to use multiple CPG queues: one per
-queue or a hashed set, to get more concurrency in the CPG layer. The
-old cluster is not able to keep CPG busy.
-
-TODO: Transactions pose a challenge with these concurrent models: how
-to co-ordinate multiple messages being added (commit a publish or roll
-back an accept) to multiple queues so that all replicas end up with
-the same message sequence while respecting atomicity.
-
-** Use of CPG
-
-CPG provides several benefits in the old cluster:
-- tracking membership (essential for determining the primary)
-- handling "spit brain" (integrates with partition support from CMAN)
-- reliable multicast protocol to distribute messages.
-
-I believe we still need CPG for membership and split brain. We could
-experiment with sending the bulk traffic over AMQP conections.
-
-** Flow control
-
-Need to ensure that
-1) In-memory internal queues used by the cluster don't overflow.
-2) The backups don't fall too far behind on processing CPG messages
-
-** Recovery
-When a new backup joins an active cluster it must get a snapshot
-from one of the other backups, or the primary if there are none. In
-store terms this is "recovery" (old cluster called it an "update)
-
-Compared to old cluster we only replidate well defined data set of the store.
-This is the crucial sore spot of old cluster.
-
-We can also replicated it more efficiently by recovering queues in
-reverse (LIFO) order. That means as clients actively consume messages
-from the front of the queue, they are redeucing the work we have to do
-in recovering from the back. (NOTE: this may not be compatible with
-using the same recovery interfaces as the store.)
-
-** Selective replication
-In this model it's easy to support selective replication of individual queues via
-configuration.
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
- Treated analogously to persistent/durable properties for the store.
-- if not explicitly marked, provide a choice of default
- - default is replicate (replicated message on replicated queue)
- - default is don't replicate
- - default is replicate persistent/durable messages.
-
-** Inconsistent errors
-
-The new design eliminates most sources of inconsistent errors in the
-old design (connections, sessions, security, management etc.) and
-eliminates the need to stall the whole cluster till an error is
-resolved. We still have to handle inconsistent store errors when store
-and cluster are used together.
-
-We also have to include error handling in the async completion loop to
-guarantee N-way at least once: we should only report success to the
-client when we know the message was replicated and stored on all N-1
-backups.
-
-TODO: We have a lot more options than the old cluster, need to figure
-out the best approach, or possibly allow mutliple approaches. Need to
-go thru the various failure cases. We may be able to do recovery on a
-per-queue basis rather than restarting an entire node.
-
-** New members joining
-
-We should be able to catch up much faster than the the old design. A
-new backup can catch up ("recover") the current cluster state on a
-per-queue basis.
-- queues can be updated in parallel
-- "live" updates avoid the the "endless chase"
-
-During a "live" update several things are happening on a queue:
-- clients are publishing messages to the back of the queue, replicated to the backup
-- clients are consuming messages from the front of the queue, replicated to the backup.
-- the primary is sending pre-existing messages to the new backup.
-
-The primary sends pre-existing messages in LIFO order - starting from
-the back of the queue, at the same time clients are consuming from the front.
-The active consumers actually reduce the amount of work to be done, as there's
-no need to replicate messages that are no longer on the queue.
-
-* Steps to get there
-
-** Baseline replication
-Validate the overall design get initial notion of performance. Just
-message+wiring replication, no update/recovery for new members joining,
-single CPG dispatch thread on backups, no failover, no transactions.
-
-** Failover
-Electing primary, backups redirect to primary. Measure failover time
-for large # clients. Strategies to minimise number of retries after a
-failure.
-
-** Flow Control
-Keep internal queues from over-flowing. Similar to internal flow control in old cluster.
-Needed for realistic performance/stress tests
-
-** Concurrency
-Experiment with multiple threads on backups, multiple CPG groups.
-
-** Recovery/new member joining
-Initial status handshake for new member. Recovering queues from the back.
-
-** Transactions
-TODO: How to implement transactions with concurrency. Worst solution:
-a global --cluster-use-transactions flag that forces single thread
-mode. Need to find a better solution.
diff --git a/qpid/cpp/design_docs/replicating-browser-design.txt b/qpid/cpp/design_docs/new-ha-design.txt
index e304258d35..9b6d7d676c 100644
--- a/qpid/cpp/design_docs/replicating-browser-design.txt
+++ b/qpid/cpp/design_docs/new-ha-design.txt
@@ -16,15 +16,14 @@
# specific language governing permissions and limitations
# under the License.
-* FIXME - rewrite all old stuff from hot-standby.txt.
-
-* Another new design for Qpid clustering.
+* An active-passive, hot-standby design for Qpid clustering.
For some background see [[./new-cluster-design.txt]] which describes the
issues with the old design and a new active-active design that could
replace it.
-This document describes an alternative active-passive approach.
+This document describes an alternative active-passive approach based on
+queue browsing to replicate message data.
** Active-active vs. active-passive (hot-standby)
@@ -59,7 +58,19 @@ The primary can enqueue messages and replicate asynchronously -
exactly like the store does, but it "writes" to the replicas over the
network rather than writing to disk.
-** Failover in a hot-standby cluster.
+** Replicating browsers
+
+The unit of replication is a replicating browser. This is an AMQP
+consumer that browses a remote queue via a federation link and
+maintains a local replica of the queue. As well as browsing the remote
+messages as they are added the browser receives dequeue notifications
+when they are dequeued remotely.
+
+On the primary broker incoming mesage transfers are completed only when
+all of the replicating browsers have signaled completion. Thus a completed
+message is guaranteed to be on the backups.
+
+** Failover and Cluster Resource Managers
We want to delegate the failover management to an existing cluster
resource manager. Initially this is rgmanager from Cluster Suite, but
@@ -67,13 +78,13 @@ other managers (e.g. PaceMaker) could be supported in future.
Rgmanager takes care of starting and stopping brokers and informing
brokers of their roles as primary or backup. It ensures there's
-exactly one broker running at any time. It also tracks quorum and
-protects against split-brain.
+exactly one primary broker running at any time. It also tracks quorum
+and protects against split-brain.
Rgmanger can also manage a virtual IP address so clients can just
-retry on a single address to fail over. Alternatively we will support
-configuring a fixed list of broker addresses when qpid is run outside
-of a resource manager.
+retry on a single address to fail over. Alternatively we will also
+support configuring a fixed list of broker addresses when qpid is run
+outside of a resource manager.
Aside: Cold-standby is also possible using rgmanager with shared
storage for the message store (e.g. GFS). If the broker fails, another
@@ -81,18 +92,6 @@ broker is started on a different node and and recovers from the
store. This bears investigation but the store recovery times are
likely too long for failover.
-** Replicating browsers
-
-The unit of replication is a replicating browser. This is an AMQP
-consumer that browses a remote queue via a federation link and
-maintains a local replica of the queue. As well as browsing the remote
-messages as they are added the browser receives dequeue notifications
-when they are dequeued remotely.
-
-On the primary broker incoming mesage transfers are completed only when
-all of the replicating browsers have signaled completion. Thus a completed
-message is guaranteed to be on the backups.
-
** Replicating wiring
New queues and exchanges and their bindings also need to be replicated.
@@ -105,9 +104,10 @@ CPG is not required in this model, an external cluster resource
manager takes care of membership and quorum.
** Selective replication
+
In this model it's easy to support selective replication of individual queues via
-configuration.
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
+configuration.
+- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
Treated analogously to persistent/durable properties for the store.
- if not explicitly marked, provide a choice of default
- default is replicate (replicated message on replicated queue)
@@ -136,7 +136,15 @@ out the best approach, or possibly allow mutliple approaches. Need to
go thru the various failure cases. We may be able to do recovery on a
per-queue basis rather than restarting an entire node.
-** New members joining
+
+** New backups joining
+
+New brokers can join the cluster as backups. Note - if the new broker
+has a new IP address, then the existing cluster members must be
+updated with a new client and broker URLs by a sysadmin.
+
+
+They discover
We should be able to catch up much faster than the the old design. A
new backup can catch up ("recover") the current cluster state on a
@@ -154,6 +162,85 @@ the back of the queue, at the same time clients are consuming from the front.
The active consumers actually reduce the amount of work to be done, as there's
no need to replicate messages that are no longer on the queue.
+** Broker discovery and lifecycle.
+
+The cluster has a client URL that can contain a single virtual IP
+address or a list of real IP addresses for the cluster.
+
+In backup mode, brokers reject connections except from a special
+cluster-admin user.
+
+Clients discover the primary by re-trying connection to the client URL
+until the successfully connect to the primary. In the case of a
+virtual IP they re-try the same address until it is relocated to the
+primary. In the case of a list of IPs the client tries each in
+turn. Clients do multiple retries over a configured period of time
+before giving up.
+
+Backup brokers discover the primary in the same way as clients. There
+is a separate broker URL for brokers since they often will connect
+over a different network to the clients. The broker URL has to be a
+list of IPs rather than one virtual IP so broker members can connect
+to each other.
+
+Brokers have the following states:
+- connecting: backup broker trying to connect to primary - loops retrying broker URL.
+- catchup: connected to primary, catching up on pre-existing wiring & messages.
+- backup: fully functional backup.
+- primary: Acting as primary, serving clients.
+
+** Interaction with rgmanager
+
+rgmanager interacts with qpid via 2 service scripts: backup & primary. These
+scripts interact with the underlying qpidd service.
+
+*** Initial cluster start
+
+rgmanager starts the backup service on all nodes and the primary service on one node.
+
+On the backup nodes qpidd is in the connecting state. The primary node goes into
+the primary state. Backups discover the primary, connect and catch up.
+
+*** Failover
+
+primary broker or node fails. Backup brokers see disconnect and go
+back to connecting mode.
+
+rgmanager notices the failure and starts the primary service on a new node.
+This tells qpidd to go to primary mode. Backups re-connect and catch up.
+
+*** Failback
+
+Cluster of N brokers has suffered a failure, only N-1 brokers
+remain. We want to start a new broker (possibly on a new node) to
+restore redundancy.
+
+If the new broker has a new IP address, the sysadmin pushes a new URL
+to all the existing brokers.
+
+The new broker starts in connecting mode. It discovers the primary,
+connects and catches up.
+
+*** Failure during failback
+
+A second failure occurs before the new backup B can complete its catch
+up. The backup B refuses to become primary by failing the primary
+start script if rgmanager chooses it, so rgmanager will try another
+(hopefully caught-up) broker to be primary.
+
+** Interaction with the store.
+
+# FIXME aconway 2011-11-16: TBD
+- how to identify the "best" store after a total cluster crash.
+- best = last to be primary.
+- primary "generation" - counter passed to backups and incremented by new primary.
+
+restart after crash:
+- clean/dirty flag on disk for admin shutdown vs. crash
+- dirty brokers refuse to be primary
+- sysamdin tells best broker to be primary
+- erase stores? delay loading?
+
** Current Limitations
(In no particular order at present)
@@ -219,6 +306,48 @@ it).
LC5 - Need richer control over which queues/exchanges are propagated, and
which are not.
-Question: is it possible to miss an event on subscribing for
-configuration propagation? are the initial snapshot and subsequent
-events correctly synchronised?
+LC6 - The events and query responses are not fully synchronized.
+
+ In particular it *is* possible to not receive a delete event but
+ for the deleted object to still show up in the query response
+ (meaning the deletion is 'lost' to the update).
+
+ It is also possible for an create event to be received as well
+ as the created object being in the query response. Likewise it
+ is possible to receive a delete event and a query response in
+ which the object no longer appears. In these cases the event is
+ essentially redundant.
+
+ It is not possible to miss a create event and yet not to have
+ the object in question in the query response however.
+
+* User Documentation Notes
+
+Notes to seed initial user documentation. Loosely tracking the implementation,
+some points mentioned in the doc may not be implemented yet.
+
+** High Availability Overview
+Explain basic concepts: hot standby, primary/backup, replicated queue/exchange.
+Network topology: backup links, corosync, separate client/cluster networks.
+Describe failover mechanisms.
+- Client view: URLs, failover, exclusion & discovery.
+- Broker view: similar.
+Role of rmganager & corosync.
+
+** Client view.
+Clients use multi-address URL in base case.
+Clients can't connect to backups, retry till they find primary.
+Only qpid.cluster-admin can connect to backup, must not mess with replicated queues.
+Note connection known-hosts returns client URL, as does amq.failover exchange.
+
+Creating replicated queues & exchanges:
+- qpid.replicate argument,
+- examples using addressing and qpid-config)
+
+** Configuring corosync
+Must be on same network as backup links.
+
+** Configuring rgmanager
+
+** Configuring qpidd
+HA related options.
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index 59e951fb6e..b8a1d26fb0 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1498,6 +1498,10 @@ class SchemaClass:
def genNamePackageLower (self, stream, variables):
stream.write (self.packageName.lower ())
+ def genPackageNameUpper (self, stream, variables):
+ up = "_".join(self.packageName.split("."))
+ stream.write (up.upper())
+
def genNameUpper (self, stream, variables):
stream.write (self.name.upper ())
@@ -1642,6 +1646,7 @@ class SchemaPackage:
def genNamespace (self, stream, variables):
stream.write("::".join(self.packageName.split(".")))
+
def genOpenNamespaces (self, stream, variables):
for item in self.packageName.split("."):
stream.write ("namespace %s {\n" % item)
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h
index 4bcd423a26..90f1b4dd4a 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h
@@ -1,6 +1,6 @@
/*MGEN:commentPrefix=//*/
-#ifndef _MANAGEMENT_/*MGEN:Class.NameUpper*/_
-#define _MANAGEMENT_/*MGEN:Class.NameUpper*/_
+#ifndef _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_
+#define _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_
//
// Licensed to the Apache Software Foundation (ASF) under one
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 8fde735380..c525f8b24b 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -104,7 +104,8 @@ mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk \
-c $(srcdir)/managementgen.cmake -q -b -o qmf \
$(top_srcdir)/../specs/management-schema.xml \
$(srcdir)/qpid/acl/management-schema.xml \
- $(srcdir)/qpid/cluster/management-schema.xml
+ $(srcdir)/qpid/cluster/management-schema.xml \
+ $(srcdir)/qpid/ha/management-schema.xml
$(srcdir)/managementgen.mk $(mgen_broker_cpp) $(dist_qpid_management_HEADERS): mgen.timestamp
mgen.timestamp: $(mgen_generator)
@@ -209,6 +210,7 @@ dmoduleexec_LTLIBRARIES =
cmoduleexec_LTLIBRARIES =
include cluster.mk
+include ha.mk
include acl.mk
include qmf.mk
include qmfc.mk
@@ -533,6 +535,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionToken.h \
qpid/broker/Consumer.h \
+ qpid/broker/ConsumerFactory.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
qpid/broker/Deliverable.h \
@@ -593,8 +596,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
- qpid/broker/NodeClone.h \
- qpid/broker/NodeClone.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
@@ -622,8 +623,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueuedMessage.h \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
- qpid/broker/QueueReplicator.h \
- qpid/broker/QueueReplicator.cpp \
qpid/broker/RateFlowcontrol.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
new file mode 100644
index 0000000000..dc4e7c8d0a
--- /dev/null
+++ b/qpid/cpp/src/ha.mk
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# HA plugin makefile fragment, to be included in Makefile.am
+#
+
+dmoduleexec_LTLIBRARIES += ha.la
+
+ha_la_SOURCES = \
+ qpid/ha/Backup.cpp \
+ qpid/ha/Backup.h \
+ qpid/ha/HaBroker.cpp \
+ qpid/ha/HaBroker.h \
+ qpid/ha/HaPlugin.cpp \
+ qpid/ha/Logging.h \
+ qpid/ha/Logging.cpp \
+ qpid/ha/Settings.h \
+ qpid/ha/QueueReplicator.h \
+ qpid/ha/QueueReplicator.cpp \
+ qpid/ha/ReplicatingSubscription.h \
+ qpid/ha/ReplicatingSubscription.cpp \
+ qpid/ha/WiringReplicator.cpp \
+ qpid/ha/WiringReplicator.h
+
+ha_la_LIBADD = libqpidbroker.la
+ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index b00cdd00dc..405482da5d 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,8 +24,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
-#include "qpid/broker/NodeClone.h"
-#include "qpid/broker/QueueReplicator.h"
+#include "qpid/ha/WiringReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
@@ -59,9 +58,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
}
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args) :
+ const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0),
+ initialize(init)
{
std::stringstream title;
title << id << "_" << link->getBroker()->getFederationTag();
@@ -77,9 +78,9 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
-Bridge::~Bridge()
+Bridge::~Bridge()
{
- mgmtObject->resourceDestroy();
+ mgmtObject->resourceDestroy();
}
void Bridge::create(Connection& c)
@@ -98,7 +99,7 @@ void Bridge::create(Connection& c)
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
+
session->attach(name, false);
session->commandPoint(0,0);
} else {
@@ -108,60 +109,12 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (args.i_srcIsQueue) {
- //TODO: something other than this which is nasty...
- bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options);
-
- peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options);
+ if (initialize) initialize(*this, sessionHandler);
+ else if (args.i_srcIsQueue) {
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
- } else if (NodeClone::isNodeCloneDestination(args.i_dest)) {
- //declare and bind an event queue
- peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable());
- peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable());
- //subscribe to the queue
- peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
- Variant::Map request;
- request["_what"] = "OBJECT";
- Variant::Map schema;
- schema["_class_name"] = (i == 0 ? "queue" : "exchange");
- schema["_package_name"] = "org.apache.qpid.broker";
- request["_schema_id"] = schema;
-
- AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0)));
- method.setBof(true);
- method.setEof(false);
- method.setBos(true);
- method.setEos(true);
- AMQHeaderBody headerBody;
- MessageProperties* props = headerBody.get<MessageProperties>(true);
- props->setReplyTo(qpid::framing::ReplyTo("", queueName));
- props->setAppId("qmf2");
- props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
- headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
- AMQFrame header(headerBody);
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- AMQContentBody data;
- qpid::amqp_0_10::MapCodec::encode(request, data.getData());
- AMQFrame content(data);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- sessionHandler.out->handle(method);
- sessionHandler.out->handle(header);
- sessionHandler.out->handle(content);
- }
-
} else {
FieldTable queueSettings;
@@ -236,11 +189,6 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
-const string& Bridge::getName() const
-{
- return name;
-}
-
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
@@ -268,7 +216,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
is_queue, is_local, id, excludes, dynamic, sync).first;
}
-void Bridge::encode(Buffer& buffer) const
+void Bridge::encode(Buffer& buffer) const
{
buffer.putShortString(string("bridge"));
buffer.putShortString(link->getHost());
@@ -285,8 +233,8 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShort(args.i_sync);
}
-uint32_t Bridge::encodedSize() const
-{
+uint32_t Bridge::encodedSize() const
+{
return link->getHost().size() + 1 // short-string (host)
+ 7 // short-string ("bridge")
+ 2 // port
@@ -311,7 +259,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
management::Args& /*args*/,
string&)
{
- if (methodId == _qmf::Bridge::METHOD_CLOSE) {
+ if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
destroy();
return management::Manageable::STATUS_OK;
@@ -358,7 +306,7 @@ void Bridge::sendReorigin()
conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
queueName, args.i_src, args.i_key, bindArgs));
}
-bool Bridge::resetProxy()
+bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(id);
if (!sessionHandler.getSession()) peer.reset();
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 8b4559a871..b849b11ba8 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,15 +42,19 @@ class Connection;
class ConnectionState;
class Link;
class LinkRegistry;
+class SessionHandler;
class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
+ typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
Bridge(Link* link, framing::ChannelId id, CancellationListener l,
- const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+ InitializeCallback init
+ );
~Bridge();
void create(Connection& c);
@@ -70,8 +74,8 @@ public:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
- const std::string& getName() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const { return name; }
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
// Exchange::DynamicBridge methods
@@ -81,6 +85,10 @@ public:
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
+ // Methods needed by initialization functions
+ std::string getQueueName() const { return queueName; }
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+
private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
@@ -103,6 +111,7 @@ private:
mutable uint64_t persistenceId;
ConnectionState* connState;
Connection* conn;
+ InitializeCallback initialize;
bool resetProxy();
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index b3b751be98..e8ada49c62 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -37,6 +37,7 @@
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/ConsumerFactory.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -199,6 +200,7 @@ public:
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
+ ConsumerFactories consumerFactories;
public:
virtual ~Broker();
@@ -357,6 +359,8 @@ public:
const std::string& key,
const std::string& userId,
const std::string& connectionId);
+
+ ConsumerFactories& getConsumerFactories() { return consumerFactories; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 8451f35cb0..0e20719252 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
{
ScopedLock<Mutex> l(ioCallbackLock);
ioCallbacks.push(callback);
- out.activateOutput();
+ if (isOpen()) out.activateOutput();
}
Connection::~Connection()
@@ -156,11 +156,14 @@ Connection::~Connection()
void Connection::received(framing::AMQFrame& frame) {
// Received frame on connection so delay timeout
restartTimeout();
+ bool wasOpen = isOpen();
adapter.handle(frame);
if (isLink) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
+ if (!wasOpen && isOpen())
+ doIoCallbacks(); // Do any callbacks registered before we opened.
}
void Connection::sent(const framing::AMQFrame& frame)
@@ -329,17 +332,16 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
}
void Connection::doIoCallbacks() {
- {
- ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context, they are
- // not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
- while (!ioCallbacks.empty()) {
- boost::function0<void> cb = ioCallbacks.front();
- ioCallbacks.pop();
- ScopedUnlock<Mutex> ul(ioCallbackLock);
- cb(); // Lend the IO thread for management processing
- }
+ if (!isOpen()) return; // Don't process IO callbacks until we are open.
+ ScopedLock<Mutex> l(ioCallbackLock);
+ // Although IO callbacks execute in the connection thread context, they are
+ // not cluster safe because they are queued for execution in non-IO threads.
+ ClusterUnsafeScope cus;
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
}
}
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index a3838fb6f0..7ac7dba56f 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -31,6 +31,9 @@ namespace broker {
class Queue;
class QueueListeners;
+/**
+ * Base class for consumers which represent a subscription to a queue.
+ */
class Consumer {
const bool acquires;
const bool browseAcquired;
diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
new file mode 100644
index 0000000000..abd39fb3f8
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONSUMERFACTORY_H
+#define QPID_BROKER_CONSUMERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
+// Refactor to use a more abstract interface.
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for consumer factoires. Plugins can register a
+ * ConsumerFactory via Broker:: getConsumerFactories() Each time a
+ * conumer is created, each factory is tried in turn till one returns
+ * non-0.
+ */
+class ConsumerFactory
+{
+ public:
+ virtual ~ConsumerFactory() {}
+
+ virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+ SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0;
+};
+
+/** A set of factories held by the broker
+ * THREAD UNSAFE: see notes on member functions.
+ */
+class ConsumerFactories {
+ public:
+ typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories;
+
+ /** Thread safety: May only be called during plug-in initialization. */
+ void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf); }
+
+ /** Thread safety: May only be called after plug-in initialization. */
+ const Factories& get() const { return factories; }
+
+ private:
+ Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONSUMERFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 43ca1ae04b..4fe76dabd8 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
bool _acquired,
bool accepted,
bool _windowing,
- uint32_t _credit, bool _delayedCompletion) : msg(_msg),
+ uint32_t _credit, bool _isDelayedCompletion) : msg(_msg),
queue(_queue),
tag(_tag),
acquired(_acquired),
@@ -47,7 +47,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
ended(accepted && acquired),
windowing(_windowing),
credit(msg.payload ? msg.payload->getRequiredCredit() : _credit),
- delayedCompletion(_delayedCompletion)
+ isDelayedCompletion(_isDelayedCompletion)
{}
bool DeliveryRecord::setEnded()
@@ -115,7 +115,7 @@ bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (!ended) {
if (acquired) {
queue->dequeue(ctxt, msg);
- } else if (delayedCompletion) {
+ } else if (isDelayedCompletion) {
//TODO: this is a nasty way to do this; change it
msg.payload->getIngressCompletion().finishCompleter();
QPID_LOG(debug, "Completed " << msg.payload.get());
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 90e72aaf0d..ea33ed5461 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -63,7 +63,7 @@ class DeliveryRecord
* after that).
*/
uint32_t credit;
- bool delayedCompletion;
+ bool isDelayedCompletion;
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,7 +73,7 @@ class DeliveryRecord
bool accepted,
bool windowing,
uint32_t credit=0, // Only used if msg is empty.
- bool delayedCompletion=false
+ bool isDelayedCompletion=false
);
bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 8010bf43e7..f92d543c2d 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -47,13 +47,13 @@ namespace _qmf = qmf::org::apache::qpid::broker;
Link::Link(LinkRegistry* _links,
MessageStore* _store,
- string& _host,
+ const string& _host,
uint16_t _port,
- string& _transport,
+ const string& _transport,
bool _durable,
- string& _authMechanism,
- string& _username,
- string& _password,
+ const string& _authMechanism,
+ const string& _username,
+ const string& _password,
Broker* _broker,
Manageable* parent)
: links(_links), store(_store), host(_host), port(_port),
@@ -79,6 +79,7 @@ Link::Link(LinkRegistry* _links,
}
}
setStateLH(STATE_WAITING);
+ startConnectionLH();
}
Link::~Link ()
@@ -213,28 +214,30 @@ void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
+ if (connection)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+
}
void Link::cancel(Bridge::shared_ptr bridge)
{
- {
- Mutex::ScopedLock mutex(lock);
+ Mutex::ScopedLock mutex(lock);
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
- }
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
}
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- cancellations.push_back(bridge);
- bridge->closed();
- active.erase(i);
- break;
- }
+ }
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
}
}
+
if (!cancellations.empty()) {
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -282,6 +285,8 @@ void Link::setConnection(Connection* c)
Mutex::ScopedLock mutex(lock);
connection = c;
updateUrls = true;
+ // Process any IO tasks bridges added before setConnection.
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::maintenanceVisit ()
@@ -311,7 +316,7 @@ void Link::maintenanceVisit ()
}
else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
+ }
void Link::reconnect(const qpid::Address& a)
{
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 4badd8b3a1..acaa9d1cbd 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -92,19 +92,21 @@ namespace qpid {
Link(LinkRegistry* links,
MessageStore* store,
- std::string& host,
+ const std::string& host,
uint16_t port,
- std::string& transport,
+ const std::string& transport,
bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password,
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password,
Broker* broker,
management::Manageable* parent = 0);
virtual ~Link();
std::string getHost() { return host; }
uint16_t getPort() { return port; }
+ std::string getTransport() { return transport; }
+
bool isDurable() { return durable; }
void maintenanceVisit ();
uint nextChannel();
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index e9885f5462..31b4f1b490 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -124,13 +124,13 @@ bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address&
}
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
uint16_t port,
- string& transport,
+ const string& transport,
bool durable,
- string& authMechanism,
- string& username,
- string& password)
+ const string& authMechanism,
+ const string& username,
+ const string& password)
{
Mutex::ScopedLock locker(lock);
@@ -151,18 +151,20 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& tag,
- std::string& excludes,
+ const std::string& tag,
+ const std::string& excludes,
bool dynamic,
- uint16_t sync)
+ uint16_t sync,
+ Bridge::InitializeCallback init
+)
{
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
@@ -196,7 +198,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key), args));
+ host, port, src, dest, key),
+ args, init));
bridges[bridgeKey] = bridge;
l->second->add(bridge);
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 4c97e4f9d8..7e5b39f223 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -84,28 +84,32 @@ namespace broker {
~LinkRegistry();
std::pair<boost::shared_ptr<Link>, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
- std::string& transport,
+ const std::string& transport,
bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password);
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password);
+
std::pair<Bridge::shared_ptr, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& id,
- std::string& excludes,
+ const std::string& id,
+ const std::string& excludes,
bool dynamic,
- uint16_t sync);
+ uint16_t sync,
+ Bridge::InitializeCallback=0
+ );
void destroy(const std::string& host, const uint16_t port);
+
void destroy(const std::string& host,
const uint16_t port,
const std::string& src,
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.cpp b/qpid/cpp/src/qpid/broker/NodeClone.cpp
deleted file mode 100644
index e8fc227884..0000000000
--- a/qpid/cpp/src/qpid/broker/NodeClone.cpp
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "NodeClone.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/amqp_0_10/Codecs.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-
-using qmf::org::apache::qpid::broker::EventQueueDeclare;
-using qmf::org::apache::qpid::broker::EventQueueDelete;
-using qmf::org::apache::qpid::broker::EventExchangeDeclare;
-using qmf::org::apache::qpid::broker::EventExchangeDelete;
-
-namespace qpid {
-namespace broker {
-
-namespace{
-bool isQMFv2(const Message& message)
-{
- const qpid::framing::MessageProperties* props = message.getProperties<qpid::framing::MessageProperties>();
- return props && props->getAppId() == "qmf2";
-}
-
-template <class T> bool match(qpid::types::Variant::Map& schema)
-{
- return T::match(schema["_class_name"], schema["_package_name"]);
-}
-
-}
-
-NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b) {}
-
-NodeClone::~NodeClone() {}
-
-void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable* headers)
-{
- if (isQMFv2(msg.getMessage()) && headers) {
- if (headers->getAsString("qmf.content") == "_event") {
- //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- if (list.empty()) {
- QPID_LOG(error, "Error parsing QMF event, expected non-empty list");
- } else {
- try {
- qpid::types::Variant::Map& map = list.front().asMap();
- qpid::types::Variant::Map& schema = map["_schema_id"].asMap();
- qpid::types::Variant::Map& values = map["_values"].asMap();
- if (match<EventQueueDeclare>(schema)) {
- if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(), args);
- if (!broker.createQueue(
- values["qName"].asString(),
- values["durable"].asBool(),
- values["autoDel"].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- }
- } else if (match<EventQueueDelete>(schema)) {
- std::string name = values["qName"].asString();
- QPID_LOG(debug, "Notified of deletion of queue " << name);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
- broker.deleteQueue(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- } else {
- if (queue) {
- QPID_LOG(debug, "Ignoring deletion notification for non-propagated queue " << name);
- } else {
- QPID_LOG(debug, "No such queue " << name);
- }
- }
- } else if (match<EventExchangeDeclare>(schema)) {
- if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(), args);
- if (!broker.createExchange(
- values["exName"].asString(),
- values["exType"].asString(),
- values["durable"].asBool(),
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- }
- } else if (match<EventExchangeDelete>(schema)) {
- std::string name = values["exName"].asString();
- QPID_LOG(debug, "Notified of deletion of exchange " << name);
- try {
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
- if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
- broker.deleteExchange(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- } else {
- if (exchange) {
- QPID_LOG(debug, "Ignoring deletion notification for non-propagated exchange " << name);
- } else {
- QPID_LOG(debug, "No such exchange " << name);
- }
- }
- } catch (const qpid::framing::NotFoundException&) {}
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Error propagating configuration: " << e.what());
- }
- }
- } else if (headers->getAsString("qmf.opcode") == "_query_response") {
- //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- QPID_LOG(debug, "Got query response (" << list.size() << ")");
- for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
- std::string type = i->asMap()["_schema_id"].asMap()["_class_name"];
- qpid::types::Variant::Map& values = i->asMap()["_values"].asMap();
- QPID_LOG(debug, "class: " << type << ", values: " << values);
- if (values["arguments"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["arguments"].asMap(), args);
- if (type == "queue") {
- if (!broker.createQueue(
- values["name"].asString(),
- values["durable"].asBool(),
- values["autoDelete"].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " << values["name"] << " already exists");
- }
- } else if (type == "exchange") {
- if (!broker.createExchange(
- values["name"].asString(),
- values["type"].asString(),
- values["durable"].asBool(),
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- } else {
- QPID_LOG(warning, "Ignoring unknow object class: " << type);
- }
- }
- }
- } else {
- QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers);
- }
- } else {
- QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response");
- }
-}
-
-bool NodeClone::isNodeCloneDestination(const std::string& target)
-{
- return target == "qpid.node-cloner";
-}
-
-boost::shared_ptr<Exchange> NodeClone::create(const std::string& target, Broker& broker)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isNodeCloneDestination(target)) {
- //TODO: need to cache the exchange
- QPID_LOG(info, "Creating node cloner");
- exchange.reset(new NodeClone(target, broker));
- }
- return exchange;
-}
-
-bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-const std::string NodeClone::typeName("node-cloner");
-
-std::string NodeClone::getType() const
-{
- return typeName;
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.h b/qpid/cpp/src/qpid/broker/NodeClone.h
deleted file mode 100644
index 71cac619ad..0000000000
--- a/qpid/cpp/src/qpid/broker/NodeClone.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#ifndef QPID_BROKER_NODEPROPAGATOR_H
-#define QPID_BROKER_NODEPROPAGATOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Exchange.h"
-
-namespace qpid {
-namespace broker {
-
-class Broker;
-
-/**
- * Pseudo-exchange for recreating local queues and/or exchanges on
- * receipt of QMF events indicating their creation on another node
- */
-class NodeClone : public Exchange
-{
- public:
- NodeClone(const std::string&, Broker&);
- ~NodeClone();
- std::string getType() const;
- bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
- bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
-
- static bool isNodeCloneDestination(const std::string&);
- static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
- static const std::string typeName;
- private:
- Broker& broker;
-};
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_NODEPROPAGATOR_H*/
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index c47ba554d2..0e85fe1072 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1378,7 +1378,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index b66600ef43..a53916ffbc 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -403,7 +403,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void flush();
- const Broker* getBroker();
+ Broker* getBroker();
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
deleted file mode 100644
index 01c0c8e272..0000000000
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/QueueReplicator.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/framing/SequenceSet.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace broker {
-
-QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {}
-QueueReplicator::~QueueReplicator() {}
-
-namespace {
-const std::string DEQUEUE_EVENT("dequeue-event");
-const std::string REPLICATOR("qpid.replicator-");
-}
-
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
-{
- if (key == DEQUEUE_EVENT) {
- std::string content;
- msg.getMessage().getFrames().getContent(content);
- qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
- qpid::framing::SequenceSet latest;
- latest.decode(buffer);
-
- //TODO: should be able to optimise the following
- for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
- if (current < *i) {
- //haven't got that far yet, record the dequeue
- dequeued.add(*i);
- QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName());
- } else {
- QueuedMessage message;
- if (queue->acquireMessageAt(*i, message)) {
- queue->dequeue(0, message);
- QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName());
- } else {
- QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName());
- }
- }
- }
- } else {
- //take account of any gaps in sequence created by messages
- //dequeued before our subscription reached them
- while (dequeued.contains(++current)) {
- dequeued.remove(current);
- QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName());
- queue->setPosition(current);
- }
- QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current);
- msg.deliverTo(queue);
- }
-}
-
-bool QueueReplicator::isReplicatingLink(const std::string& name)
-{
- return name.find(REPLICATOR) == 0;
-}
-
-boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target, QueueRegistry& queues)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (!queue) {
- QPID_LOG(warning, "Unable to create replicator, can't find " << queueName);
- } else {
- //TODO: need to cache the replicator
- QPID_LOG(info, "Creating replicator for " << queueName);
- exchange.reset(new QueueReplicator(target, queue));
- }
- }
- return exchange;
-}
-
-bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings)
-{
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (queue) {
- settings.setInt("qpid.replicating-subscription", 1);
- settings.setInt("qpid.high_sequence_number", queue->getPosition());
- qpid::framing::SequenceNumber oldest;
- if (queue->getOldest(oldest)) {
- settings.setInt("qpid.low_sequence_number", oldest);
- }
- }
- return true;
- } else {
- return false;
- }
-}
-
-bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-const std::string QueueReplicator::typeName("queue-replicator");
-
-std::string QueueReplicator::getType() const
-{
- return typeName;
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/broker/QueueReplicator.h
deleted file mode 100644
index 679aa9240d..0000000000
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef QPID_BROKER_QUEUEREPLICATOR_H
-#define QPID_BROKER_QUEUEREPLICATOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Exchange.h"
-#include "qpid/framing/SequenceSet.h"
-
-namespace qpid {
-namespace broker {
-
-class QueueRegistry;
-
-/**
- * Dummy exchange for processing replication messages
- */
-class QueueReplicator : public Exchange
-{
- public:
- QueueReplicator(const std::string& name, boost::shared_ptr<Queue>);
- ~QueueReplicator();
- std::string getType() const;
- bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
- bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
- static bool isReplicatingLink(const std::string&);
- static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&);
- static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&);
- static const std::string typeName;
- private:
- boost::shared_ptr<Queue> queue;
- qpid::framing::SequenceNumber current;
- qpid::framing::SequenceSet dequeued;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 7d10322163..43b5d997d1 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -25,9 +25,7 @@
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/NodeClone.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
@@ -110,15 +108,25 @@ bool SemanticState::exists(const string& consumerTag){
namespace {
const std::string SEPARATOR("::");
}
-
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
- bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
+ bool exclusive, const string& resumeId, uint64_t resumeTtl,
+ const FieldTable& arguments)
{
// "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
// Create a globally unique name so the broker can identify individual consumers
std::string name = session.getSessionId().str() + SEPARATOR + tag;
- ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ const ConsumerFactories::Factories& cf(
+ session.getBroker().getConsumerFactories().get());
+ ConsumerImpl::shared_ptr c;
+ for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
+ c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments);
+ if (!c) // Create plain consumer
+ c = ConsumerImpl::shared_ptr(
+ new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -268,224 +276,6 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
-{
- public:
- ReplicatingSubscription(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
- ~ReplicatingSubscription();
-
- void init();
- void cancel();
- bool deliver(QueuedMessage& msg);
- void enqueued(const QueuedMessage&);
- void dequeued(const QueuedMessage&);
- void acquired(const QueuedMessage&) {}
- void requeued(const QueuedMessage&) {}
-
- protected:
- bool doDispatch();
- private:
- boost::shared_ptr<Queue> events;
- boost::shared_ptr<Consumer> consumer;
- qpid::framing::SequenceSet range;
-
- void generateDequeueEvent();
- class DelegatingConsumer : public Consumer
- {
- public:
- DelegatingConsumer(ReplicatingSubscription&);
- ~DelegatingConsumer();
- bool deliver(QueuedMessage& msg);
- void notify();
- //bool filter(boost::intrusive_ptr<Message>);
- //bool accept(boost::intrusive_ptr<Message>);
- Consumer::Action accept(const QueuedMessage&);
- OwnershipToken* getSession();
- private:
- ReplicatingSubscription& delegate;
- };
-};
-
-SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent,
- const string& name,
- Queue::shared_ptr queue,
- bool ack,
- bool acquire,
- bool exclusive,
- const string& tag,
- const string& resumeId,
- uint64_t resumeTtl,
- const framing::FieldTable& arguments)
-{
- if (arguments.isSet("qpid.replicating-subscription")) {
- shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
- boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init();
- return result;
- } else {
- return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
- }
-}
-
-std::string mask(const std::string& in)
-{
- return std::string("$") + in + std::string("_internal");
-}
-
-class ReplicationStateInitialiser
-{
- public:
- ReplicationStateInitialiser(qpid::framing::SequenceSet& results,
- const qpid::framing::SequenceNumber& start,
- const qpid::framing::SequenceNumber& end);
- void operator()(const QueuedMessage& m) { process(m); }
- private:
- qpid::framing::SequenceSet& results;
- const qpid::framing::SequenceNumber start;
- const qpid::framing::SequenceNumber end;
- void process(const QueuedMessage&);
-};
-
-ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
- bool ack,
- bool _acquire,
- bool _exclusive,
- const string& _tag,
- const string& _resumeId,
- uint64_t _resumeTtl,
- const framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
- events(new Queue(mask(_name))),
- consumer(new DelegatingConsumer(*this))
-{
-
- if (_arguments.isSet("qpid.high_sequence_number")) {
- qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
- qpid::framing::SequenceNumber lwm;
- if (_arguments.isSet("qpid.low_sequence_number")) {
- lwm = _arguments.getAsInt("qpid.low_sequence_number");
- } else {
- lwm = hwm;
- }
- qpid::framing::SequenceNumber oldest;
- if (_queue->getOldest(oldest)) {
- if (oldest >= hwm) {
- range.add(lwm, --oldest);
- } else if (oldest >= lwm) {
- ReplicationStateInitialiser initialiser(range, lwm, hwm);
- _queue->eachMessage(initialiser);
- } else { //i.e. have older message on master than is reported to exist on replica
- QPID_LOG(warning, "Replica appears to be missing message on master");
- }
- } else {
- //local queue (i.e. master) is empty
- range.add(lwm, _queue->getPosition());
- }
- QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range
- << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")");
- //set position of 'cursor'
- position = hwm;
- }
-}
-
-bool ReplicatingSubscription::deliver(QueuedMessage& m)
-{
- return ConsumerImpl::deliver(m);
-}
-
-void ReplicatingSubscription::init()
-{
- getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-void ReplicatingSubscription::cancel()
-{
- getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-//called before we get notified of the message being available and
-//under the message lock in the queue
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
- QPID_LOG(debug, "Enqueued message at " << m.position);
- //delay completion
- m.payload->getIngressCompletion().startCompleter();
- QPID_LOG(debug, "Delayed " << m.payload.get());
-}
-
-class Buffer : public qpid::framing::Buffer
-{
- public:
- Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {}
- ~Buffer() { delete[] getPointer(); }
-};
-
-void ReplicatingSubscription::generateDequeueEvent()
-{
- Buffer buffer(range.encodedSize());
- range.encode(buffer);
- range.clear();
- buffer.reset();
-
- //generate event message
- boost::intrusive_ptr<Message> event = new Message();
- AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody()));
- content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- event->getFrames().append(method);
- event->getFrames().append(header);
- event->getFrames().append(content);
-
- DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey("dequeue-event");
-
- events->deliver(event);
-}
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
- {
- Mutex::ScopedLock l(lock);
- range.add(m.position);
- QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position);
- }
- notify();
- if (m.position > position) {
- m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue");
- }
-}
-
-bool ReplicatingSubscription::doDispatch()
-{
- {
- Mutex::ScopedLock l(lock);
- if (!range.empty()) {
- generateDequeueEvent();
- }
- }
- bool r1 = events->dispatch(consumer);
- bool r2 = ConsumerImpl::doDispatch();
- return r1 || r2;
-}
-
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
@@ -497,7 +287,6 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
-
) :
Consumer(_name, _acquire, !_acquire), /** @todo KAG - allow configuration of 'browse acquired' */
parent(_parent),
@@ -512,7 +301,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
+ msgCredit(0),
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -558,7 +347,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, dynamic_cast<const ReplicatingSubscription*>(this));
+ DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, isDelayedCompletion());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
@@ -714,10 +503,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
- cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
- if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker());
- if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+ if (!cacheExchange || cacheExchange->getName() != exchangeName
+ || cacheExchange->isDestroyed())
+ {
+ cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
@@ -1099,36 +888,4 @@ void SemanticState::detached()
}
}
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
-{
- return delegate.deliver(m);
-}
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
-ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r,
- const qpid::framing::SequenceNumber& s,
- const qpid::framing::SequenceNumber& e)
- : results(r), start(s), end(e)
-{
- results.add(start, end);
-}
-
-void ReplicationStateInitialiser::process(const QueuedMessage& message)
-{
- if (message.position < start) {
- //replica does not have a message that should still be on the queue
- QPID_LOG(warning, "Replica appears to be missing message at " << message.position);
- } else if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued, so remove it from the results
- results.remove(message.position);
- } //else message has not been seen by replica yet so can be ignored here
-
-}
-
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 62c829e8f8..e467ec650f 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -157,11 +157,10 @@ class SemanticState : private boost::noncopyable {
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- static shared_ptr create(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
-
+ /** This consumer wants delayed completion.
+ * Overridden by ConsumerImpl subclasses.
+ */
+ virtual bool isDelayedCompletion() const { return false; }
};
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ca6d6bb193..8cd5072574 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -23,6 +23,7 @@
*/
#include "qpid/amqp_0_10/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index fe66b77238..f56697c845 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -208,6 +208,8 @@ class Connection :
void queueDequeueSincePurgeState(const std::string&, uint32_t);
+ bool isAnnounced() const { return announced; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4bf03eefa2..2cd1cf9a83 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -97,7 +97,7 @@ void OutputInterceptor::deliverDoOutput(uint32_t limit) {
}
void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
- if (parent.isLocal() && !sentDoOutput && !closing) {
+ if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced()) {
sentDoOutput = true;
parent.getCluster().getMulticast().mcastControl(
ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
new file mode 100644
index 0000000000..17da13ed1e
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "Settings.h"
+#include "WiringReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using types::Variant;
+using std::string;
+
+Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+ // FIXME aconway 2011-11-24: identifying the primary.
+ if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
+ Url url(s.brokerUrl);
+ QPID_LOG(info, "HA: Acting as backup");
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+
+ // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+ // Declare the link
+ std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ s.mechanism, s.username, s.password);
+ assert(result.second); // FIXME aconway 2011-11-23: error handling
+ link = result.first;
+ boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+ broker.getExchanges().registerExchange(wr);
+ }
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
new file mode 100644
index 0000000000..b4183a4dba
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -0,0 +1,56 @@
+#ifndef QPID_HA_BACKUP_H
+#define QPID_HA_BACKUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+#include "qpid/Url.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class Link;
+}
+
+namespace ha {
+class Settings;
+
+/**
+ * State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE: trivially because currently it only has a constructor.
+ * May need locking as the functionality grows.
+ */
+class Backup
+{
+ public:
+ Backup(broker::Broker&, const Settings&);
+
+ private:
+ broker::Broker& broker;
+ Settings settings;
+ boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BACKUP_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
new file mode 100644
index 0000000000..22b7e46595
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "HaBroker.h"
+#include "Settings.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+namespace _qmf = ::qmf::org::apache::qpid::ha;
+using namespace management;
+using namespace std;
+
+namespace {
+Url url(const std::string& s, const std::string& id) {
+ try {
+ return Url(s);
+ } catch (const std::exception& e) {
+ throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'");
+ }
+}
+} // namespace
+
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+ : broker(b),
+ clientUrl(url(s.clientUrl, "ha-client-url")),
+ brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+ mgmtObject(0)
+{
+ ManagementAgent* ma = broker.getManagementAgent();
+ if (ma) {
+ _qmf::Package packageInit(ma);
+ mgmtObject = new _qmf::HaBroker(ma, this);
+ // FIXME aconway 2011-11-11: Placeholder - initialize cluster role.
+ mgmtObject->set_status("solo");
+ ma->addObject(mgmtObject);
+ }
+ QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
+ << " broker-url=" << brokerUrl);
+ backup.reset(new Backup(broker, s));
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory()));
+}
+
+HaBroker::~HaBroker() {}
+
+Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+ switch (methodId) {
+ case _qmf::HaBroker::METHOD_SETSTATUS: {
+ std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status;
+ // FIXME aconway 2011-11-11: placeholder, validate & execute status change.
+ mgmtObject->set_status(status);
+ break;
+ }
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
+ }
+ return Manageable::STATUS_OK;
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
new file mode 100644
index 0000000000..1a835c9ea0
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -0,0 +1,62 @@
+#ifndef QPID_HA_BROKER_H
+#define QPID_HA_BROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Url.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
+#include "qpid/management/Manageable.h"
+
+namespace qpid {
+namespace broker {
+class Broker;
+}
+namespace ha {
+class Settings;
+class Backup;
+
+/**
+ * HA state and actions associated with a broker.
+ *
+ * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+ */
+class HaBroker : public management::Manageable
+{
+ public:
+ HaBroker(broker::Broker&, const Settings&);
+ ~HaBroker();
+
+ // Implement Manageable.
+ qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+ management::Manageable::status_t ManagementMethod (
+ uint32_t methodId, management::Args& args, std::string& text);
+
+ private:
+ broker::Broker& broker;
+ Url clientUrl, brokerUrl;
+ std::auto_ptr<Backup> backup;
+ qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BROKER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
new file mode 100644
index 0000000000..80f21e4320
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "HaBroker.h"
+#include "Settings.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Broker.h"
+
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+struct Options : public qpid::Options {
+ Settings& settings;
+ Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
+ addOptions()
+ ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features")
+ ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.")
+ ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.")
+ ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
+ ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
+ ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+ ;
+ }
+};
+
+struct HaPlugin : public Plugin {
+
+ Settings settings;
+ Options options;
+ auto_ptr<HaBroker> haBroker;
+
+ HaPlugin() : options(settings) {}
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Plugin::Target& ) {}
+
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && settings.enabled) {
+ haBroker.reset(new ha::HaBroker(*broker, settings));
+ } else
+ QPID_LOG(info, "HA: Disabled");
+ }
+};
+
+static HaPlugin instance; // Static initialization.
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp
new file mode 100644
index 0000000000..7d8ee38367
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Logging.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace ha {
+
+QueuePos::QueuePos(const broker::QueuedMessage& qm)
+ : queue(qm.queue), position(qm.position) {}
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& qp) {
+ return o << qp.queue->getName() << "[" << qp.position << "]";
+}
+
+}} // namesopace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h
new file mode 100644
index 0000000000..3b12baa390
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.h
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_HAOSTREAM_H
+#define QPID_HA_HAOSTREAM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iosfwd>
+
+/**@file ostream helpers used in log messages. */
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace framing {
+class SequenceNumber;
+}
+
+namespace ha {
+
+// Other printable helpers
+
+struct QueuePos {
+ const broker::Queue* queue;
+ const framing::SequenceNumber& position;
+ QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos)
+ : queue(q), position(pos) {}
+ QueuePos(const broker::QueuedMessage& qm);
+};
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& h);
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_HAOSTREAM_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
new file mode 100644
index 0000000000..2de9ec5a59
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "Logging.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+}
+
+namespace qpid {
+namespace ha {
+using namespace broker;
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
+ : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
+ queue(q), link(l), current(queue->getPosition())
+{
+ // FIXME aconway 2011-11-24: consistent logging.
+ QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
+ queue->getBroker()->getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ queue->getName(), // src
+ getName(), // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+QueueReplicator::~QueueReplicator() {}
+
+// NB: This is called back ina broker connection thread when the
+// bridge is created.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ // No lock needed, no mutable member variables are used.
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+ framing::FieldTable settings;
+ // FIXME aconway 2011-11-28: string constants.
+ settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest))
+ settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
+
+ peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
+}
+
+
+namespace {
+const std::string DEQUEUE_EVENT("dequeue-event");
+const std::string REPLICATOR("qpid.replicator-");
+}
+
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
+{
+ if (key == DEQUEUE_EVENT) {
+ std::string content;
+ msg.getMessage().getFrames().getContent(content);
+ qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+ qpid::framing::SequenceSet latest;
+ latest.decode(buffer);
+
+ //TODO: should be able to optimise the following
+ for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
+ if (current < *i) {
+ //haven't got that far yet, record the dequeue
+ dequeued.add(*i);
+ QPID_LOG(trace, "HA: Recording dequeue of message at " <<
+ QueuePos(queue.get(), *i));
+ } else {
+ QueuedMessage message;
+ if (queue->acquireMessageAt(*i, message)) {
+ queue->dequeue(0, message);
+ QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
+ } else {
+ // FIXME aconway 2011-11-29: error handling
+ QPID_LOG(error, "HA: Unable to dequeue message at "
+ << QueuePos(queue.get(), *i));
+ }
+ }
+ }
+ } else {
+ //take account of any gaps in sequence created by messages
+ //dequeued before our subscription reached them
+ while (dequeued.contains(++current)) {
+ dequeued.remove(current);
+ QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName());
+ queue->setPosition(current);
+ }
+ QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current);
+ msg.deliverTo(queue);
+ }
+}
+
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
+
+// FIXME aconway 2011-11-28: rationalise string constants.
+static const std::string TYPE_NAME("qpid.queue-replicator");
+
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
new file mode 100644
index 0000000000..8085c11b82
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -0,0 +1,72 @@
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceSet.h"
+
+namespace qpid {
+
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
+class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
+
+/**
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE.
+ */
+class QueueReplicator : public broker::Exchange
+{
+ public:
+ QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
+ ~QueueReplicator();
+ std::string getType() const;
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+
+ sys::Mutex lock;
+ boost::shared_ptr<broker::Queue> queue;
+ boost::shared_ptr<broker::Link> link;
+ framing::SequenceNumber current;
+ framing::SequenceSet dequeued;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
new file mode 100644
index 0000000000..aabbd43631
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "Logging.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using namespace std;
+
+// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
+// Do we want a common HA prefix?
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
+const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+
+const string DOLLAR("$");
+const string INTERNAL("-internal");
+
+class ReplicationStateInitialiser
+{
+ public:
+ ReplicationStateInitialiser(
+ qpid::framing::SequenceSet& r,
+ const qpid::framing::SequenceNumber& s,
+ const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)
+ {
+ results.add(start, end);
+ }
+
+ void operator()(const QueuedMessage& message) {
+ if (message.position < start) {
+ //replica does not have a message that should still be on the queue
+ QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
+ } else if (message.position >= start && message.position <= end) {
+ //i.e. message is within the intial range and has not been dequeued, so remove it from the results
+ results.remove(message.position);
+ } //else message has not been seen by replica yet so can be ignored here
+ }
+
+ private:
+ qpid::framing::SequenceSet& results;
+ const qpid::framing::SequenceNumber start;
+ const qpid::framing::SequenceNumber end;
+};
+
+string mask(const string& in)
+{
+ return DOLLAR + in + INTERNAL;
+}
+
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) {
+ return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
+ new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+}
+
+ReplicatingSubscription::ReplicatingSubscription(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments),
+ events(new Queue(mask(name))),
+ consumer(new DelegatingConsumer(*this))
+{
+ QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
+ // FIXME aconway 2011-11-25: string constants.
+ if (arguments.isSet("qpid.high_sequence_number")) {
+ qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number");
+ qpid::framing::SequenceNumber lwm;
+ if (arguments.isSet("qpid.low_sequence_number")) {
+ lwm = arguments.getAsInt("qpid.low_sequence_number");
+ } else {
+ lwm = hwm;
+ }
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest)) {
+ if (oldest >= hwm) {
+ range.add(lwm, --oldest);
+ } else if (oldest >= lwm) {
+ ReplicationStateInitialiser initialiser(range, lwm, hwm);
+ queue->eachMessage(initialiser);
+ } else { //i.e. have older message on master than is reported to exist on replica
+ QPID_LOG(warning, "HA: Replica missing message on master");
+ }
+ } else {
+ //local queue (i.e. master) is empty
+ range.add(lwm, queue->getPosition());
+ }
+ QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range
+ << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")");
+ //set position of 'cursor'
+ position = hwm;
+ }
+}
+
+bool ReplicatingSubscription::deliver(QueuedMessage& m)
+{
+ return ConsumerImpl::deliver(m);
+}
+
+void ReplicatingSubscription::cancel()
+{
+ getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+//called before we get notified of the message being available and
+//under the message lock in the queue
+void ReplicatingSubscription::enqueued(const QueuedMessage& m)
+{
+ QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
+ //delay completion
+ m.payload->getIngressCompletion().startCompleter();
+}
+
+void ReplicatingSubscription::generateDequeueEvent()
+{
+ string buf(range.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ range.encode(buffer);
+ range.clear();
+ buffer.reset();
+
+ //generate event message
+ boost::intrusive_ptr<Message> event = new Message();
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+ content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ event->getFrames().append(method);
+ event->getFrames().append(header);
+ event->getFrames().append(content);
+
+ DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ props->setRoutingKey("dequeue-event");
+
+ events->deliver(event);
+}
+
+//called after the message has been removed from the deque and under
+//the message lock in the queue
+void ReplicatingSubscription::dequeued(const QueuedMessage& m)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ range.add(m.position);
+ // FIXME aconway 2011-11-29: q[pos]
+ QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
+ }
+ notify();
+ if (m.position > position) {
+ m.payload->getIngressCompletion().finishCompleter();
+ QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue");
+ }
+}
+
+bool ReplicatingSubscription::doDispatch()
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!range.empty()) {
+ generateDequeueEvent();
+ }
+ }
+ bool r1 = events->dispatch(consumer);
+ bool r2 = ConsumerImpl::doDispatch();
+ return r1 || r2;
+}
+
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
+{
+ return delegate.deliver(m);
+}
+void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
+bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
new file mode 100644
index 0000000000..b83842acbb
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -0,0 +1,109 @@
+#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace ha {
+
+/**
+ * A susbcription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD UNSAFE: used only in broker connection thread.
+ */
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
+{
+ public:
+ struct Factory : public broker::ConsumerFactory {
+ boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ };
+
+ // Argument names for consume command.
+ static const std::string QPID_REPLICATING_SUBSCRIPTION;
+ static const std::string QPID_HIGH_SEQUENCE_NUMBER;
+ static const std::string QPID_LOW_SEQUENCE_NUMBER;
+
+ ReplicatingSubscription(broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+
+ ~ReplicatingSubscription();
+
+ void cancel();
+ bool deliver(broker::QueuedMessage& msg);
+ void enqueued(const broker::QueuedMessage&);
+ void dequeued(const broker::QueuedMessage&);
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+
+ bool isDelayedCompletion() const { return true; }
+
+ protected:
+ bool doDispatch();
+ private:
+ boost::shared_ptr<broker::Queue> events;
+ boost::shared_ptr<broker::Consumer> consumer;
+ qpid::framing::SequenceSet range;
+
+ void generateDequeueEvent();
+ class DelegatingConsumer : public Consumer
+ {
+ public:
+ DelegatingConsumer(ReplicatingSubscription&);
+ ~DelegatingConsumer();
+ bool deliver(broker::QueuedMessage& msg);
+ void notify();
+ bool filter(boost::intrusive_ptr<broker::Message>);
+ bool accept(boost::intrusive_ptr<broker::Message>);
+ broker::OwnershipToken* getSession();
+ private:
+ ReplicatingSubscription& delegate;
+ };
+};
+
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
new file mode 100644
index 0000000000..a2d2e89d82
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -0,0 +1,47 @@
+#ifndef QPID_HA_SETTINGS_H
+#define QPID_HA_SETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace ha {
+
+using std::string;
+
+/**
+ * Configurable settings for HA.
+ */
+class Settings
+{
+ public:
+ Settings() : enabled(false) {}
+ bool enabled;
+ string clientUrl;
+ string brokerUrl;
+ string username, password, mechanism;
+ private:
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_SETTINGS_H*/
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
new file mode 100644
index 0000000000..125e2c0ba6
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -0,0 +1,463 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "WiringReplicator.h"
+#include "QueueReplicator.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+
+namespace qpid {
+namespace ha {
+
+using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
+using namespace framing;
+using std::string;
+using types::Variant;
+using namespace broker;
+
+namespace {
+
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_REPLICATE("qpid.replicate");
+
+const string CLASS_NAME("_class_name");
+const string EVENT("_event");
+const string OBJECT_NAME("_object_name");
+const string PACKAGE_NAME("_package_name");
+const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
+
+const string ALTEX("altEx");
+const string ARGS("args");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
+const string BIND("bind");
+const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
+const string DURABLE("durable");
+const string EXCHANGE("exchange");
+const string EXNAME("exName");
+const string EXTYPE("exType");
+const string KEY("key");
+const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
+const string TYPE("type");
+const string USER("user");
+
+const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
+
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+
+bool isQMFv2(const Message& message) {
+ const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
+ return props && props->getAppId() == QMF2;
+}
+
+template <class T> bool match(Variant::Map& schema) {
+ return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
+}
+
+// FIXME aconway 2011-11-24: this should be a class.
+enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+const string S_NONE="none";
+const string S_WIRING="wiring";
+const string S_ALL="all";
+
+ReplicateLevel replicateLevel(const string& str) {
+ // FIXME aconway 2011-11-24: case insenstive comparison.
+ ReplicateLevel rl = RL_NONE;
+ if (str == S_WIRING) rl = RL_WIRING;
+ else if (str == S_ALL) rl = RL_ALL;
+ return rl;
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else return RL_NONE;
+}
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end()) return replicateLevel(i->second.asString());
+ else return RL_NONE;
+}
+
+void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
+} // namespace
+
+WiringReplicator::~WiringReplicator() {}
+
+WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+{
+ QPID_LOG(debug, "HA: Starting replication from " <<
+ link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
+ broker.getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ QPID_WIRING_REPLICATOR, // src
+ QPID_WIRING_REPLICATOR, // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&WiringReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "HA: Activated wiring replicator")
+}
+
+void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+ Variant::List list;
+ try {
+ if (!isQMFv2(msg.getMessage()) || !headers)
+ throw Exception("Unexpected message, not QMF2 event or query response.");
+ // decode as list
+ string content = msg.getMessage().getFrames().getContent();
+ amqp_0_10::ListCodec::decode(content, list);
+
+ if (headers->getAsString(QMF_CONTENT) == EVENT) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ Variant::Map& map = i->asMap();
+ Variant::Map& schema = map[SCHEMA_ID].asMap();
+ Variant::Map& values = map[VALUES].asMap();
+ QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values);
+ if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
+ else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
+ else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
+ else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
+ else if (match<EventBind>(schema)) doEventBind(values);
+ // FIXME aconway 2011-11-21: handle unbind & all other events.
+ else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
+ else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema)));
+ }
+ } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+ Variant::Map& values = i->asMap()[VALUES].asMap();
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values);
+ if (type == QUEUE) doResponseQueue(values);
+ else if (type == EXCHANGE) doResponseExchange(values);
+ else if (type == BINDING) doResponseBind(values);
+ else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
+ }
+ } else {
+ QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers));
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
+ QPID_LOG(debug, "HA: Error processing configuration message: " << list);
+ }
+}
+
+void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
+ string name = values[QNAME].asString();
+ Variant::Map argsMap = values[ARGS].asMap();
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+
+ QPID_LOG(debug, "HA: Creating queue from event " << name);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODEL].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString());
+ if (result.second) {
+ // FIXME aconway 2011-11-22: should delete old queue and
+ // re-create from event.
+ // Events are always up to date, whereas responses may be
+ // out of date.
+ QPID_LOG(debug, "HA: New queue replica " << name);
+ startQueueReplicator(result.first);
+ } else {
+ QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
+ }
+ }
+}
+
+void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
+ string name = values[QNAME].asString();
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ if (queue && replicateLevel(queue->getSettings())) {
+ QPID_LOG(debug, "HA: Deleting queue from event: " << name);
+ broker.deleteQueue(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
+}
+
+void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
+ Variant::Map argsMap(values[ARGS].asMap());
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ string name = values[EXNAME].asString();
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ QPID_LOG(debug, "HA: New exchange replica " << name);
+ if (!broker.createExchange(
+ name,
+ values[EXTYPE].asString(),
+ values[DURABLE].asBool(),
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString()).second) {
+ // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // and re-create from event. See comment in doEventQueueDeclare.
+ QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists");
+ }
+ }
+}
+
+void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
+ string name = values[EXNAME].asString();
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+ if (exchange && replicateLevel(exchange->getArgs())) {
+ QPID_LOG(debug, "HA: Deleting exchange:" << name);
+ broker.deleteExchange(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
+ } catch (const framing::NotFoundException&) {}
+}
+
+void WiringReplicator::doEventBind(Variant::Map& values) {
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString());
+ // We only replicated a binds for a replicated queue to replicated exchange.
+ if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) {
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+ } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange.
+}
+
+void WiringReplicator::doResponseQueue(Variant::Map& values) {
+ // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
+ Variant::Map argsMap(values[ARGUMENTS].asMap());
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ string name(values[NAME].asString());
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODELETE].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/);
+ if (result.second) {
+ QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)");
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-11-22: Normal to find queue already
+ // exists if we're failing over.
+ QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)");
+ }
+}
+
+void WiringReplicator::doResponseExchange(Variant::Map& values) {
+ Variant::Map argsMap(values[ARGUMENTS].asMap());
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)");
+ if (!broker.createExchange(
+ values[NAME].asString(),
+ values[TYPE].asString(),
+ values[DURABLE].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second) {
+ QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)");
+ }
+}
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+ Variant::Map map(ref.asMap());
+ Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
+ const std::string name = i->second.asString();
+ if (name.compare(0, prefix.size(), prefix) != 0)
+ throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
+ std::string ret = name.substr(prefix.size());
+ return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
+void WiringReplicator::doResponseBind(Variant::Map& values) {
+ try {
+ std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+ std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+ // Automatically replicate exchange if queue and exchange are replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+ } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
+}
+
+void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+ // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed.
+ if (replicateLevel(queue->getSettings()) == RL_ALL) {
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ }
+}
+
+bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+
+string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
+
+}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h
new file mode 100644
index 0000000000..32109d8368
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h
@@ -0,0 +1,81 @@
+#ifndef QPID_HA_REPLICATOR_H
+#define QPID_HA_REPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
+}
+
+namespace ha {
+
+/**
+ * Replicate wiring on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
+ */
+class WiringReplicator : public broker::Exchange
+{
+ public:
+ WiringReplicator(const boost::shared_ptr<broker::Link>&);
+ ~WiringReplicator();
+ std::string getType() const;
+
+ // Exchange methods
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ void doEventQueueDeclare(types::Variant::Map& values);
+ void doEventQueueDelete(types::Variant::Map& values);
+ void doEventExchangeDeclare(types::Variant::Map& values);
+ void doEventExchangeDelete(types::Variant::Map& values);
+ void doEventBind(types::Variant::Map&);
+ void doResponseQueue(types::Variant::Map& values);
+ void doResponseExchange(types::Variant::Map& values);
+ void doResponseBind(types::Variant::Map& values);
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+
+ broker::Broker& broker;
+ boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_HA_REPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
new file mode 100644
index 0000000000..bb06e77a69
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -0,0 +1,34 @@
+<schema package="org.apache.qpid.ha">
+
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+ <!-- Monitor and control HA status of a broker. -->
+ <class name="HaBroker">
+ <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, SOLO"/>
+
+ <method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO">
+ <arg name="status" type="sstr" dir="I"/>
+ </method>
+
+ <property name="clientUrl" type="sstr" desc="URL used by clients to connect to the cluster."/>
+ <property name="brokerUrl" type="sstr" desc="URL used by brokers to connect to other brokers in the cluster."/>
+ </class>
+
+</schema>
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 16d7fb0b78..91d5f6a402 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -440,6 +440,7 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
cluster_lib = os.getenv("CLUSTER_LIB")
+ ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -514,6 +515,12 @@ class BrokerTest(TestCase):
actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+ """Wait up to timeout for contents of queue to match expect_contents"""
+ def test(): return self.browse(session, queue, 0) == expect_contents
+ retry(test, timeout, delay)
+ self.assertEqual(expect_contents, self.browse(session, queue, 0))
+
def join(thread, timeout=10):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 199d1e7b57..424d4169e8 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -61,15 +61,25 @@ if HAVE_LIBCPG
# You should do "newgrp ais" before running the tests to run these.
#
-
-# ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS += \
- run_cluster_test \
- cluster_read_credit \
- test_watchdog \
- run_cluster_tests \
- federated_cluster_test \
- clustered_replication_test
+# FIXME aconway 2011-11-14: Disable cluster tests on qpid-3603 branch
+# Some cluster tests are known to fail on this branch.
+# Immediate priority is to develop then new HA solution,
+# Cluster will brought up to date when thats done.
+#
+# gsim: its due to the keeping of deleted messages on the deque until they can be popped off either end
+# gsim: that is state that isn't available to new nodes of course
+# gsim: i.e. if you dequeue a message from the middle of the deque
+# gsim: it will not be on updatee but will be hidden on original node(s)
+# gsim: and is needed for the direct indexing
+
+
+# TESTS += \
+# run_cluster_test \
+# cluster_read_credit \
+# test_watchdog \
+# run_cluster_tests \
+# federated_cluster_test \
+# clustered_replication_test
# Clean up after cluster_test and start_cluster
CLEANFILES += cluster_test.acl cluster.ports
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
new file mode 100755
index 0000000000..9b52c2fca7
--- /dev/null
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
+from qpid.messaging import Message, NotFound
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger
+
+
+log = getLogger("qpid.ha-tests")
+
+class ShortTests(BrokerTest):
+ """Short HA functionality tests."""
+
+ def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs):
+ assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ return Broker(self, args=["--load-module", BrokerTest.ha_lib,
+ "--ha-enable=yes",
+ "--ha-client-url", client_url,
+ "--ha-broker-url", broker_url,
+ ] + args,
+ **kwargs)
+
+ # FIXME aconway 2011-11-15: work around async replication.
+ def wait(self, session, address):
+ def check():
+ try:
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for %s"%(address)
+
+ def assert_missing(self,session, address):
+ try:
+ session.receiver(address)
+ self.fail("Should not have been replicated: %s"%(address))
+ except NotFound: pass
+
+ def test_replication(self):
+ def queue(name, replicate):
+ return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+
+ def exchange(name, replicate, bindq):
+ return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+ def setup(p, prefix):
+ """Create config, send messages on the primary p"""
+ p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+ p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+ p.sender(queue(prefix+"q3", "none")).send(Message("3"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+ # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
+ p.sender(queue(prefix+"x", "wiring"))
+
+ def verify(b, prefix):
+ """Verify setup was replicated to backup b"""
+ # FIXME aconway 2011-11-21: wait for wiring to replicate.
+ self.wait(b, prefix+"x");
+ # Verify backup
+ # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+ self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+ self.assert_missing(b, prefix+"q3")
+ b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
+ b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
+ self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
+
+ # Create config, send messages before starting the backup, to test catch-up replication.
+ primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ p = primary.connect().session()
+ setup(p, "1")
+ # Start the backup
+ backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ b = backup.connect().session()
+ verify(b, "1")
+
+ # Create config, send messages after starting the backup, to test steady-state replication.
+ setup(p, "2")
+ verify(b, "2")
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 26be15b48a..100612f978 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so
exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
exportmodule ACL_LIB acl.so
exportmodule CLUSTER_LIB cluster.so
+exportmodule HA_LIB ha.so
exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
exportmodule SSLCONNECTOR_LIB sslconnector.so
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
index 1a0f711ace..13f31fe78b 100755
--- a/qpid/python/qpid-python-test
+++ b/qpid/python/qpid-python-test
@@ -570,6 +570,8 @@ total = len(filtered) + len(ignored)
if opts.xml and not list_only:
xmlr = JunitXmlStyleReporter(opts.xml);
xmlr.begin();
+else:
+ xmlr = None
passed = 0
failed = 0
diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py
index feae4bb1bd..48e29ad912 100755
--- a/qpid/tools/setup.py
+++ b/qpid/tools/setup.py
@@ -31,7 +31,8 @@ setup(name="qpid-tools",
"src/py/qpid-route",
"src/py/qpid-stat",
"src/py/qpid-tool",
- "src/py/qmf-tool"],
+ "src/py/qmf-tool",
+ "src/py/qpid-ha-status"],
url="http://qpid.apache.org/",
license="Apache Software License",
description="Diagnostic and management tools for Apache Qpid brokers.")
diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status
new file mode 100755
index 0000000000..c70e4c9af3
--- /dev/null
+++ b/qpid/tools/src/py/qpid-ha-status
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import qmf.console, optparse, sys
+from qpid.management import managementChannel, managementClient
+
+usage="""
+Usage: qpid-ha-status [broker-address] [status]
+If status is specified, sets the HA status of the broker. Otherwise prints the current HA status. Status must be one of: primary, backup, solo.
+"""
+
+STATUS_VALUES=["primary", "backup", "solo"]
+
+def is_valid_status(value): return value in STATUS_VALUES
+
+def validate_status(value):
+ if not is_valid_status(value):
+ raise Exception("Invalid HA status value: %s"%(value))
+
+class HaBroker:
+ def __init__(self, broker, session):
+ self.session = session
+ try:
+ self.qmf_broker = self.session.addBroker(broker)
+ except Exception, e:
+ raise Exception("Can't connect to %s: %s"%(broker,e))
+ ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")
+ if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
+ self.ha_broker = ha_brokers[0];
+
+ def get_status(self):
+ return self.ha_broker.status
+
+ def set_status(self, value):
+ validate_status(value)
+ self.ha_broker.setStatus(value)
+
+def parse_args(args):
+ broker, status = "localhost:5672", None
+ if args and is_valid_status(args[-1]):
+ status = args[-1]
+ args.pop()
+ if args: broker = args[0]
+ return broker, status
+
+def main():
+ try:
+ session = qmf.console.Session()
+ try:
+ broker, status = parse_args(sys.argv[1:])
+ hb = HaBroker(broker, session)
+ if status: hb.set_status(status)
+ else: print hb.get_status()
+ finally:
+ session.close()
+ return 0
+ except Exception, e:
+ print e
+ return -1
+
+if __name__ == "__main__":
+ sys.exit(main())