From 08659efdc833d924caf8c188cbe624a8bd552c9c Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Wed, 30 Nov 2011 16:14:02 +0000 Subject: QPID-3603: resync -kgiusti tracking branch with gsim's work. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-kgiusti@1208483 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/design_docs/hot-standby-design.txt | 239 ----------- qpid/cpp/design_docs/new-ha-design.txt | 353 ++++++++++++++++ .../cpp/design_docs/replicating-browser-design.txt | 224 ---------- qpid/cpp/managementgen/qmfgen/schema.py | 5 + qpid/cpp/managementgen/qmfgen/templates/Class.h | 4 +- qpid/cpp/src/Makefile.am | 9 +- qpid/cpp/src/ha.mk | 42 ++ qpid/cpp/src/qpid/broker/Bridge.cpp | 88 +--- qpid/cpp/src/qpid/broker/Bridge.h | 19 +- qpid/cpp/src/qpid/broker/Broker.h | 4 + qpid/cpp/src/qpid/broker/Connection.cpp | 26 +- qpid/cpp/src/qpid/broker/Consumer.h | 3 + qpid/cpp/src/qpid/broker/ConsumerFactory.h | 70 ++++ qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 6 +- qpid/cpp/src/qpid/broker/DeliveryRecord.h | 4 +- qpid/cpp/src/qpid/broker/Link.cpp | 45 +- qpid/cpp/src/qpid/broker/Link.h | 12 +- qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 29 +- qpid/cpp/src/qpid/broker/LinkRegistry.h | 28 +- qpid/cpp/src/qpid/broker/NodeClone.cpp | 219 ---------- qpid/cpp/src/qpid/broker/NodeClone.h | 54 --- qpid/cpp/src/qpid/broker/Queue.cpp | 2 +- qpid/cpp/src/qpid/broker/Queue.h | 2 +- qpid/cpp/src/qpid/broker/QueueReplicator.cpp | 128 ------ qpid/cpp/src/qpid/broker/QueueReplicator.h | 57 --- qpid/cpp/src/qpid/broker/SemanticState.cpp | 281 +------------ qpid/cpp/src/qpid/broker/SemanticState.h | 9 +- qpid/cpp/src/qpid/broker/SessionHandler.h | 1 + qpid/cpp/src/qpid/cluster/Connection.h | 2 + qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp | 2 +- qpid/cpp/src/qpid/ha/Backup.cpp | 65 +++ qpid/cpp/src/qpid/ha/Backup.h | 56 +++ qpid/cpp/src/qpid/ha/HaBroker.cpp | 87 ++++ qpid/cpp/src/qpid/ha/HaBroker.h | 62 +++ qpid/cpp/src/qpid/ha/HaPlugin.cpp | 67 +++ qpid/cpp/src/qpid/ha/Logging.cpp | 36 ++ qpid/cpp/src/qpid/ha/Logging.h | 55 +++ qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 145 +++++++ qpid/cpp/src/qpid/ha/QueueReplicator.h | 72 ++++ qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 235 +++++++++++ qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 109 +++++ qpid/cpp/src/qpid/ha/Settings.h | 47 +++ qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 463 +++++++++++++++++++++ qpid/cpp/src/qpid/ha/WiringReplicator.h | 81 ++++ qpid/cpp/src/qpid/ha/management-schema.xml | 34 ++ qpid/cpp/src/tests/brokertest.py | 7 + qpid/cpp/src/tests/cluster.mk | 28 +- qpid/cpp/src/tests/ha_tests.py | 102 +++++ qpid/cpp/src/tests/test_env.sh.in | 1 + qpid/python/qpid-python-test | 2 + qpid/tools/setup.py | 3 +- qpid/tools/src/py/qpid-ha-status | 80 ++++ 52 files changed, 2454 insertions(+), 1350 deletions(-) delete mode 100644 qpid/cpp/design_docs/hot-standby-design.txt create mode 100644 qpid/cpp/design_docs/new-ha-design.txt delete mode 100644 qpid/cpp/design_docs/replicating-browser-design.txt create mode 100644 qpid/cpp/src/ha.mk create mode 100644 qpid/cpp/src/qpid/broker/ConsumerFactory.h delete mode 100644 qpid/cpp/src/qpid/broker/NodeClone.cpp delete mode 100644 qpid/cpp/src/qpid/broker/NodeClone.h delete mode 100644 qpid/cpp/src/qpid/broker/QueueReplicator.cpp delete mode 100644 qpid/cpp/src/qpid/broker/QueueReplicator.h create mode 100644 qpid/cpp/src/qpid/ha/Backup.cpp create mode 100644 qpid/cpp/src/qpid/ha/Backup.h create mode 100644 qpid/cpp/src/qpid/ha/HaBroker.cpp create mode 100644 qpid/cpp/src/qpid/ha/HaBroker.h create mode 100644 qpid/cpp/src/qpid/ha/HaPlugin.cpp create mode 100644 qpid/cpp/src/qpid/ha/Logging.cpp create mode 100644 qpid/cpp/src/qpid/ha/Logging.h create mode 100644 qpid/cpp/src/qpid/ha/QueueReplicator.cpp create mode 100644 qpid/cpp/src/qpid/ha/QueueReplicator.h create mode 100644 qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp create mode 100644 qpid/cpp/src/qpid/ha/ReplicatingSubscription.h create mode 100644 qpid/cpp/src/qpid/ha/Settings.h create mode 100644 qpid/cpp/src/qpid/ha/WiringReplicator.cpp create mode 100644 qpid/cpp/src/qpid/ha/WiringReplicator.h create mode 100644 qpid/cpp/src/qpid/ha/management-schema.xml create mode 100755 qpid/cpp/src/tests/ha_tests.py create mode 100755 qpid/tools/src/py/qpid-ha-status diff --git a/qpid/cpp/design_docs/hot-standby-design.txt b/qpid/cpp/design_docs/hot-standby-design.txt deleted file mode 100644 index 99a5dc0199..0000000000 --- a/qpid/cpp/design_docs/hot-standby-design.txt +++ /dev/null @@ -1,239 +0,0 @@ --*-org-*- -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -* Another new design for Qpid clustering. - -For background see [[./new-cluster-design.txt]] which describes the issues -with the old design and a new active-active design that could replace it. - -This document describes an alternative hot-standby approach. - -** Delivery guarantee - -We guarantee N-way redundant, at least once delivey. Once a message -from a client has been acknowledged by the broker, it will be -delivered even if N-1 brokers subsequently fail. There may be -duplicates in the event of a failure. We don't make duplicates -during normal operation (i.e when no brokers have failed) - -This is the same guarantee as the old cluster and the alternative -active-active design. - -** Active-active vs. hot standby (aka primary-backup) - -An active-active cluster allows clients to connect to any broker in -the cluster. If a broker fails, clients can fail-over to any other -live broker. - -A hot-standby cluster has only one active broker at a time (the -"primary") and one or more brokers on standby (the "backups"). Clients -are only served by the leader, clients that connect to a backup are -redirected to the leader. The backpus are kept up-to-date in real time -by the primary, if the primary fails a backup is elected to be the new -primary. - -Aside: A cold-standby cluster is possible using a standalone broker, -CMAN and shared storage. In this scenario only one broker runs at a -time writing to a shared store. If it fails, another broker is started -(by CMAN) and recovers from the store. This bears investigation but -the store recovery time is probably too long for failover. - -** Why hot standby? - -Active-active has some advantages: -- Finding a broker on startup or failover is simple, just pick any live broker. -- All brokers are always running in active mode, there's no -- Distributing clients across brokers gives better performance, but see [1]. -- A broker failure affects only clients connected to that broker. - -The main problem with active-active is co-ordinating consumers of the -same queue on multiple brokers such that there are no duplicates in -normal operation. There are 2 approaches: - -Predictive: each broker predicts which messages others will take. This -the main weakness of the old design so not appealing. - -Locking: brokers "lock" a queue in order to take messages. This is -complex to implement, its not straighforward to determine the most -performant strategie for passing the lock. - -Hot-standby removes this problem. Only the primary can modify queues -so it just has to tell the backups what it is doing, there's no -locking. - -The primary can enqueue messages and replicate asynchronously - -exactly like the store does, but it "writes" to the replicas over the -network rather than writing to disk. - -** Failover in a hot-standby cluster. - -Hot-standby has some potential performance issues around failover: - -- Failover "spike": when the primary fails every client will fail over - at the same time, putting strain on the system. - -- Until a new primary is elected, cluster cannot serve any clients or - redirect clients to the primary. - -We want to minimize the number of re-connect attempts that clients -have to make. The cluster can use a well-known algorithm to choose the -new primary (e.g. round robin on a known sequence of brokers) so that -clients can guess the new primary correctly in most cases. - -Even if clients do guess correctly it may be that the new primary is -not yet aware of the death of the old primary, which is may to cause -multiple failed connect attempts before clients eventually get -connected. We will need to prototype to see how much this happens in -reality and how we can best get clients redirected. - -** Threading and performance. - -The primary-backup cluster operates analogously to the way the disk store does now: -- use the same MessageStore interface as the store to interact with the broker -- use the same asynchronous-completion model for replicating messages. -- use the same recovery interfaces (?) for new backups joining. - -Re-using the well-established store design gives credibility to the new cluster design. - -The single CPG dispatch thread was a severe performance bottleneck for the old cluster. - -The primary has the same threading model as a a standalone broker with -a store, which we know that this performs well. - -If we use CPG for replication of messages, the backups will receive -messages in the CPG dispatch thread. To get more concurency, the CPG -thread can dump work onto internal PollableQueues to be processed in -parallel. - -Messages from the same broker queue need to go onto the same -PollableQueue. There could be a separate PollableQueue for each broker -queue. If that's too resource intensive we can use a fixed set of -PollableQueues and assign broker queues to PollableQueues via hashing -or round robin. - -Another possible optimization is to use multiple CPG queues: one per -queue or a hashed set, to get more concurrency in the CPG layer. The -old cluster is not able to keep CPG busy. - -TODO: Transactions pose a challenge with these concurrent models: how -to co-ordinate multiple messages being added (commit a publish or roll -back an accept) to multiple queues so that all replicas end up with -the same message sequence while respecting atomicity. - -** Use of CPG - -CPG provides several benefits in the old cluster: -- tracking membership (essential for determining the primary) -- handling "spit brain" (integrates with partition support from CMAN) -- reliable multicast protocol to distribute messages. - -I believe we still need CPG for membership and split brain. We could -experiment with sending the bulk traffic over AMQP conections. - -** Flow control - -Need to ensure that -1) In-memory internal queues used by the cluster don't overflow. -2) The backups don't fall too far behind on processing CPG messages - -** Recovery -When a new backup joins an active cluster it must get a snapshot -from one of the other backups, or the primary if there are none. In -store terms this is "recovery" (old cluster called it an "update) - -Compared to old cluster we only replidate well defined data set of the store. -This is the crucial sore spot of old cluster. - -We can also replicated it more efficiently by recovering queues in -reverse (LIFO) order. That means as clients actively consume messages -from the front of the queue, they are redeucing the work we have to do -in recovering from the back. (NOTE: this may not be compatible with -using the same recovery interfaces as the store.) - -** Selective replication -In this model it's easy to support selective replication of individual queues via -configuration. -- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate. - Treated analogously to persistent/durable properties for the store. -- if not explicitly marked, provide a choice of default - - default is replicate (replicated message on replicated queue) - - default is don't replicate - - default is replicate persistent/durable messages. - -** Inconsistent errors - -The new design eliminates most sources of inconsistent errors in the -old design (connections, sessions, security, management etc.) and -eliminates the need to stall the whole cluster till an error is -resolved. We still have to handle inconsistent store errors when store -and cluster are used together. - -We also have to include error handling in the async completion loop to -guarantee N-way at least once: we should only report success to the -client when we know the message was replicated and stored on all N-1 -backups. - -TODO: We have a lot more options than the old cluster, need to figure -out the best approach, or possibly allow mutliple approaches. Need to -go thru the various failure cases. We may be able to do recovery on a -per-queue basis rather than restarting an entire node. - -** New members joining - -We should be able to catch up much faster than the the old design. A -new backup can catch up ("recover") the current cluster state on a -per-queue basis. -- queues can be updated in parallel -- "live" updates avoid the the "endless chase" - -During a "live" update several things are happening on a queue: -- clients are publishing messages to the back of the queue, replicated to the backup -- clients are consuming messages from the front of the queue, replicated to the backup. -- the primary is sending pre-existing messages to the new backup. - -The primary sends pre-existing messages in LIFO order - starting from -the back of the queue, at the same time clients are consuming from the front. -The active consumers actually reduce the amount of work to be done, as there's -no need to replicate messages that are no longer on the queue. - -* Steps to get there - -** Baseline replication -Validate the overall design get initial notion of performance. Just -message+wiring replication, no update/recovery for new members joining, -single CPG dispatch thread on backups, no failover, no transactions. - -** Failover -Electing primary, backups redirect to primary. Measure failover time -for large # clients. Strategies to minimise number of retries after a -failure. - -** Flow Control -Keep internal queues from over-flowing. Similar to internal flow control in old cluster. -Needed for realistic performance/stress tests - -** Concurrency -Experiment with multiple threads on backups, multiple CPG groups. - -** Recovery/new member joining -Initial status handshake for new member. Recovering queues from the back. - -** Transactions -TODO: How to implement transactions with concurrency. Worst solution: -a global --cluster-use-transactions flag that forces single thread -mode. Need to find a better solution. diff --git a/qpid/cpp/design_docs/new-ha-design.txt b/qpid/cpp/design_docs/new-ha-design.txt new file mode 100644 index 0000000000..9b6d7d676c --- /dev/null +++ b/qpid/cpp/design_docs/new-ha-design.txt @@ -0,0 +1,353 @@ +-*-org-*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +* An active-passive, hot-standby design for Qpid clustering. + +For some background see [[./new-cluster-design.txt]] which describes the +issues with the old design and a new active-active design that could +replace it. + +This document describes an alternative active-passive approach based on +queue browsing to replicate message data. + +** Active-active vs. active-passive (hot-standby) + +An active-active cluster allows clients to connect to any broker in +the cluster. If a broker fails, clients can fail-over to any other +live broker. + +A hot-standby cluster has only one active broker at a time (the +"primary") and one or more brokers on standby (the "backups"). Clients +are only served by the primary, clients that connect to a backup are +redirected to the primary. The backups are kept up-to-date in real +time by the primary, if the primary fails a backup is elected to be +the new primary. + +The main problem with active-active is co-ordinating consumers of the +same queue on multiple brokers such that there are no duplicates in +normal operation. There are 2 approaches: + +Predictive: each broker predicts which messages others will take. This +the main weakness of the old design so not appealing. + +Locking: brokers "lock" a queue in order to take messages. This is +complex to implement and it is not straighforward to determine the +best strategy for passing the lock. In tests to date it results in +very high latencies (10x standalone broker). + +Hot-standby removes this problem. Only the primary can modify queues +so it just has to tell the backups what it is doing, there's no +locking. + +The primary can enqueue messages and replicate asynchronously - +exactly like the store does, but it "writes" to the replicas over the +network rather than writing to disk. + +** Replicating browsers + +The unit of replication is a replicating browser. This is an AMQP +consumer that browses a remote queue via a federation link and +maintains a local replica of the queue. As well as browsing the remote +messages as they are added the browser receives dequeue notifications +when they are dequeued remotely. + +On the primary broker incoming mesage transfers are completed only when +all of the replicating browsers have signaled completion. Thus a completed +message is guaranteed to be on the backups. + +** Failover and Cluster Resource Managers + +We want to delegate the failover management to an existing cluster +resource manager. Initially this is rgmanager from Cluster Suite, but +other managers (e.g. PaceMaker) could be supported in future. + +Rgmanager takes care of starting and stopping brokers and informing +brokers of their roles as primary or backup. It ensures there's +exactly one primary broker running at any time. It also tracks quorum +and protects against split-brain. + +Rgmanger can also manage a virtual IP address so clients can just +retry on a single address to fail over. Alternatively we will also +support configuring a fixed list of broker addresses when qpid is run +outside of a resource manager. + +Aside: Cold-standby is also possible using rgmanager with shared +storage for the message store (e.g. GFS). If the broker fails, another +broker is started on a different node and and recovers from the +store. This bears investigation but the store recovery times are +likely too long for failover. + +** Replicating wiring + +New queues and exchanges and their bindings also need to be replicated. +This is done by a QMF client that registers for wiring changes +on the remote broker and mirrors them in the local broker. + +** Use of CPG + +CPG is not required in this model, an external cluster resource +manager takes care of membership and quorum. + +** Selective replication + +In this model it's easy to support selective replication of individual queues via +configuration. +- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate. + Treated analogously to persistent/durable properties for the store. +- if not explicitly marked, provide a choice of default + - default is replicate (replicated message on replicated queue) + - default is don't replicate + - default is replicate persistent/durable messages. + +[GRS: current prototype relies on queue sequence for message identity +so selectively replicating certain messages on a given queue would be +challenging. Selectively replicating certain queues however is trivial.] + +** Inconsistent errors + +The new design eliminates most sources of inconsistent errors in the +old design (connections, sessions, security, management etc.) and +eliminates the need to stall the whole cluster till an error is +resolved. We still have to handle inconsistent store errors when store +and cluster are used together. + +We also have to include error handling in the async completion loop to +guarantee N-way at least once: we should only report success to the +client when we know the message was replicated and stored on all N-1 +backups. + +TODO: We have a lot more options than the old cluster, need to figure +out the best approach, or possibly allow mutliple approaches. Need to +go thru the various failure cases. We may be able to do recovery on a +per-queue basis rather than restarting an entire node. + + +** New backups joining + +New brokers can join the cluster as backups. Note - if the new broker +has a new IP address, then the existing cluster members must be +updated with a new client and broker URLs by a sysadmin. + + +They discover + +We should be able to catch up much faster than the the old design. A +new backup can catch up ("recover") the current cluster state on a +per-queue basis. +- queues can be updated in parallel +- "live" updates avoid the the "endless chase" + +During a "live" update several things are happening on a queue: +- clients are publishing messages to the back of the queue, replicated to the backup +- clients are consuming messages from the front of the queue, replicated to the backup. +- the primary is sending pre-existing messages to the new backup. + +The primary sends pre-existing messages in LIFO order - starting from +the back of the queue, at the same time clients are consuming from the front. +The active consumers actually reduce the amount of work to be done, as there's +no need to replicate messages that are no longer on the queue. + +** Broker discovery and lifecycle. + +The cluster has a client URL that can contain a single virtual IP +address or a list of real IP addresses for the cluster. + +In backup mode, brokers reject connections except from a special +cluster-admin user. + +Clients discover the primary by re-trying connection to the client URL +until the successfully connect to the primary. In the case of a +virtual IP they re-try the same address until it is relocated to the +primary. In the case of a list of IPs the client tries each in +turn. Clients do multiple retries over a configured period of time +before giving up. + +Backup brokers discover the primary in the same way as clients. There +is a separate broker URL for brokers since they often will connect +over a different network to the clients. The broker URL has to be a +list of IPs rather than one virtual IP so broker members can connect +to each other. + +Brokers have the following states: +- connecting: backup broker trying to connect to primary - loops retrying broker URL. +- catchup: connected to primary, catching up on pre-existing wiring & messages. +- backup: fully functional backup. +- primary: Acting as primary, serving clients. + +** Interaction with rgmanager + +rgmanager interacts with qpid via 2 service scripts: backup & primary. These +scripts interact with the underlying qpidd service. + +*** Initial cluster start + +rgmanager starts the backup service on all nodes and the primary service on one node. + +On the backup nodes qpidd is in the connecting state. The primary node goes into +the primary state. Backups discover the primary, connect and catch up. + +*** Failover + +primary broker or node fails. Backup brokers see disconnect and go +back to connecting mode. + +rgmanager notices the failure and starts the primary service on a new node. +This tells qpidd to go to primary mode. Backups re-connect and catch up. + +*** Failback + +Cluster of N brokers has suffered a failure, only N-1 brokers +remain. We want to start a new broker (possibly on a new node) to +restore redundancy. + +If the new broker has a new IP address, the sysadmin pushes a new URL +to all the existing brokers. + +The new broker starts in connecting mode. It discovers the primary, +connects and catches up. + +*** Failure during failback + +A second failure occurs before the new backup B can complete its catch +up. The backup B refuses to become primary by failing the primary +start script if rgmanager chooses it, so rgmanager will try another +(hopefully caught-up) broker to be primary. + +** Interaction with the store. + +# FIXME aconway 2011-11-16: TBD +- how to identify the "best" store after a total cluster crash. +- best = last to be primary. +- primary "generation" - counter passed to backups and incremented by new primary. + +restart after crash: +- clean/dirty flag on disk for admin shutdown vs. crash +- dirty brokers refuse to be primary +- sysamdin tells best broker to be primary +- erase stores? delay loading? + +** Current Limitations + +(In no particular order at present) + +For message replication: + +LM1 - The re-synchronisation does not handle the case where a newly elected +master is *behind* one of the other backups. To address this I propose +a new event for restting the sequence that the new master would send +out on detecting that a replicating browser is ahead of it, requesting +that the replica revert back to a particular sequence number. The +replica on receiving this event would then discard (i.e. dequeue) all +the messages ahead of that sequence number and reset the counter to +correctly sequence any subsequently delivered messages. + +LM2 - There is a need to handle wrap-around of the message sequence to avoid +confusing the resynchronisation where a replica has been disconnected +for a long time, sufficient for the sequence numbering to wrap around. + +LM3 - Transactional changes to queue state are not replicated atomically. + +LM4 - Acknowledgements are confirmed to clients before the message has been +dequeued from replicas or indeed from the local store if that is +asynchronous. + +LM5 - During failover, messages (re)published to a queue before there are +the requisite number of replication subscriptions established will be +confirmed to the publisher before they are replicated, leaving them +vulnerable to a loss of the new master before they are replicated. + +For configuration propagation: + +LC1 - Bindings aren't propagated, only queues and exchanges. + +LC2 - Queue and exchange propagation is entirely asynchronous. There +are three cases to consider here for queue creation: (a) where queues +are created through the addressign syntax supported the messaging API, +they should be recreated if needed on failover and message replication +if required is dealt with seperately; (b) where queues are created +using configuration tools by an administrator or by a script they can +query the backups to verify the config has propagated and commands can +be re-run if there is a failure before that; (c) where applications +have more complex programs on which queues/exchanges are created using +QMF or directly via 0-10 APIs, the completion of the command will not +guarantee that the command has been carried out on other +nodes. I.e. case (a) doesn't require anything (apart from LM5 in some +cases), case (b) can be addressed in a simple manner through tooling +but case (c) would require changes to the broker to allow client to +simply determine when the command has fully propagated. + +LC3 - Queues that are not in the query response received when a +replica establishes a propagation subscription but exist locally are +not deleted. I.e. Deletion of queues/exchanges while a replica is not +connected will not be propagated. Solution is to delete any queues +marked for propagation that exist locally but do not show up in the +query response. + +LC4 - It is possible on failover that the new master did not +previously receive a given QMF event while a backup did (sort of an +analogous situation to LM1 but without an easy way to detect or remedy +it). + +LC5 - Need richer control over which queues/exchanges are propagated, and +which are not. + +LC6 - The events and query responses are not fully synchronized. + + In particular it *is* possible to not receive a delete event but + for the deleted object to still show up in the query response + (meaning the deletion is 'lost' to the update). + + It is also possible for an create event to be received as well + as the created object being in the query response. Likewise it + is possible to receive a delete event and a query response in + which the object no longer appears. In these cases the event is + essentially redundant. + + It is not possible to miss a create event and yet not to have + the object in question in the query response however. + +* User Documentation Notes + +Notes to seed initial user documentation. Loosely tracking the implementation, +some points mentioned in the doc may not be implemented yet. + +** High Availability Overview +Explain basic concepts: hot standby, primary/backup, replicated queue/exchange. +Network topology: backup links, corosync, separate client/cluster networks. +Describe failover mechanisms. +- Client view: URLs, failover, exclusion & discovery. +- Broker view: similar. +Role of rmganager & corosync. + +** Client view. +Clients use multi-address URL in base case. +Clients can't connect to backups, retry till they find primary. +Only qpid.cluster-admin can connect to backup, must not mess with replicated queues. +Note connection known-hosts returns client URL, as does amq.failover exchange. + +Creating replicated queues & exchanges: +- qpid.replicate argument, +- examples using addressing and qpid-config) + +** Configuring corosync +Must be on same network as backup links. + +** Configuring rgmanager + +** Configuring qpidd +HA related options. diff --git a/qpid/cpp/design_docs/replicating-browser-design.txt b/qpid/cpp/design_docs/replicating-browser-design.txt deleted file mode 100644 index e304258d35..0000000000 --- a/qpid/cpp/design_docs/replicating-browser-design.txt +++ /dev/null @@ -1,224 +0,0 @@ --*-org-*- -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -* FIXME - rewrite all old stuff from hot-standby.txt. - -* Another new design for Qpid clustering. - -For some background see [[./new-cluster-design.txt]] which describes the -issues with the old design and a new active-active design that could -replace it. - -This document describes an alternative active-passive approach. - -** Active-active vs. active-passive (hot-standby) - -An active-active cluster allows clients to connect to any broker in -the cluster. If a broker fails, clients can fail-over to any other -live broker. - -A hot-standby cluster has only one active broker at a time (the -"primary") and one or more brokers on standby (the "backups"). Clients -are only served by the primary, clients that connect to a backup are -redirected to the primary. The backups are kept up-to-date in real -time by the primary, if the primary fails a backup is elected to be -the new primary. - -The main problem with active-active is co-ordinating consumers of the -same queue on multiple brokers such that there are no duplicates in -normal operation. There are 2 approaches: - -Predictive: each broker predicts which messages others will take. This -the main weakness of the old design so not appealing. - -Locking: brokers "lock" a queue in order to take messages. This is -complex to implement and it is not straighforward to determine the -best strategy for passing the lock. In tests to date it results in -very high latencies (10x standalone broker). - -Hot-standby removes this problem. Only the primary can modify queues -so it just has to tell the backups what it is doing, there's no -locking. - -The primary can enqueue messages and replicate asynchronously - -exactly like the store does, but it "writes" to the replicas over the -network rather than writing to disk. - -** Failover in a hot-standby cluster. - -We want to delegate the failover management to an existing cluster -resource manager. Initially this is rgmanager from Cluster Suite, but -other managers (e.g. PaceMaker) could be supported in future. - -Rgmanager takes care of starting and stopping brokers and informing -brokers of their roles as primary or backup. It ensures there's -exactly one broker running at any time. It also tracks quorum and -protects against split-brain. - -Rgmanger can also manage a virtual IP address so clients can just -retry on a single address to fail over. Alternatively we will support -configuring a fixed list of broker addresses when qpid is run outside -of a resource manager. - -Aside: Cold-standby is also possible using rgmanager with shared -storage for the message store (e.g. GFS). If the broker fails, another -broker is started on a different node and and recovers from the -store. This bears investigation but the store recovery times are -likely too long for failover. - -** Replicating browsers - -The unit of replication is a replicating browser. This is an AMQP -consumer that browses a remote queue via a federation link and -maintains a local replica of the queue. As well as browsing the remote -messages as they are added the browser receives dequeue notifications -when they are dequeued remotely. - -On the primary broker incoming mesage transfers are completed only when -all of the replicating browsers have signaled completion. Thus a completed -message is guaranteed to be on the backups. - -** Replicating wiring - -New queues and exchanges and their bindings also need to be replicated. -This is done by a QMF client that registers for wiring changes -on the remote broker and mirrors them in the local broker. - -** Use of CPG - -CPG is not required in this model, an external cluster resource -manager takes care of membership and quorum. - -** Selective replication -In this model it's easy to support selective replication of individual queues via -configuration. -- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate. - Treated analogously to persistent/durable properties for the store. -- if not explicitly marked, provide a choice of default - - default is replicate (replicated message on replicated queue) - - default is don't replicate - - default is replicate persistent/durable messages. - -[GRS: current prototype relies on queue sequence for message identity -so selectively replicating certain messages on a given queue would be -challenging. Selectively replicating certain queues however is trivial.] - -** Inconsistent errors - -The new design eliminates most sources of inconsistent errors in the -old design (connections, sessions, security, management etc.) and -eliminates the need to stall the whole cluster till an error is -resolved. We still have to handle inconsistent store errors when store -and cluster are used together. - -We also have to include error handling in the async completion loop to -guarantee N-way at least once: we should only report success to the -client when we know the message was replicated and stored on all N-1 -backups. - -TODO: We have a lot more options than the old cluster, need to figure -out the best approach, or possibly allow mutliple approaches. Need to -go thru the various failure cases. We may be able to do recovery on a -per-queue basis rather than restarting an entire node. - -** New members joining - -We should be able to catch up much faster than the the old design. A -new backup can catch up ("recover") the current cluster state on a -per-queue basis. -- queues can be updated in parallel -- "live" updates avoid the the "endless chase" - -During a "live" update several things are happening on a queue: -- clients are publishing messages to the back of the queue, replicated to the backup -- clients are consuming messages from the front of the queue, replicated to the backup. -- the primary is sending pre-existing messages to the new backup. - -The primary sends pre-existing messages in LIFO order - starting from -the back of the queue, at the same time clients are consuming from the front. -The active consumers actually reduce the amount of work to be done, as there's -no need to replicate messages that are no longer on the queue. - -** Current Limitations - -(In no particular order at present) - -For message replication: - -LM1 - The re-synchronisation does not handle the case where a newly elected -master is *behind* one of the other backups. To address this I propose -a new event for restting the sequence that the new master would send -out on detecting that a replicating browser is ahead of it, requesting -that the replica revert back to a particular sequence number. The -replica on receiving this event would then discard (i.e. dequeue) all -the messages ahead of that sequence number and reset the counter to -correctly sequence any subsequently delivered messages. - -LM2 - There is a need to handle wrap-around of the message sequence to avoid -confusing the resynchronisation where a replica has been disconnected -for a long time, sufficient for the sequence numbering to wrap around. - -LM3 - Transactional changes to queue state are not replicated atomically. - -LM4 - Acknowledgements are confirmed to clients before the message has been -dequeued from replicas or indeed from the local store if that is -asynchronous. - -LM5 - During failover, messages (re)published to a queue before there are -the requisite number of replication subscriptions established will be -confirmed to the publisher before they are replicated, leaving them -vulnerable to a loss of the new master before they are replicated. - -For configuration propagation: - -LC1 - Bindings aren't propagated, only queues and exchanges. - -LC2 - Queue and exchange propagation is entirely asynchronous. There -are three cases to consider here for queue creation: (a) where queues -are created through the addressign syntax supported the messaging API, -they should be recreated if needed on failover and message replication -if required is dealt with seperately; (b) where queues are created -using configuration tools by an administrator or by a script they can -query the backups to verify the config has propagated and commands can -be re-run if there is a failure before that; (c) where applications -have more complex programs on which queues/exchanges are created using -QMF or directly via 0-10 APIs, the completion of the command will not -guarantee that the command has been carried out on other -nodes. I.e. case (a) doesn't require anything (apart from LM5 in some -cases), case (b) can be addressed in a simple manner through tooling -but case (c) would require changes to the broker to allow client to -simply determine when the command has fully propagated. - -LC3 - Queues that are not in the query response received when a -replica establishes a propagation subscription but exist locally are -not deleted. I.e. Deletion of queues/exchanges while a replica is not -connected will not be propagated. Solution is to delete any queues -marked for propagation that exist locally but do not show up in the -query response. - -LC4 - It is possible on failover that the new master did not -previously receive a given QMF event while a backup did (sort of an -analogous situation to LM1 but without an easy way to detect or remedy -it). - -LC5 - Need richer control over which queues/exchanges are propagated, and -which are not. - -Question: is it possible to miss an event on subscribing for -configuration propagation? are the initial snapshot and subsequent -events correctly synchronised? diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index 59e951fb6e..b8a1d26fb0 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -1498,6 +1498,10 @@ class SchemaClass: def genNamePackageLower (self, stream, variables): stream.write (self.packageName.lower ()) + def genPackageNameUpper (self, stream, variables): + up = "_".join(self.packageName.split(".")) + stream.write (up.upper()) + def genNameUpper (self, stream, variables): stream.write (self.name.upper ()) @@ -1642,6 +1646,7 @@ class SchemaPackage: def genNamespace (self, stream, variables): stream.write("::".join(self.packageName.split("."))) + def genOpenNamespaces (self, stream, variables): for item in self.packageName.split("."): stream.write ("namespace %s {\n" % item) diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h index 4bcd423a26..90f1b4dd4a 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h @@ -1,6 +1,6 @@ /*MGEN:commentPrefix=//*/ -#ifndef _MANAGEMENT_/*MGEN:Class.NameUpper*/_ -#define _MANAGEMENT_/*MGEN:Class.NameUpper*/_ +#ifndef _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_ +#define _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_ // // Licensed to the Apache Software Foundation (ASF) under one diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8fde735380..c525f8b24b 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -104,7 +104,8 @@ mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk \ -c $(srcdir)/managementgen.cmake -q -b -o qmf \ $(top_srcdir)/../specs/management-schema.xml \ $(srcdir)/qpid/acl/management-schema.xml \ - $(srcdir)/qpid/cluster/management-schema.xml + $(srcdir)/qpid/cluster/management-schema.xml \ + $(srcdir)/qpid/ha/management-schema.xml $(srcdir)/managementgen.mk $(mgen_broker_cpp) $(dist_qpid_management_HEADERS): mgen.timestamp mgen.timestamp: $(mgen_generator) @@ -209,6 +210,7 @@ dmoduleexec_LTLIBRARIES = cmoduleexec_LTLIBRARIES = include cluster.mk +include ha.mk include acl.mk include qmf.mk include qmfc.mk @@ -533,6 +535,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionToken.h \ qpid/broker/Consumer.h \ + qpid/broker/ConsumerFactory.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ qpid/broker/Deliverable.h \ @@ -593,8 +596,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ - qpid/broker/NodeClone.h \ - qpid/broker/NodeClone.cpp \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ @@ -622,8 +623,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueuedMessage.h \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ - qpid/broker/QueueReplicator.h \ - qpid/broker/QueueReplicator.cpp \ qpid/broker/RateFlowcontrol.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk new file mode 100644 index 0000000000..dc4e7c8d0a --- /dev/null +++ b/qpid/cpp/src/ha.mk @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# +# HA plugin makefile fragment, to be included in Makefile.am +# + +dmoduleexec_LTLIBRARIES += ha.la + +ha_la_SOURCES = \ + qpid/ha/Backup.cpp \ + qpid/ha/Backup.h \ + qpid/ha/HaBroker.cpp \ + qpid/ha/HaBroker.h \ + qpid/ha/HaPlugin.cpp \ + qpid/ha/Logging.h \ + qpid/ha/Logging.cpp \ + qpid/ha/Settings.h \ + qpid/ha/QueueReplicator.h \ + qpid/ha/QueueReplicator.cpp \ + qpid/ha/ReplicatingSubscription.h \ + qpid/ha/ReplicatingSubscription.cpp \ + qpid/ha/WiringReplicator.cpp \ + qpid/ha/WiringReplicator.h + +ha_la_LIBADD = libqpidbroker.la +ha_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index b00cdd00dc..405482da5d 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,8 +24,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" -#include "qpid/broker/NodeClone.h" -#include "qpid/broker/QueueReplicator.h" +#include "qpid/ha/WiringReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" @@ -59,9 +58,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) } Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args) : + const _qmf::ArgsLinkBridge& _args, + InitializeCallback init) : link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0) + listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0), + initialize(init) { std::stringstream title; title << id << "_" << link->getBroker()->getFederationTag(); @@ -77,9 +78,9 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); } -Bridge::~Bridge() +Bridge::~Bridge() { - mgmtObject->resourceDestroy(); + mgmtObject->resourceDestroy(); } void Bridge::create(Connection& c) @@ -98,7 +99,7 @@ void Bridge::create(Connection& c) session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - + session->attach(name, false); session->commandPoint(0,0); } else { @@ -108,60 +109,12 @@ void Bridge::create(Connection& c) } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); - if (args.i_srcIsQueue) { - //TODO: something other than this which is nasty... - bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options); - - peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options); + if (initialize) initialize(*this, sessionHandler); + else if (args.i_srcIsQueue) { + peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); - } else if (NodeClone::isNodeCloneDestination(args.i_dest)) { - //declare and bind an event queue - peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable()); - peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable()); - //subscribe to the queue - peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - //issue a query request for queues and another for exchanges using event queue as the reply-to address - for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions - Variant::Map request; - request["_what"] = "OBJECT"; - Variant::Map schema; - schema["_class_name"] = (i == 0 ? "queue" : "exchange"); - schema["_package_name"] = "org.apache.qpid.broker"; - request["_schema_id"] = schema; - - AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0))); - method.setBof(true); - method.setEof(false); - method.setBos(true); - method.setEos(true); - AMQHeaderBody headerBody; - MessageProperties* props = headerBody.get(true); - props->setReplyTo(qpid::framing::ReplyTo("", queueName)); - props->setAppId("qmf2"); - props->getApplicationHeaders().setString("qmf.opcode", "_query_request"); - headerBody.get(true)->setRoutingKey("broker"); - AMQFrame header(headerBody); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - AMQContentBody data; - qpid::amqp_0_10::MapCodec::encode(request, data.getData()); - AMQFrame content(data); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - sessionHandler.out->handle(method); - sessionHandler.out->handle(header); - sessionHandler.out->handle(content); - } - } else { FieldTable queueSettings; @@ -236,11 +189,6 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } -const string& Bridge::getName() const -{ - return name; -} - Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { string host; @@ -268,7 +216,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) is_queue, is_local, id, excludes, dynamic, sync).first; } -void Bridge::encode(Buffer& buffer) const +void Bridge::encode(Buffer& buffer) const { buffer.putShortString(string("bridge")); buffer.putShortString(link->getHost()); @@ -285,8 +233,8 @@ void Bridge::encode(Buffer& buffer) const buffer.putShort(args.i_sync); } -uint32_t Bridge::encodedSize() const -{ +uint32_t Bridge::encodedSize() const +{ return link->getHost().size() + 1 // short-string (host) + 7 // short-string ("bridge") + 2 // port @@ -311,7 +259,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, management::Args& /*args*/, string&) { - if (methodId == _qmf::Bridge::METHOD_CLOSE) { + if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed destroy(); return management::Manageable::STATUS_OK; @@ -358,7 +306,7 @@ void Bridge::sendReorigin() conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, queueName, args.i_src, args.i_key, bindArgs)); } -bool Bridge::resetProxy() +bool Bridge::resetProxy() { SessionHandler& sessionHandler = conn->getChannel(id); if (!sessionHandler.getSession()) peer.reset(); diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 8b4559a871..b849b11ba8 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -42,15 +42,19 @@ class Connection; class ConnectionState; class Link; class LinkRegistry; +class SessionHandler; class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge { public: typedef boost::shared_ptr shared_ptr; typedef boost::function CancellationListener; + typedef boost::function InitializeCallback; Bridge(Link* link, framing::ChannelId id, CancellationListener l, - const qmf::org::apache::qpid::broker::ArgsLinkBridge& args); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args, + InitializeCallback init + ); ~Bridge(); void create(Connection& c); @@ -70,8 +74,8 @@ public: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; - void encode(framing::Buffer& buffer) const; - const std::string& getName() const; + void encode(framing::Buffer& buffer) const; + const std::string& getName() const { return name; } static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); // Exchange::DynamicBridge methods @@ -81,6 +85,10 @@ public: bool containsLocalTag(const std::string& tagList) const; const std::string& getLocalTag() const; + // Methods needed by initialization functions + std::string getQueueName() const { return queueName; } + const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; } + private: struct PushHandler : framing::FrameHandler { PushHandler(Connection* c) { conn = c; } @@ -103,6 +111,7 @@ private: mutable uint64_t persistenceId; ConnectionState* connState; Connection* conn; + InitializeCallback initialize; bool resetProxy(); }; diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index b3b751be98..e8ada49c62 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -37,6 +37,7 @@ #include "qpid/broker/Vhost.h" #include "qpid/broker/System.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/ConsumerFactory.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -199,6 +200,7 @@ public: bool inCluster, clusterUpdatee; boost::intrusive_ptr expiryPolicy; ConnectionCounter connectionCounter; + ConsumerFactories consumerFactories; public: virtual ~Broker(); @@ -357,6 +359,8 @@ public: const std::string& key, const std::string& userId, const std::string& connectionId); + + ConsumerFactories& getConsumerFactories() { return consumerFactories; } }; }} diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 8451f35cb0..0e20719252 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boost::function0 callback) { ScopedLock l(ioCallbackLock); ioCallbacks.push(callback); - out.activateOutput(); + if (isOpen()) out.activateOutput(); } Connection::~Connection() @@ -156,11 +156,14 @@ Connection::~Connection() void Connection::received(framing::AMQFrame& frame) { // Received frame on connection so delay timeout restartTimeout(); + bool wasOpen = isOpen(); adapter.handle(frame); if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); + if (!wasOpen && isOpen()) + doIoCallbacks(); // Do any callbacks registered before we opened. } void Connection::sent(const framing::AMQFrame& frame) @@ -329,17 +332,16 @@ void Connection::closed(){ // Physically closed, suspend open sessions. } void Connection::doIoCallbacks() { - { - ScopedLock l(ioCallbackLock); - // Although IO callbacks execute in the connection thread context, they are - // not cluster safe because they are queued for execution in non-IO threads. - ClusterUnsafeScope cus; - while (!ioCallbacks.empty()) { - boost::function0 cb = ioCallbacks.front(); - ioCallbacks.pop(); - ScopedUnlock ul(ioCallbackLock); - cb(); // Lend the IO thread for management processing - } + if (!isOpen()) return; // Don't process IO callbacks until we are open. + ScopedLock l(ioCallbackLock); + // Although IO callbacks execute in the connection thread context, they are + // not cluster safe because they are queued for execution in non-IO threads. + ClusterUnsafeScope cus; + while (!ioCallbacks.empty()) { + boost::function0 cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing } } diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index a3838fb6f0..7ac7dba56f 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -31,6 +31,9 @@ namespace broker { class Queue; class QueueListeners; +/** + * Base class for consumers which represent a subscription to a queue. + */ class Consumer { const bool acquires; const bool browseAcquired; diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h new file mode 100644 index 0000000000..abd39fb3f8 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h @@ -0,0 +1,70 @@ +#ifndef QPID_BROKER_CONSUMERFACTORY_H +#define QPID_BROKER_CONSUMERFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public. +// Refactor to use a more abstract interface. + +#include "qpid/broker/SemanticState.h" + +namespace qpid { +namespace broker { + +/** + * Base class for consumer factoires. Plugins can register a + * ConsumerFactory via Broker:: getConsumerFactories() Each time a + * conumer is created, each factory is tried in turn till one returns + * non-0. + */ +class ConsumerFactory +{ + public: + virtual ~ConsumerFactory() {} + + virtual boost::shared_ptr create( + SemanticState* parent, + const std::string& name, boost::shared_ptr queue, + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0; +}; + +/** A set of factories held by the broker + * THREAD UNSAFE: see notes on member functions. + */ +class ConsumerFactories { + public: + typedef std::vector > Factories; + + /** Thread safety: May only be called during plug-in initialization. */ + void add(const boost::shared_ptr& cf) { factories.push_back(cf); } + + /** Thread safety: May only be called after plug-in initialization. */ + const Factories& get() const { return factories; } + + private: + Factories factories; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONSUMERFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 43ca1ae04b..4fe76dabd8 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, bool _acquired, bool accepted, bool _windowing, - uint32_t _credit, bool _delayedCompletion) : msg(_msg), + uint32_t _credit, bool _isDelayedCompletion) : msg(_msg), queue(_queue), tag(_tag), acquired(_acquired), @@ -47,7 +47,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, ended(accepted && acquired), windowing(_windowing), credit(msg.payload ? msg.payload->getRequiredCredit() : _credit), - delayedCompletion(_delayedCompletion) + isDelayedCompletion(_isDelayedCompletion) {} bool DeliveryRecord::setEnded() @@ -115,7 +115,7 @@ bool DeliveryRecord::accept(TransactionContext* ctxt) { if (!ended) { if (acquired) { queue->dequeue(ctxt, msg); - } else if (delayedCompletion) { + } else if (isDelayedCompletion) { //TODO: this is a nasty way to do this; change it msg.payload->getIngressCompletion().finishCompleter(); QPID_LOG(debug, "Completed " << msg.payload.get()); diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 90e72aaf0d..ea33ed5461 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -63,7 +63,7 @@ class DeliveryRecord * after that). */ uint32_t credit; - bool delayedCompletion; + bool isDelayedCompletion; public: QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg, @@ -73,7 +73,7 @@ class DeliveryRecord bool accepted, bool windowing, uint32_t credit=0, // Only used if msg is empty. - bool delayedCompletion=false + bool isDelayedCompletion=false ); bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 8010bf43e7..f92d543c2d 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -47,13 +47,13 @@ namespace _qmf = qmf::org::apache::qpid::broker; Link::Link(LinkRegistry* _links, MessageStore* _store, - string& _host, + const string& _host, uint16_t _port, - string& _transport, + const string& _transport, bool _durable, - string& _authMechanism, - string& _username, - string& _password, + const string& _authMechanism, + const string& _username, + const string& _password, Broker* _broker, Manageable* parent) : links(_links), store(_store), host(_host), port(_port), @@ -79,6 +79,7 @@ Link::Link(LinkRegistry* _links, } } setStateLH(STATE_WAITING); + startConnectionLH(); } Link::~Link () @@ -213,28 +214,30 @@ void Link::add(Bridge::shared_ptr bridge) { Mutex::ScopedLock mutex(lock); created.push_back (bridge); + if (connection) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } void Link::cancel(Bridge::shared_ptr bridge) { - { - Mutex::ScopedLock mutex(lock); + Mutex::ScopedLock mutex(lock); - for (Bridges::iterator i = created.begin(); i != created.end(); i++) { - if ((*i).get() == bridge.get()) { - created.erase(i); - break; - } + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if ((*i).get() == bridge.get()) { - cancellations.push_back(bridge); - bridge->closed(); - active.erase(i); - break; - } + } + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if ((*i).get() == bridge.get()) { + cancellations.push_back(bridge); + bridge->closed(); + active.erase(i); + break; } } + if (!cancellations.empty()) { connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -282,6 +285,8 @@ void Link::setConnection(Connection* c) Mutex::ScopedLock mutex(lock); connection = c; updateUrls = true; + // Process any IO tasks bridges added before setConnection. + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::maintenanceVisit () @@ -311,7 +316,7 @@ void Link::maintenanceVisit () } else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); -} + } void Link::reconnect(const qpid::Address& a) { diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 4badd8b3a1..acaa9d1cbd 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -92,19 +92,21 @@ namespace qpid { Link(LinkRegistry* links, MessageStore* store, - std::string& host, + const std::string& host, uint16_t port, - std::string& transport, + const std::string& transport, bool durable, - std::string& authMechanism, - std::string& username, - std::string& password, + const std::string& authMechanism, + const std::string& username, + const std::string& password, Broker* broker, management::Manageable* parent = 0); virtual ~Link(); std::string getHost() { return host; } uint16_t getPort() { return port; } + std::string getTransport() { return transport; } + bool isDurable() { return durable; } void maintenanceVisit (); uint nextChannel(); diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index e9885f5462..31b4f1b490 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -124,13 +124,13 @@ bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& } } -pair LinkRegistry::declare(string& host, +pair LinkRegistry::declare(const string& host, uint16_t port, - string& transport, + const string& transport, bool durable, - string& authMechanism, - string& username, - string& password) + const string& authMechanism, + const string& username, + const string& password) { Mutex::ScopedLock locker(lock); @@ -151,18 +151,20 @@ pair LinkRegistry::declare(string& host, return std::pair(i->second, false); } -pair LinkRegistry::declare(std::string& host, +pair LinkRegistry::declare(const std::string& host, uint16_t port, bool durable, - std::string& src, - std::string& dest, - std::string& key, + const std::string& src, + const std::string& dest, + const std::string& key, bool isQueue, bool isLocal, - std::string& tag, - std::string& excludes, + const std::string& tag, + const std::string& excludes, bool dynamic, - uint16_t sync) + uint16_t sync, + Bridge::InitializeCallback init +) { Mutex::ScopedLock locker(lock); QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); @@ -196,7 +198,8 @@ pair LinkRegistry::declare(std::string& host, bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), boost::bind(&LinkRegistry::destroy, this, - host, port, src, dest, key), args)); + host, port, src, dest, key), + args, init)); bridges[bridgeKey] = bridge; l->second->add(bridge); return std::pair(bridge, true); diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 4c97e4f9d8..7e5b39f223 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -84,28 +84,32 @@ namespace broker { ~LinkRegistry(); std::pair, bool> - declare(std::string& host, + declare(const std::string& host, uint16_t port, - std::string& transport, + const std::string& transport, bool durable, - std::string& authMechanism, - std::string& username, - std::string& password); + const std::string& authMechanism, + const std::string& username, + const std::string& password); + std::pair - declare(std::string& host, + declare(const std::string& host, uint16_t port, bool durable, - std::string& src, - std::string& dest, - std::string& key, + const std::string& src, + const std::string& dest, + const std::string& key, bool isQueue, bool isLocal, - std::string& id, - std::string& excludes, + const std::string& id, + const std::string& excludes, bool dynamic, - uint16_t sync); + uint16_t sync, + Bridge::InitializeCallback=0 + ); void destroy(const std::string& host, const uint16_t port); + void destroy(const std::string& host, const uint16_t port, const std::string& src, diff --git a/qpid/cpp/src/qpid/broker/NodeClone.cpp b/qpid/cpp/src/qpid/broker/NodeClone.cpp deleted file mode 100644 index e8fc227884..0000000000 --- a/qpid/cpp/src/qpid/broker/NodeClone.cpp +++ /dev/null @@ -1,219 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "NodeClone.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Queue.h" -#include "qpid/log/Statement.h" -#include "qpid/amqp_0_10/Codecs.h" -#include "qpid/framing/reply_exceptions.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" - -using qmf::org::apache::qpid::broker::EventQueueDeclare; -using qmf::org::apache::qpid::broker::EventQueueDelete; -using qmf::org::apache::qpid::broker::EventExchangeDeclare; -using qmf::org::apache::qpid::broker::EventExchangeDelete; - -namespace qpid { -namespace broker { - -namespace{ -bool isQMFv2(const Message& message) -{ - const qpid::framing::MessageProperties* props = message.getProperties(); - return props && props->getAppId() == "qmf2"; -} - -template bool match(qpid::types::Variant::Map& schema) -{ - return T::match(schema["_class_name"], schema["_package_name"]); -} - -} - -NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b) {} - -NodeClone::~NodeClone() {} - -void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable* headers) -{ - if (isQMFv2(msg.getMessage()) && headers) { - if (headers->getAsString("qmf.content") == "_event") { - //decode as list - std::string content = msg.getMessage().getFrames().getContent(); - qpid::types::Variant::List list; - qpid::amqp_0_10::ListCodec::decode(content, list); - if (list.empty()) { - QPID_LOG(error, "Error parsing QMF event, expected non-empty list"); - } else { - try { - qpid::types::Variant::Map& map = list.front().asMap(); - qpid::types::Variant::Map& schema = map["_schema_id"].asMap(); - qpid::types::Variant::Map& values = map["_values"].asMap(); - if (match(schema)) { - if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) { - qpid::framing::FieldTable args; - qpid::amqp_0_10::translate(values["args"].asMap(), args); - if (!broker.createQueue( - values["qName"].asString(), - values["durable"].asBool(), - values["autoDel"].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - values["altEx"].asString(), - args, - values["user"].asString(), - values["rhost"].asString()).second) { - QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists"); - } - } - } else if (match(schema)) { - std::string name = values["qName"].asString(); - QPID_LOG(debug, "Notified of deletion of queue " << name); - boost::shared_ptr queue = broker.getQueues().find(name); - if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) { - broker.deleteQueue( - name, - values["user"].asString(), - values["rhost"].asString()); - } else { - if (queue) { - QPID_LOG(debug, "Ignoring deletion notification for non-propagated queue " << name); - } else { - QPID_LOG(debug, "No such queue " << name); - } - } - } else if (match(schema)) { - if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) { - qpid::framing::FieldTable args; - qpid::amqp_0_10::translate(values["args"].asMap(), args); - if (!broker.createExchange( - values["exName"].asString(), - values["exType"].asString(), - values["durable"].asBool(), - values["altEx"].asString(), - args, - values["user"].asString(), - values["rhost"].asString()).second) { - QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists"); - } - } - } else if (match(schema)) { - std::string name = values["exName"].asString(); - QPID_LOG(debug, "Notified of deletion of exchange " << name); - try { - boost::shared_ptr exchange = broker.getExchanges().get(name); - if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) { - broker.deleteExchange( - name, - values["user"].asString(), - values["rhost"].asString()); - } else { - if (exchange) { - QPID_LOG(debug, "Ignoring deletion notification for non-propagated exchange " << name); - } else { - QPID_LOG(debug, "No such exchange " << name); - } - } - } catch (const qpid::framing::NotFoundException&) {} - } - } catch (const std::exception& e) { - QPID_LOG(error, "Error propagating configuration: " << e.what()); - } - } - } else if (headers->getAsString("qmf.opcode") == "_query_response") { - //decode as list - std::string content = msg.getMessage().getFrames().getContent(); - qpid::types::Variant::List list; - qpid::amqp_0_10::ListCodec::decode(content, list); - QPID_LOG(debug, "Got query response (" << list.size() << ")"); - for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - std::string type = i->asMap()["_schema_id"].asMap()["_class_name"]; - qpid::types::Variant::Map& values = i->asMap()["_values"].asMap(); - QPID_LOG(debug, "class: " << type << ", values: " << values); - if (values["arguments"].asMap()["qpid.propagate"]) { - qpid::framing::FieldTable args; - qpid::amqp_0_10::translate(values["arguments"].asMap(), args); - if (type == "queue") { - if (!broker.createQueue( - values["name"].asString(), - values["durable"].asBool(), - values["autoDelete"].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - ""/*TODO: need to include alternate-exchange*/, - args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(warning, "Propagatable queue " << values["name"] << " already exists"); - } - } else if (type == "exchange") { - if (!broker.createExchange( - values["name"].asString(), - values["type"].asString(), - values["durable"].asBool(), - ""/*TODO: need to include alternate-exchange*/, - args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists"); - } - } else { - QPID_LOG(warning, "Ignoring unknow object class: " << type); - } - } - } - } else { - QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers); - } - } else { - QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response"); - } -} - -bool NodeClone::isNodeCloneDestination(const std::string& target) -{ - return target == "qpid.node-cloner"; -} - -boost::shared_ptr NodeClone::create(const std::string& target, Broker& broker) -{ - boost::shared_ptr exchange; - if (isNodeCloneDestination(target)) { - //TODO: need to cache the exchange - QPID_LOG(info, "Creating node cloner"); - exchange.reset(new NodeClone(target, broker)); - } - return exchange; -} - -bool NodeClone::bind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } -bool NodeClone::unbind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } -bool NodeClone::isBound(boost::shared_ptr, const std::string* const, const qpid::framing::FieldTable* const) { return false; } - -const std::string NodeClone::typeName("node-cloner"); - -std::string NodeClone::getType() const -{ - return typeName; -} - -}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/NodeClone.h b/qpid/cpp/src/qpid/broker/NodeClone.h deleted file mode 100644 index 71cac619ad..0000000000 --- a/qpid/cpp/src/qpid/broker/NodeClone.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef QPID_BROKER_NODEPROPAGATOR_H -#define QPID_BROKER_NODEPROPAGATOR_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/Exchange.h" - -namespace qpid { -namespace broker { - -class Broker; - -/** - * Pseudo-exchange for recreating local queues and/or exchanges on - * receipt of QMF events indicating their creation on another node - */ -class NodeClone : public Exchange -{ - public: - NodeClone(const std::string&, Broker&); - ~NodeClone(); - std::string getType() const; - bool bind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*); - bool unbind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*); - void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*); - bool isBound(boost::shared_ptr, const std::string* const, const qpid::framing::FieldTable* const); - - static bool isNodeCloneDestination(const std::string&); - static boost::shared_ptr create(const std::string&, Broker&); - static const std::string typeName; - private: - Broker& broker; -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_NODEPROPAGATOR_H*/ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index c47ba554d2..0e85fe1072 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1378,7 +1378,7 @@ bool Queue::bind(boost::shared_ptr exchange, const std::string& key, } -const Broker* Queue::getBroker() +Broker* Queue::getBroker() { return broker; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index b66600ef43..a53916ffbc 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -403,7 +403,7 @@ class Queue : public boost::enable_shared_from_this, void flush(); - const Broker* getBroker(); + Broker* getBroker(); uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp deleted file mode 100644 index 01c0c8e272..0000000000 --- a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp +++ /dev/null @@ -1,128 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/QueueReplicator.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/framing/SequenceSet.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace broker { - -QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {} -QueueReplicator::~QueueReplicator() {} - -namespace { -const std::string DEQUEUE_EVENT("dequeue-event"); -const std::string REPLICATOR("qpid.replicator-"); -} - -void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) -{ - if (key == DEQUEUE_EVENT) { - std::string content; - msg.getMessage().getFrames().getContent(content); - qpid::framing::Buffer buffer(const_cast(content.c_str()), content.size()); - qpid::framing::SequenceSet latest; - latest.decode(buffer); - - //TODO: should be able to optimise the following - for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) { - if (current < *i) { - //haven't got that far yet, record the dequeue - dequeued.add(*i); - QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName()); - } else { - QueuedMessage message; - if (queue->acquireMessageAt(*i, message)) { - queue->dequeue(0, message); - QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName()); - } else { - QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName()); - } - } - } - } else { - //take account of any gaps in sequence created by messages - //dequeued before our subscription reached them - while (dequeued.contains(++current)) { - dequeued.remove(current); - QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName()); - queue->setPosition(current); - } - QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current); - msg.deliverTo(queue); - } -} - -bool QueueReplicator::isReplicatingLink(const std::string& name) -{ - return name.find(REPLICATOR) == 0; -} - -boost::shared_ptr QueueReplicator::create(const std::string& target, QueueRegistry& queues) -{ - boost::shared_ptr exchange; - if (isReplicatingLink(target)) { - std::string queueName = target.substr(REPLICATOR.size()); - boost::shared_ptr queue = queues.find(queueName); - if (!queue) { - QPID_LOG(warning, "Unable to create replicator, can't find " << queueName); - } else { - //TODO: need to cache the replicator - QPID_LOG(info, "Creating replicator for " << queueName); - exchange.reset(new QueueReplicator(target, queue)); - } - } - return exchange; -} - -bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings) -{ - if (isReplicatingLink(target)) { - std::string queueName = target.substr(REPLICATOR.size()); - boost::shared_ptr queue = queues.find(queueName); - if (queue) { - settings.setInt("qpid.replicating-subscription", 1); - settings.setInt("qpid.high_sequence_number", queue->getPosition()); - qpid::framing::SequenceNumber oldest; - if (queue->getOldest(oldest)) { - settings.setInt("qpid.low_sequence_number", oldest); - } - } - return true; - } else { - return false; - } -} - -bool QueueReplicator::bind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } -bool QueueReplicator::unbind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } -bool QueueReplicator::isBound(boost::shared_ptr, const std::string* const, const qpid::framing::FieldTable* const) { return false; } - -const std::string QueueReplicator::typeName("queue-replicator"); - -std::string QueueReplicator::getType() const -{ - return typeName; -} - -}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/broker/QueueReplicator.h deleted file mode 100644 index 679aa9240d..0000000000 --- a/qpid/cpp/src/qpid/broker/QueueReplicator.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef QPID_BROKER_QUEUEREPLICATOR_H -#define QPID_BROKER_QUEUEREPLICATOR_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/Exchange.h" -#include "qpid/framing/SequenceSet.h" - -namespace qpid { -namespace broker { - -class QueueRegistry; - -/** - * Dummy exchange for processing replication messages - */ -class QueueReplicator : public Exchange -{ - public: - QueueReplicator(const std::string& name, boost::shared_ptr); - ~QueueReplicator(); - std::string getType() const; - bool bind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*); - bool unbind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*); - void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*); - bool isBound(boost::shared_ptr, const std::string* const, const qpid::framing::FieldTable* const); - static bool isReplicatingLink(const std::string&); - static boost::shared_ptr create(const std::string&, QueueRegistry&); - static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&); - static const std::string typeName; - private: - boost::shared_ptr queue; - qpid::framing::SequenceNumber current; - qpid::framing::SequenceSet dequeued; -}; - -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/ diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 7d10322163..43b5d997d1 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -25,9 +25,7 @@ #include "qpid/broker/DtxAck.h" #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Message.h" -#include "qpid/broker/NodeClone.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" @@ -110,15 +108,25 @@ bool SemanticState::exists(const string& consumerTag){ namespace { const std::string SEPARATOR("::"); } - + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, - bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) + bool exclusive, const string& resumeId, uint64_t resumeTtl, + const FieldTable& arguments) { // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). // Create a globally unique name so the broker can identify individual consumers std::string name = session.getSessionId().str() + SEPARATOR + tag; - ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + const ConsumerFactories::Factories& cf( + session.getBroker().getConsumerFactories().get()); + ConsumerImpl::shared_ptr c; + for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i) + c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments); + if (!c) // Create plain consumer + c = ConsumerImpl::shared_ptr( + new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -268,224 +276,6 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver -{ - public: - ReplicatingSubscription(SemanticState* parent, - const std::string& name, boost::shared_ptr queue, - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - ~ReplicatingSubscription(); - - void init(); - void cancel(); - bool deliver(QueuedMessage& msg); - void enqueued(const QueuedMessage&); - void dequeued(const QueuedMessage&); - void acquired(const QueuedMessage&) {} - void requeued(const QueuedMessage&) {} - - protected: - bool doDispatch(); - private: - boost::shared_ptr events; - boost::shared_ptr consumer; - qpid::framing::SequenceSet range; - - void generateDequeueEvent(); - class DelegatingConsumer : public Consumer - { - public: - DelegatingConsumer(ReplicatingSubscription&); - ~DelegatingConsumer(); - bool deliver(QueuedMessage& msg); - void notify(); - //bool filter(boost::intrusive_ptr); - //bool accept(boost::intrusive_ptr); - Consumer::Action accept(const QueuedMessage&); - OwnershipToken* getSession(); - private: - ReplicatingSubscription& delegate; - }; -}; - -SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent, - const string& name, - Queue::shared_ptr queue, - bool ack, - bool acquire, - bool exclusive, - const string& tag, - const string& resumeId, - uint64_t resumeTtl, - const framing::FieldTable& arguments) -{ - if (arguments.isSet("qpid.replicating-subscription")) { - shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - boost::dynamic_pointer_cast(result)->init(); - return result; - } else { - return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - } -} - -std::string mask(const std::string& in) -{ - return std::string("$") + in + std::string("_internal"); -} - -class ReplicationStateInitialiser -{ - public: - ReplicationStateInitialiser(qpid::framing::SequenceSet& results, - const qpid::framing::SequenceNumber& start, - const qpid::framing::SequenceNumber& end); - void operator()(const QueuedMessage& m) { process(m); } - private: - qpid::framing::SequenceSet& results; - const qpid::framing::SequenceNumber start; - const qpid::framing::SequenceNumber end; - void process(const QueuedMessage&); -}; - -ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, - bool ack, - bool _acquire, - bool _exclusive, - const string& _tag, - const string& _resumeId, - uint64_t _resumeTtl, - const framing::FieldTable& _arguments -) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments), - events(new Queue(mask(_name))), - consumer(new DelegatingConsumer(*this)) -{ - - if (_arguments.isSet("qpid.high_sequence_number")) { - qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number"); - qpid::framing::SequenceNumber lwm; - if (_arguments.isSet("qpid.low_sequence_number")) { - lwm = _arguments.getAsInt("qpid.low_sequence_number"); - } else { - lwm = hwm; - } - qpid::framing::SequenceNumber oldest; - if (_queue->getOldest(oldest)) { - if (oldest >= hwm) { - range.add(lwm, --oldest); - } else if (oldest >= lwm) { - ReplicationStateInitialiser initialiser(range, lwm, hwm); - _queue->eachMessage(initialiser); - } else { //i.e. have older message on master than is reported to exist on replica - QPID_LOG(warning, "Replica appears to be missing message on master"); - } - } else { - //local queue (i.e. master) is empty - range.add(lwm, _queue->getPosition()); - } - QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range - << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")"); - //set position of 'cursor' - position = hwm; - } -} - -bool ReplicatingSubscription::deliver(QueuedMessage& m) -{ - return ConsumerImpl::deliver(m); -} - -void ReplicatingSubscription::init() -{ - getQueue()->addObserver(boost::dynamic_pointer_cast(shared_from_this())); -} - -void ReplicatingSubscription::cancel() -{ - getQueue()->removeObserver(boost::dynamic_pointer_cast(shared_from_this())); -} - -ReplicatingSubscription::~ReplicatingSubscription() {} - -//called before we get notified of the message being available and -//under the message lock in the queue -void ReplicatingSubscription::enqueued(const QueuedMessage& m) -{ - QPID_LOG(debug, "Enqueued message at " << m.position); - //delay completion - m.payload->getIngressCompletion().startCompleter(); - QPID_LOG(debug, "Delayed " << m.payload.get()); -} - -class Buffer : public qpid::framing::Buffer -{ - public: - Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {} - ~Buffer() { delete[] getPointer(); } -}; - -void ReplicatingSubscription::generateDequeueEvent() -{ - Buffer buffer(range.encodedSize()); - range.encode(buffer); - range.clear(); - buffer.reset(); - - //generate event message - boost::intrusive_ptr event = new Message(); - AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody())); - content.castBody()->decode(buffer, buffer.getSize()); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - event->getFrames().append(method); - event->getFrames().append(header); - event->getFrames().append(content); - - DeliveryProperties* props = event->getFrames().getHeaders()->get(true); - props->setRoutingKey("dequeue-event"); - - events->deliver(event); -} - -//called after the message has been removed from the deque and under -//the message lock in the queue -void ReplicatingSubscription::dequeued(const QueuedMessage& m) -{ - { - Mutex::ScopedLock l(lock); - range.add(m.position); - QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position); - } - notify(); - if (m.position > position) { - m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue"); - } -} - -bool ReplicatingSubscription::doDispatch() -{ - { - Mutex::ScopedLock l(lock); - if (!range.empty()) { - generateDequeueEvent(); - } - } - bool r1 = events->dispatch(consumer); - bool r2 = ConsumerImpl::doDispatch(); - return r1 || r2; -} - SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, @@ -497,7 +287,6 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, uint64_t _resumeTtl, const framing::FieldTable& _arguments - ) : Consumer(_name, _acquire, !_acquire), /** @todo KAG - allow configuration of 'browse acquired' */ parent(_parent), @@ -512,7 +301,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -558,7 +347,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, dynamic_cast(this)); + DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, isDelayedCompletion()); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); @@ -714,10 +503,10 @@ void SemanticState::route(intrusive_ptr msg, Deliverable& strategy) { msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) { - cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues()); - if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker()); - if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName); + if (!cacheExchange || cacheExchange->getName() != exchangeName + || cacheExchange->isDestroyed()) + { + cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->setProperties(msg); @@ -1099,36 +888,4 @@ void SemanticState::detached() } } -ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} -ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) -{ - return delegate.deliver(m); -} -void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } -//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr msg) { return delegate.filter(msg); } -//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr msg) { return delegate.accept(msg); } -Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); } -OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } - -ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r, - const qpid::framing::SequenceNumber& s, - const qpid::framing::SequenceNumber& e) - : results(r), start(s), end(e) -{ - results.add(start, end); -} - -void ReplicationStateInitialiser::process(const QueuedMessage& message) -{ - if (message.position < start) { - //replica does not have a message that should still be on the queue - QPID_LOG(warning, "Replica appears to be missing message at " << message.position); - } else if (message.position >= start && message.position <= end) { - //i.e. message is within the intial range and has not been dequeued, so remove it from the results - results.remove(message.position); - } //else message has not been seen by replica yet so can be ignored here - -} - }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 62c829e8f8..e467ec650f 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -157,11 +157,10 @@ class SemanticState : private boost::noncopyable { management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - static shared_ptr create(SemanticState* parent, - const std::string& name, boost::shared_ptr queue, - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - + /** This consumer wants delayed completion. + * Overridden by ConsumerImpl subclasses. + */ + virtual bool isDelayedCompletion() const { return false; } }; typedef std::map DtxBufferMap; diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index ca6d6bb193..8cd5072574 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -23,6 +23,7 @@ */ #include "qpid/amqp_0_10/SessionHandler.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index fe66b77238..f56697c845 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -208,6 +208,8 @@ class Connection : void queueDequeueSincePurgeState(const std::string&, uint32_t); + bool isAnnounced() const { return announced; } + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index 4bf03eefa2..2cd1cf9a83 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -97,7 +97,7 @@ void OutputInterceptor::deliverDoOutput(uint32_t limit) { } void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) { - if (parent.isLocal() && !sentDoOutput && !closing) { + if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced()) { sentDoOutput = true; parent.getCluster().getMulticast().mcastControl( ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit), diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp new file mode 100644 index 0000000000..17da13ed1e --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Backup.h" +#include "Settings.h" +#include "WiringReplicator.h" +#include "ReplicatingSubscription.h" +#include "qpid/Url.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace ha { + +using namespace framing; +using namespace broker; +using types::Variant; +using std::string; + +Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { + // FIXME aconway 2011-11-24: identifying the primary. + if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary. + Url url(s.brokerUrl); + QPID_LOG(info, "HA: Acting as backup"); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + + // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. + // Declare the link + std::pair result = broker.getLinks().declare( + url[0].host, url[0].port, protocol, + false, // durable + s.mechanism, s.username, s.password); + assert(result.second); // FIXME aconway 2011-11-23: error handling + link = result.first; + boost::shared_ptr wr(new WiringReplicator(link)); + broker.getExchanges().registerExchange(wr); + } +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h new file mode 100644 index 0000000000..b4183a4dba --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -0,0 +1,56 @@ +#ifndef QPID_HA_BACKUP_H +#define QPID_HA_BACKUP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Settings.h" +#include "qpid/Url.h" +#include + +namespace qpid { +namespace broker { +class Broker; +class Link; +} + +namespace ha { +class Settings; + +/** + * State associated with a backup broker. Manages connections to primary. + * + * THREAD SAFE: trivially because currently it only has a constructor. + * May need locking as the functionality grows. + */ +class Backup +{ + public: + Backup(broker::Broker&, const Settings&); + + private: + broker::Broker& broker; + Settings settings; + boost::shared_ptr link; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_BACKUP_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp new file mode 100644 index 0000000000..22b7e46595 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Backup.h" +#include "HaBroker.h" +#include "Settings.h" +#include "ReplicatingSubscription.h" +#include "qpid/Exception.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qmf/org/apache/qpid/ha/Package.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace ha { + +namespace _qmf = ::qmf::org::apache::qpid::ha; +using namespace management; +using namespace std; + +namespace { +Url url(const std::string& s, const std::string& id) { + try { + return Url(s); + } catch (const std::exception& e) { + throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'"); + } +} +} // namespace + +HaBroker::HaBroker(broker::Broker& b, const Settings& s) + : broker(b), + clientUrl(url(s.clientUrl, "ha-client-url")), + brokerUrl(url(s.brokerUrl, "ha-broker-url")), + mgmtObject(0) +{ + ManagementAgent* ma = broker.getManagementAgent(); + if (ma) { + _qmf::Package packageInit(ma); + mgmtObject = new _qmf::HaBroker(ma, this); + // FIXME aconway 2011-11-11: Placeholder - initialize cluster role. + mgmtObject->set_status("solo"); + ma->addObject(mgmtObject); + } + QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl + << " broker-url=" << brokerUrl); + backup.reset(new Backup(broker, s)); + // Register a factory for replicating subscriptions. + broker.getConsumerFactories().add( + boost::shared_ptr( + new ReplicatingSubscription::Factory())); +} + +HaBroker::~HaBroker() {} + +Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { + switch (methodId) { + case _qmf::HaBroker::METHOD_SETSTATUS: { + std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status; + // FIXME aconway 2011-11-11: placeholder, validate & execute status change. + mgmtObject->set_status(status); + break; + } + default: + return Manageable::STATUS_UNKNOWN_METHOD; + } + return Manageable::STATUS_OK; +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h new file mode 100644 index 0000000000..1a835c9ea0 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -0,0 +1,62 @@ +#ifndef QPID_HA_BROKER_H +#define QPID_HA_BROKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Url.h" +#include "qmf/org/apache/qpid/ha/HaBroker.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h" +#include "qpid/management/Manageable.h" + +namespace qpid { +namespace broker { +class Broker; +} +namespace ha { +class Settings; +class Backup; + +/** + * HA state and actions associated with a broker. + * + * THREAD SAFE: may be called in arbitrary broker IO or timer threads. + */ +class HaBroker : public management::Manageable +{ + public: + HaBroker(broker::Broker&, const Settings&); + ~HaBroker(); + + // Implement Manageable. + qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; } + management::Manageable::status_t ManagementMethod ( + uint32_t methodId, management::Args& args, std::string& text); + + private: + broker::Broker& broker; + Url clientUrl, brokerUrl; + std::auto_ptr backup; + qmf::org::apache::qpid::ha::HaBroker* mgmtObject; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_BROKER_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp new file mode 100644 index 0000000000..80f21e4320 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "HaBroker.h" +#include "Settings.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/Broker.h" + + +namespace qpid { +namespace ha { + +using namespace std; + +struct Options : public qpid::Options { + Settings& settings; + Options(Settings& s) : qpid::Options("HA Options"), settings(s) { + addOptions() + ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features") + ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.") + ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.") + ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers") + ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers") + ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers") + ; + } +}; + +struct HaPlugin : public Plugin { + + Settings settings; + Options options; + auto_ptr haBroker; + + HaPlugin() : options(settings) {} + + Options* getOptions() { return &options; } + + void earlyInitialize(Plugin::Target& ) {} + + void initialize(Plugin::Target& target) { + broker::Broker* broker = dynamic_cast(&target); + if (broker && settings.enabled) { + haBroker.reset(new ha::HaBroker(*broker, settings)); + } else + QPID_LOG(info, "HA: Disabled"); + } +}; + +static HaPlugin instance; // Static initialization. + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp new file mode 100644 index 0000000000..7d8ee38367 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Logging.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Logging.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/SequenceNumber.h" + +namespace qpid { +namespace ha { + +QueuePos::QueuePos(const broker::QueuedMessage& qm) + : queue(qm.queue), position(qm.position) {} + +std::ostream& operator<<(std::ostream& o, const QueuePos& qp) { + return o << qp.queue->getName() << "[" << qp.position << "]"; +} + +}} // namesopace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h new file mode 100644 index 0000000000..3b12baa390 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Logging.h @@ -0,0 +1,55 @@ +#ifndef QPID_HA_HAOSTREAM_H +#define QPID_HA_HAOSTREAM_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include + +/**@file ostream helpers used in log messages. */ + +namespace qpid { + +namespace broker { +class Queue; +class QueuedMessage; +} + +namespace framing { +class SequenceNumber; +} + +namespace ha { + +// Other printable helpers + +struct QueuePos { + const broker::Queue* queue; + const framing::SequenceNumber& position; + QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos) + : queue(q), position(pos) {} + QueuePos(const broker::QueuedMessage& qm); +}; + +std::ostream& operator<<(std::ostream& o, const QueuePos& h); + +}} // namespace qpid::ha + +#endif /*!QPID_HA_HAOSTREAM_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp new file mode 100644 index 0000000000..2de9ec5a59 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueReplicator.h" +#include "ReplicatingSubscription.h" +#include "Logging.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include + +namespace { +const std::string QPID_REPLICATOR_("qpid.replicator-"); +} + +namespace qpid { +namespace ha { +using namespace broker; + +QueueReplicator::QueueReplicator(boost::shared_ptr q, boost::shared_ptr l) + : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? + queue(q), link(l), current(queue->getPosition()) +{ + // FIXME aconway 2011-11-24: consistent logging. + QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); + queue->getBroker()->getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + queue->getName(), // src + getName(), // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&QueueReplicator::initializeBridge, this, _1, _2) + ); +} + +QueueReplicator::~QueueReplicator() {} + +// NB: This is called back ina broker connection thread when the +// bridge is created. +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + // No lock needed, no mutable member variables are used. + framing::AMQP_ServerProxy peer(sessionHandler.out); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + framing::FieldTable settings; + // FIXME aconway 2011-11-28: string constants. + settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + // FIXME aconway 2011-11-28: inconsistent use of _ vs. - + settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + qpid::framing::SequenceNumber oldest; + if (queue->getOldest(oldest)) + settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest); + + peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings); + peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); + peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); + QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest); +} + + +namespace { +const std::string DEQUEUE_EVENT("dequeue-event"); +const std::string REPLICATOR("qpid.replicator-"); +} + +void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) +{ + if (key == DEQUEUE_EVENT) { + std::string content; + msg.getMessage().getFrames().getContent(content); + qpid::framing::Buffer buffer(const_cast(content.c_str()), content.size()); + qpid::framing::SequenceSet latest; + latest.decode(buffer); + + //TODO: should be able to optimise the following + for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) { + if (current < *i) { + //haven't got that far yet, record the dequeue + dequeued.add(*i); + QPID_LOG(trace, "HA: Recording dequeue of message at " << + QueuePos(queue.get(), *i)); + } else { + QueuedMessage message; + if (queue->acquireMessageAt(*i, message)) { + queue->dequeue(0, message); + QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message)); + } else { + // FIXME aconway 2011-11-29: error handling + QPID_LOG(error, "HA: Unable to dequeue message at " + << QueuePos(queue.get(), *i)); + } + } + } + } else { + //take account of any gaps in sequence created by messages + //dequeued before our subscription reached them + while (dequeued.contains(++current)) { + dequeued.remove(current); + QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName()); + queue->setPosition(current); + } + QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current); + msg.deliverTo(queue); + } +} + +bool QueueReplicator::bind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } +bool QueueReplicator::unbind(boost::shared_ptr, const std::string&, const qpid::framing::FieldTable*) { return false; } +bool QueueReplicator::isBound(boost::shared_ptr, const std::string* const, const qpid::framing::FieldTable* const) { return false; } + +// FIXME aconway 2011-11-28: rationalise string constants. +static const std::string TYPE_NAME("qpid.queue-replicator"); + +std::string QueueReplicator::getType() const { return TYPE_NAME; } + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h new file mode 100644 index 0000000000..8085c11b82 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -0,0 +1,72 @@ +#ifndef QPID_HA_QUEUEREPLICATOR_H +#define QPID_HA_QUEUEREPLICATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Exchange.h" +#include "qpid/framing/SequenceSet.h" + +namespace qpid { + +namespace broker { +class Bridge; +class Link; +class Queue; +class QueueRegistry; +class SessionHandler; +class Deliverable; +} + +namespace ha { + +/** + * Exchange created on a backup broker to replicate a queue on the primary. + * + * Puts replicated messages on the local queue, handles dequeue events. + * Creates a ReplicatingSubscription on the primary by passing special + * arguments to the consume command. + * + * THREAD SAFE. + */ +class QueueReplicator : public broker::Exchange +{ + public: + QueueReplicator(boost::shared_ptr q, boost::shared_ptr l); + ~QueueReplicator(); + std::string getType() const; + bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*); + bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + bool isBound(boost::shared_ptr, const std::string* const, const framing::FieldTable* const); + + private: + void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); + + sys::Mutex lock; + boost::shared_ptr queue; + boost::shared_ptr link; + framing::SequenceNumber current; + framing::SequenceSet dequeued; +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_QUEUEREPLICATOR_H*/ diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp new file mode 100644 index 0000000000..aabbd43631 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -0,0 +1,235 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicatingSubscription.h" +#include "Logging.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace ha { + +using namespace framing; +using namespace broker; +using namespace std; + +// FIXME aconway 2011-11-28: review all arugment names, prefixes etc. +// Do we want a common HA prefix? +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); +const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number"); +const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number"); + +const string DOLLAR("$"); +const string INTERNAL("-internal"); + +class ReplicationStateInitialiser +{ + public: + ReplicationStateInitialiser( + qpid::framing::SequenceSet& r, + const qpid::framing::SequenceNumber& s, + const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e) + { + results.add(start, end); + } + + void operator()(const QueuedMessage& message) { + if (message.position < start) { + //replica does not have a message that should still be on the queue + QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message)); + } else if (message.position >= start && message.position <= end) { + //i.e. message is within the intial range and has not been dequeued, so remove it from the results + results.remove(message.position); + } //else message has not been seen by replica yet so can be ignored here + } + + private: + qpid::framing::SequenceSet& results; + const qpid::framing::SequenceNumber start; + const qpid::framing::SequenceNumber end; +}; + +string mask(const string& in) +{ + return DOLLAR + in + INTERNAL; +} + +boost::shared_ptr +ReplicatingSubscription::Factory::create( + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) { + return boost::shared_ptr( + new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); +} + +ReplicatingSubscription::ReplicatingSubscription( + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments), + events(new Queue(mask(name))), + consumer(new DelegatingConsumer(*this)) +{ + QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName()); + // FIXME aconway 2011-11-25: string constants. + if (arguments.isSet("qpid.high_sequence_number")) { + qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number"); + qpid::framing::SequenceNumber lwm; + if (arguments.isSet("qpid.low_sequence_number")) { + lwm = arguments.getAsInt("qpid.low_sequence_number"); + } else { + lwm = hwm; + } + qpid::framing::SequenceNumber oldest; + if (queue->getOldest(oldest)) { + if (oldest >= hwm) { + range.add(lwm, --oldest); + } else if (oldest >= lwm) { + ReplicationStateInitialiser initialiser(range, lwm, hwm); + queue->eachMessage(initialiser); + } else { //i.e. have older message on master than is reported to exist on replica + QPID_LOG(warning, "HA: Replica missing message on master"); + } + } else { + //local queue (i.e. master) is empty + range.add(lwm, queue->getPosition()); + } + QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range + << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")"); + //set position of 'cursor' + position = hwm; + } +} + +bool ReplicatingSubscription::deliver(QueuedMessage& m) +{ + return ConsumerImpl::deliver(m); +} + +void ReplicatingSubscription::cancel() +{ + getQueue()->removeObserver(boost::dynamic_pointer_cast(shared_from_this())); +} + +ReplicatingSubscription::~ReplicatingSubscription() {} + +//called before we get notified of the message being available and +//under the message lock in the queue +void ReplicatingSubscription::enqueued(const QueuedMessage& m) +{ + QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m)); + //delay completion + m.payload->getIngressCompletion().startCompleter(); +} + +void ReplicatingSubscription::generateDequeueEvent() +{ + string buf(range.encodedSize(),'\0'); + framing::Buffer buffer(&buf[0], buf.size()); + range.encode(buffer); + range.clear(); + buffer.reset(); + + //generate event message + boost::intrusive_ptr event = new Message(); + AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + content.castBody()->decode(buffer, buffer.getSize()); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + event->getFrames().append(method); + event->getFrames().append(header); + event->getFrames().append(content); + + DeliveryProperties* props = event->getFrames().getHeaders()->get(true); + props->setRoutingKey("dequeue-event"); + + events->deliver(event); +} + +//called after the message has been removed from the deque and under +//the message lock in the queue +void ReplicatingSubscription::dequeued(const QueuedMessage& m) +{ + { + sys::Mutex::ScopedLock l(lock); + range.add(m.position); + // FIXME aconway 2011-11-29: q[pos] + QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position); + } + notify(); + if (m.position > position) { + m.payload->getIngressCompletion().finishCompleter(); + QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue"); + } +} + +bool ReplicatingSubscription::doDispatch() +{ + { + sys::Mutex::ScopedLock l(lock); + if (!range.empty()) { + generateDequeueEvent(); + } + } + bool r1 = events->dispatch(consumer); + bool r2 = ConsumerImpl::doDispatch(); + return r1 || r2; +} + +ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} +ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) +{ + return delegate.deliver(m); +} +void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } +bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr msg) { return delegate.filter(msg); } +bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr msg) { return delegate.accept(msg); } +OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h new file mode 100644 index 0000000000..b83842acbb --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -0,0 +1,109 @@ +#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H +#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/SemanticState.h" +#include "qpid/broker/QueueObserver.h" +#include "qpid/broker/ConsumerFactory.h" + +namespace qpid { + +namespace broker { +class Message; +class Queue; +class QueuedMessage; +class OwnershipToken; +} + +namespace ha { + +/** + * A susbcription that represents a backup replicating a queue. + * + * Runs on the primary. Delays completion of messages till the backup + * has acknowledged, informs backup of locally dequeued messages. + * + * THREAD UNSAFE: used only in broker connection thread. + */ +class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, + public broker::QueueObserver +{ + public: + struct Factory : public broker::ConsumerFactory { + boost::shared_ptr create( + broker::SemanticState* parent, + const std::string& name, boost::shared_ptr , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + }; + + // Argument names for consume command. + static const std::string QPID_REPLICATING_SUBSCRIPTION; + static const std::string QPID_HIGH_SEQUENCE_NUMBER; + static const std::string QPID_LOW_SEQUENCE_NUMBER; + + ReplicatingSubscription(broker::SemanticState* parent, + const std::string& name, boost::shared_ptr , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + + ~ReplicatingSubscription(); + + void cancel(); + bool deliver(broker::QueuedMessage& msg); + void enqueued(const broker::QueuedMessage&); + void dequeued(const broker::QueuedMessage&); + void acquired(const broker::QueuedMessage&) {} + void requeued(const broker::QueuedMessage&) {} + + bool isDelayedCompletion() const { return true; } + + protected: + bool doDispatch(); + private: + boost::shared_ptr events; + boost::shared_ptr consumer; + qpid::framing::SequenceSet range; + + void generateDequeueEvent(); + class DelegatingConsumer : public Consumer + { + public: + DelegatingConsumer(ReplicatingSubscription&); + ~DelegatingConsumer(); + bool deliver(broker::QueuedMessage& msg); + void notify(); + bool filter(boost::intrusive_ptr); + bool accept(boost::intrusive_ptr); + broker::OwnershipToken* getSession(); + private: + ReplicatingSubscription& delegate; + }; +}; + + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/ diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h new file mode 100644 index 0000000000..a2d2e89d82 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -0,0 +1,47 @@ +#ifndef QPID_HA_SETTINGS_H +#define QPID_HA_SETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +namespace qpid { +namespace ha { + +using std::string; + +/** + * Configurable settings for HA. + */ +class Settings +{ + public: + Settings() : enabled(false) {} + bool enabled; + string clientUrl; + string brokerUrl; + string username, password, mechanism; + private: +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_SETTINGS_H*/ diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp new file mode 100644 index 0000000000..125e2c0ba6 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -0,0 +1,463 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "WiringReplicator.h" +#include "QueueReplicator.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" + +namespace qpid { +namespace ha { + +using qmf::org::apache::qpid::broker::EventBind; +using qmf::org::apache::qpid::broker::EventExchangeDeclare; +using qmf::org::apache::qpid::broker::EventExchangeDelete; +using qmf::org::apache::qpid::broker::EventQueueDeclare; +using qmf::org::apache::qpid::broker::EventQueueDelete; +using qmf::org::apache::qpid::broker::EventSubscribe; +using namespace framing; +using std::string; +using types::Variant; +using namespace broker; + +namespace { + +const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); +const string QPID_REPLICATE("qpid.replicate"); + +const string CLASS_NAME("_class_name"); +const string EVENT("_event"); +const string OBJECT_NAME("_object_name"); +const string PACKAGE_NAME("_package_name"); +const string QUERY_RESPONSE("_query_response"); +const string SCHEMA_ID("_schema_id"); +const string VALUES("_values"); + +const string ALTEX("altEx"); +const string ARGS("args"); +const string ARGUMENTS("arguments"); +const string AUTODEL("autoDel"); +const string AUTODELETE("autoDelete"); +const string BIND("bind"); +const string BINDING("binding"); +const string CREATED("created"); +const string DISP("disp"); +const string DURABLE("durable"); +const string EXCHANGE("exchange"); +const string EXNAME("exName"); +const string EXTYPE("exType"); +const string KEY("key"); +const string NAME("name"); +const string QNAME("qName"); +const string QUEUE("queue"); +const string RHOST("rhost"); +const string TYPE("type"); +const string USER("user"); + +const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string QMF2("qmf2"); +const string QMF_CONTENT("qmf.content"); +const string QMF_DEFAULT_TOPIC("qmf.default.topic"); +const string QMF_OPCODE("qmf.opcode"); + +const string _WHAT("_what"); +const string _CLASS_NAME("_class_name"); +const string _PACKAGE_NAME("_package_name"); +const string _SCHEMA_ID("_schema_id"); +const string OBJECT("OBJECT"); +const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string QMF_DEFAULT_DIRECT("qmf.default.direct"); +const string _QUERY_REQUEST("_query_request"); +const string BROKER("broker"); + +bool isQMFv2(const Message& message) { + const framing::MessageProperties* props = message.getProperties(); + return props && props->getAppId() == QMF2; +} + +template bool match(Variant::Map& schema) { + return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); +} + +// FIXME aconway 2011-11-24: this should be a class. +enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL }; +const string S_NONE="none"; +const string S_WIRING="wiring"; +const string S_ALL="all"; + +ReplicateLevel replicateLevel(const string& str) { + // FIXME aconway 2011-11-24: case insenstive comparison. + ReplicateLevel rl = RL_NONE; + if (str == S_WIRING) rl = RL_WIRING; + else if (str == S_ALL) rl = RL_ALL; + return rl; +} + +ReplicateLevel replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); + else return RL_NONE; +} + +ReplicateLevel replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) return replicateLevel(i->second.asString()); + else return RL_NONE; +} + +void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + Variant::Map request; + request[_WHAT] = OBJECT; + Variant::Map schema; + schema[_CLASS_NAME] = className; + schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + request[_SCHEMA_ID] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId(QMF2); + props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); + headerBody.get(true)->setRoutingKey(BROKER); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); +} +} // namespace + +WiringReplicator::~WiringReplicator() {} + +WiringReplicator::WiringReplicator(const boost::shared_ptr& l) + : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) +{ + QPID_LOG(debug, "HA: Starting replication from " << + link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); + broker.getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + QPID_WIRING_REPLICATOR, // src + QPID_WIRING_REPLICATOR, // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&WiringReplicator::initializeBridge, this, _1, _2) + ); +} + +// This is called in the connection IO thread when the bridge is started. +void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + string queueName = bridge.getQueueName(); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + + //declare and bind an event queue + peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + //subscribe to the queue + peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + sendQuery(QUEUE, queueName, sessionHandler); + sendQuery(EXCHANGE, queueName, sessionHandler); + sendQuery(BINDING, queueName, sessionHandler); + QPID_LOG(debug, "HA: Activated wiring replicator") +} + +void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { + Variant::List list; + try { + if (!isQMFv2(msg.getMessage()) || !headers) + throw Exception("Unexpected message, not QMF2 event or query response."); + // decode as list + string content = msg.getMessage().getFrames().getContent(); + amqp_0_10::ListCodec::decode(content, list); + + if (headers->getAsString(QMF_CONTENT) == EVENT) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + Variant::Map& map = i->asMap(); + Variant::Map& schema = map[SCHEMA_ID].asMap(); + Variant::Map& values = map[VALUES].asMap(); + QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values); + if (match(schema)) doEventQueueDeclare(values); + else if (match(schema)) doEventQueueDelete(values); + else if (match(schema)) doEventExchangeDeclare(values); + else if (match(schema)) doEventExchangeDelete(values); + else if (match(schema)) doEventBind(values); + // FIXME aconway 2011-11-21: handle unbind & all other events. + else if (match(schema)) {} // Deliberately ignored. + else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema))); + } + } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; + Variant::Map& values = i->asMap()[VALUES].asMap(); + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values); + if (type == QUEUE) doResponseQueue(values); + else if (type == EXCHANGE) doResponseExchange(values); + else if (type == BINDING) doResponseBind(values); + else throw Exception(QPID_MSG("HA: Unexpected response type: " << type)); + } + } else { + QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers)); + } + } catch (const std::exception& e) { + QPID_LOG(warning, "HA: Error replicating configuration: " << e.what()); + QPID_LOG(debug, "HA: Error processing configuration message: " << list); + } +} + +void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { + string name = values[QNAME].asString(); + Variant::Map argsMap = values[ARGS].asMap(); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + + QPID_LOG(debug, "HA: Creating queue from event " << name); + std::pair, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODEL].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()); + if (result.second) { + // FIXME aconway 2011-11-22: should delete old queue and + // re-create from event. + // Events are always up to date, whereas responses may be + // out of date. + QPID_LOG(debug, "HA: New queue replica " << name); + startQueueReplicator(result.first); + } else { + QPID_LOG(warning, "HA: Replicated queue " << name << " already exists"); + } + } +} + +void WiringReplicator::doEventQueueDelete(Variant::Map& values) { + string name = values[QNAME].asString(); + boost::shared_ptr queue = broker.getQueues().find(name); + if (queue && replicateLevel(queue->getSettings())) { + QPID_LOG(debug, "HA: Deleting queue from event: " << name); + broker.deleteQueue( + name, + values[USER].asString(), + values[RHOST].asString()); + } +} + +void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { + Variant::Map argsMap(values[ARGS].asMap()); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + string name = values[EXNAME].asString(); + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + QPID_LOG(debug, "HA: New exchange replica " << name); + if (!broker.createExchange( + name, + values[EXTYPE].asString(), + values[DURABLE].asBool(), + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()).second) { + // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // and re-create from event. See comment in doEventQueueDeclare. + QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists"); + } + } +} + +void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { + string name = values[EXNAME].asString(); + try { + boost::shared_ptr exchange = broker.getExchanges().get(name); + if (exchange && replicateLevel(exchange->getArgs())) { + QPID_LOG(debug, "HA: Deleting exchange:" << name); + broker.deleteExchange( + name, + values[USER].asString(), + values[RHOST].asString()); + } + } catch (const framing::NotFoundException&) {} +} + +void WiringReplicator::doEventBind(Variant::Map& values) { + try { + boost::shared_ptr exchange = broker.getExchanges().get(values[EXNAME].asString()); + boost::shared_ptr queue = broker.getQueues().find(values[QNAME].asString()); + // We only replicated a binds for a replicated queue to replicated exchange. + if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) { + framing::FieldTable args; + amqp_0_10::translate(values[ARGS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } + } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange. +} + +void WiringReplicator::doResponseQueue(Variant::Map& values) { + // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication + Variant::Map argsMap(values[ARGUMENTS].asMap()); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + string name(values[NAME].asString()); + std::pair, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODELETE].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/); + if (result.second) { + QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)"); + startQueueReplicator(result.first); + } else { + // FIXME aconway 2011-11-22: Normal to find queue already + // exists if we're failing over. + QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)"); + } +} + +void WiringReplicator::doResponseExchange(Variant::Map& values) { + Variant::Map argsMap(values[ARGUMENTS].asMap()); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)"); + if (!broker.createExchange( + values[NAME].asString(), + values[TYPE].asString(), + values[DURABLE].asBool(), + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/).second) { + QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)"); + } +} + +namespace { +const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); +const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); + +std::string getRefName(const std::string& prefix, const Variant& ref) { + Variant::Map map(ref.asMap()); + Variant::Map::const_iterator i = map.find(OBJECT_NAME); + if (i == map.end()) + throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); + const std::string name = i->second.asString(); + if (name.compare(0, prefix.size(), prefix) != 0) + throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name)); + std::string ret = name.substr(prefix.size()); + return ret; +} + +const std::string EXCHANGE_REF("exchangeRef"); +const std::string QUEUE_REF("queueRef"); + +} // namespace + +void WiringReplicator::doResponseBind(Variant::Map& values) { + try { + std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); + std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + boost::shared_ptr exchange = broker.getExchanges().get(exName); + boost::shared_ptr queue = broker.getQueues().find(qName); + // FIXME aconway 2011-11-24: more flexible configuration for binding replication. + + // Automatically replicate exchange if queue and exchange are replicated + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } + } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange. +} + +void WiringReplicator::startQueueReplicator(const boost::shared_ptr& queue) { + // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed. + if (replicateLevel(queue->getSettings()) == RL_ALL) { + boost::shared_ptr qr(new QueueReplicator(queue, link)); + broker.getExchanges().registerExchange(qr); + } +} + +bool WiringReplicator::bind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } +bool WiringReplicator::unbind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } +bool WiringReplicator::isBound(boost::shared_ptr, const string* const, const framing::FieldTable* const) { return false; } + +string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; } + +}} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h new file mode 100644 index 0000000000..32109d8368 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h @@ -0,0 +1,81 @@ +#ifndef QPID_HA_REPLICATOR_H +#define QPID_HA_REPLICATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Exchange.h" +#include "qpid/types/Variant.h" +#include + +namespace qpid { + +namespace broker { +class Broker; +class Link; +class Bridge; +class SessionHandler; +} + +namespace ha { + +/** + * Replicate wiring on a backup broker. + * + * Implemented as an exchange that subscribes to receive QMF + * configuration events from the primary. It configures local queues + * exchanges and bindings to replicate the primary. + * It also creates QueueReplicators for newly replicated queues. + * + * THREAD SAFE: Has no mutable state. + * + */ +class WiringReplicator : public broker::Exchange +{ + public: + WiringReplicator(const boost::shared_ptr&); + ~WiringReplicator(); + std::string getType() const; + + // Exchange methods + bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*); + bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + bool isBound(boost::shared_ptr, const std::string* const, const framing::FieldTable* const); + + private: + void initializeBridge(broker::Bridge&, broker::SessionHandler&); + void doEventQueueDeclare(types::Variant::Map& values); + void doEventQueueDelete(types::Variant::Map& values); + void doEventExchangeDeclare(types::Variant::Map& values); + void doEventExchangeDelete(types::Variant::Map& values); + void doEventBind(types::Variant::Map&); + void doResponseQueue(types::Variant::Map& values); + void doResponseExchange(types::Variant::Map& values); + void doResponseBind(types::Variant::Map& values); + void startQueueReplicator(const boost::shared_ptr&); + + broker::Broker& broker; + boost::shared_ptr link; +}; +}} // namespace qpid::broker + +#endif /*!QPID_HA_REPLICATOR_H*/ diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml new file mode 100644 index 0000000000..bb06e77a69 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 16d7fb0b78..91d5f6a402 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -440,6 +440,7 @@ class BrokerTest(TestCase): # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) cluster_lib = os.getenv("CLUSTER_LIB") + ha_lib = os.getenv("HA_LIB") xml_lib = os.getenv("XML_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") @@ -514,6 +515,12 @@ class BrokerTest(TestCase): actual_contents = self.browse(session, queue, timeout) self.assertEqual(expect_contents, actual_contents) + def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01): + """Wait up to timeout for contents of queue to match expect_contents""" + def test(): return self.browse(session, queue, 0) == expect_contents + retry(test, timeout, delay) + self.assertEqual(expect_contents, self.browse(session, queue, 0)) + def join(thread, timeout=10): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 199d1e7b57..424d4169e8 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -61,15 +61,25 @@ if HAVE_LIBCPG # You should do "newgrp ais" before running the tests to run these. # - -# ais_check checks pre-requisites for cluster tests and runs them if ok. -TESTS += \ - run_cluster_test \ - cluster_read_credit \ - test_watchdog \ - run_cluster_tests \ - federated_cluster_test \ - clustered_replication_test +# FIXME aconway 2011-11-14: Disable cluster tests on qpid-3603 branch +# Some cluster tests are known to fail on this branch. +# Immediate priority is to develop then new HA solution, +# Cluster will brought up to date when thats done. +# +# gsim: its due to the keeping of deleted messages on the deque until they can be popped off either end +# gsim: that is state that isn't available to new nodes of course +# gsim: i.e. if you dequeue a message from the middle of the deque +# gsim: it will not be on updatee but will be hidden on original node(s) +# gsim: and is needed for the direct indexing + + +# TESTS += \ +# run_cluster_test \ +# cluster_read_credit \ +# test_watchdog \ +# run_cluster_tests \ +# federated_cluster_test \ +# clustered_replication_test # Clean up after cluster_test and start_cluster CLEANFILES += cluster_test.acl cluster.ports diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py new file mode 100755 index 0000000000..9b52c2fca7 --- /dev/null +++ b/qpid/cpp/src/tests/ha_tests.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil +from qpid.messaging import Message, NotFound +from brokertest import * +from threading import Thread, Lock, Condition +from logging import getLogger + + +log = getLogger("qpid.ha-tests") + +class ShortTests(BrokerTest): + """Short HA functionality tests.""" + + def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs): + assert BrokerTest.ha_lib, "Cannot locate HA plug-in" + return Broker(self, args=["--load-module", BrokerTest.ha_lib, + "--ha-enable=yes", + "--ha-client-url", client_url, + "--ha-broker-url", broker_url, + ] + args, + **kwargs) + + # FIXME aconway 2011-11-15: work around async replication. + def wait(self, session, address): + def check(): + try: + session.sender(address) + return True + except NotFound: return False + assert retry(check), "Timed out waiting for %s"%(address) + + def assert_missing(self,session, address): + try: + session.receiver(address) + self.fail("Should not have been replicated: %s"%(address)) + except NotFound: pass + + def test_replication(self): + def queue(name, replicate): + return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) + + def exchange(name, replicate, bindq): + return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) + def setup(p, prefix): + """Create config, send messages on the primary p""" + p.sender(queue(prefix+"q1", "all")).send(Message("1")) + p.sender(queue(prefix+"q2", "wiring")).send(Message("2")) + p.sender(queue(prefix+"q3", "none")).send(Message("3")) + p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) + p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5")) + # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done. + p.sender(queue(prefix+"x", "wiring")) + + def verify(b, prefix): + """Verify setup was replicated to backup b""" + # FIXME aconway 2011-11-21: wait for wiring to replicate. + self.wait(b, prefix+"x"); + # Verify backup + # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication. + self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) + self.assert_browse_retry(b, prefix+"q2", []) # wiring only + self.assert_missing(b, prefix+"q3") + b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all + self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) + b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring + self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) + + # Create config, send messages before starting the backup, to test catch-up replication. + primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + p = primary.connect().session() + setup(p, "1") + # Start the backup + backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + b = backup.connect().session() + verify(b, "1") + + # Create config, send messages after starting the backup, to test steady-state replication. + setup(p, "2") + verify(b, "2") + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 26be15b48a..100612f978 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; } exportmodule ACL_LIB acl.so exportmodule CLUSTER_LIB cluster.so +exportmodule HA_LIB ha.so exportmodule REPLICATING_LISTENER_LIB replicating_listener.so exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so exportmodule SSLCONNECTOR_LIB sslconnector.so diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test index 1a0f711ace..13f31fe78b 100755 --- a/qpid/python/qpid-python-test +++ b/qpid/python/qpid-python-test @@ -570,6 +570,8 @@ total = len(filtered) + len(ignored) if opts.xml and not list_only: xmlr = JunitXmlStyleReporter(opts.xml); xmlr.begin(); +else: + xmlr = None passed = 0 failed = 0 diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py index feae4bb1bd..48e29ad912 100755 --- a/qpid/tools/setup.py +++ b/qpid/tools/setup.py @@ -31,7 +31,8 @@ setup(name="qpid-tools", "src/py/qpid-route", "src/py/qpid-stat", "src/py/qpid-tool", - "src/py/qmf-tool"], + "src/py/qmf-tool", + "src/py/qpid-ha-status"], url="http://qpid.apache.org/", license="Apache Software License", description="Diagnostic and management tools for Apache Qpid brokers.") diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status new file mode 100755 index 0000000000..c70e4c9af3 --- /dev/null +++ b/qpid/tools/src/py/qpid-ha-status @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import qmf.console, optparse, sys +from qpid.management import managementChannel, managementClient + +usage=""" +Usage: qpid-ha-status [broker-address] [status] +If status is specified, sets the HA status of the broker. Otherwise prints the current HA status. Status must be one of: primary, backup, solo. +""" + +STATUS_VALUES=["primary", "backup", "solo"] + +def is_valid_status(value): return value in STATUS_VALUES + +def validate_status(value): + if not is_valid_status(value): + raise Exception("Invalid HA status value: %s"%(value)) + +class HaBroker: + def __init__(self, broker, session): + self.session = session + try: + self.qmf_broker = self.session.addBroker(broker) + except Exception, e: + raise Exception("Can't connect to %s: %s"%(broker,e)) + ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha") + if (not ha_brokers): raise Exception("Broker does not have HA enabled.") + self.ha_broker = ha_brokers[0]; + + def get_status(self): + return self.ha_broker.status + + def set_status(self, value): + validate_status(value) + self.ha_broker.setStatus(value) + +def parse_args(args): + broker, status = "localhost:5672", None + if args and is_valid_status(args[-1]): + status = args[-1] + args.pop() + if args: broker = args[0] + return broker, status + +def main(): + try: + session = qmf.console.Session() + try: + broker, status = parse_args(sys.argv[1:]) + hb = HaBroker(broker, session) + if status: hb.set_status(status) + else: print hb.get_status() + finally: + session.close() + return 0 + except Exception, e: + print e + return -1 + +if __name__ == "__main__": + sys.exit(main()) -- cgit v1.2.1