author     Alan Conway <aconway@apache.org>    2010-11-01 14:10:48 +0000
committer  Alan Conway <aconway@apache.org>    2010-11-01 14:10:48 +0000
commit     ec8173b568c8c57f7f21215f0e64947b69f0c13b (patch)
tree       7e6f8a165570eda6434675031e7447bfe52c5273 /qpid/cpp/src/qpid/cluster
parent     131419d3a24c1ad93ae794c272e26b10cac9c415 (diff)
Moved new cluster design docs into cpp/design.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1029671 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster')
-rw-r--r--  qpid/cpp/src/qpid/cluster/new-cluster-active-passive.txt |  64
-rw-r--r--  qpid/cpp/src/qpid/cluster/new-cluster-design.txt         | 395
-rw-r--r--  qpid/cpp/src/qpid/cluster/new-cluster-plan.txt           | 473
3 files changed, 0 insertions, 932 deletions
diff --git a/qpid/cpp/src/qpid/cluster/new-cluster-active-passive.txt b/qpid/cpp/src/qpid/cluster/new-cluster-active-passive.txt
deleted file mode 100644
index 315876a152..0000000000
--- a/qpid/cpp/src/qpid/cluster/new-cluster-active-passive.txt
+++ /dev/null
@@ -1,64 +0,0 @@
--*-org-*-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-* Active/passive cluster implementation
-
-The active-active cluster can be used in an active-passive mode. In
-this mode we would like the cluster to be as efficient as a strictly
-active-passive implementation.
-
-An active/passive implementation requires a subset of the active/active approach:
-- drop Queue ownership and locking
-- replicate a subset of the Cluster interface; we don't need to know the full disposition of every message.
-
-Can re-use:
-- cluster membership
-- new member updates
-- store integration
-
-Simpler implementation of broker::Cluster:
-- act like distributed MessageStore. Don't need acquisition details.
-- can do immediate local enqueue and still guarantee order.
-- can use smaller message IDs: just sequence number. Can be implicit.
-
-Extra requirements:
-- Exactly one broker has to take over if the primary fails.
-- Passive members refuse client connections and redirect to active member.
-- On failover, clients keep trying till they find the active member.
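-
-For illustration, a minimal client-side sketch of the last two requirements,
-using the qpid::messaging reconnect options (the broker URLs and the exact
-option spellings are assumptions for this sketch, not part of the design):
-
-#include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Session.h>
-
-int main() {
-    using namespace qpid::messaging;
-    // List all cluster members; passive members refuse/redirect and the
-    // client keeps cycling through the URLs until it reaches the active one.
-    Connection c("amqp:tcp:node1:5672",
-                 "{reconnect: true,"
-                 " reconnect_urls: ['amqp:tcp:node2:5672', 'amqp:tcp:node3:5672']}");
-    c.open();
-    Session s = c.createSession();
-    // ... use the session normally; failover is handled by the reconnect logic ...
-    c.close();
-    return 0;
-}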
-
-** Active/active vs. active/passive
-
-Active/active benefits:
-- Total # connections: there is a practical limit of ~60k per node, so spreading clients across active nodes raises the total.
-- Handle client losing connectivity to one cluster node - can fail over to any.
-- Some load sharing: reading from the client and multicasting are only done on the directly connected node.
-- Clients can switch to any broker on failover
-- Failure affects a subset of the clients, not all the clients.
-
-Active/active drawbacks:
-- Co-ordinating message allocation impacts performance.
-
-Active/passive benefits:
-- Don't need message allocation strategy, can feed consumers at top speed.
-
-Active/passive drawbacks:
-- All clients on one node so a failure affects every client in the system.
-- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
-- After a failure there may be a period where no broker is active.
-- Can't help clients with no connectivity to the active member.
-- Clients must find the single active member in order to fail over.
diff --git a/qpid/cpp/src/qpid/cluster/new-cluster-design.txt b/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
deleted file mode 100644
index 6350ae12e0..0000000000
--- a/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
+++ /dev/null
@@ -1,395 +0,0 @@
--*-org-*-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-* A new design for Qpid clustering.
-
-** Issues with current design.
-
-The cluster is based on virtual synchrony: each broker multicasts
-events and the events from all brokers are serialized and delivered in
-the same order to each broker.
-
-In the current design raw byte buffers from client connections are
-multicast, serialized and delivered in the same order to each broker.
-
-Each broker has a replica of all queues, exchanges, bindings and also
-all connections & sessions from every broker. The cluster code treats the
-broker as a "black box": it "plays" the client data into the
-connection objects and assumes that, given the same input, each
-broker will reach the same state.
-
-A new broker joining the cluster receives a snapshot of the current
-cluster state, and then follows the multicast conversation.
-
-*** Maintenance issues.
-
-The entire state of each broker is replicated to every member:
-connections, sessions, queues, messages, exchanges, management objects
-etc. Any discrepancy in the state that affects how messages are
-allocated to consumers can cause an inconsistency.
-
-- Entire broker state must be faithfully updated to new members.
-- Management model also has to be replicated.
-- All queues are replicated, can't have unreplicated queues (e.g. for management)
-
-Events that are not deterministically predictable from the client
-input data stream can cause inconsistencies. In particular, the use of
-timers/timestamps requires cluster workarounds to synchronize.
-
-A member that encounters an error which is not encountered by all other
-members is considered inconsistent and will shut itself down. Such
-errors can come from any area of the broker code; e.g. different
-ACL files can cause inconsistent errors.
-
-The following areas required workarounds to work in a cluster:
-
-- Timers/timestamps in broker code: management, heartbeats, TTL
-- Security: cluster must replicate *after* decryption by security layer.
-- Management: not initially included in the replicated model, source of many inconsistencies.
-
-It is very easy for someone adding a feature or fixing a bug in the
-standalone broker to break the cluster by:
-- adding new state that needs to be replicated in cluster updates.
-- doing something in a timer or other non-connection thread.
-
-It's very hard to test for such breaks. We need a looser coupling
-and a more explicitly defined interface between cluster and standalone
-broker code.
-
-*** Performance issues.
-
-Virtual synchrony delivers all data from all clients in a single
-stream to each broker. The cluster must play this data through the full
-broker code stack: connections, sessions etc. in a single thread
-context in order to get identical behavior on each broker. The cluster
-has a pipelined design to get some concurrency, but this is a severe
-limitation on scalability on multi-core hosts compared to the
-standalone broker, which processes each connection in a separate thread
-context.
-
-** A new cluster design.
-
-Clearly defined interface between broker code and cluster plug-in.
-
-Replicate queue events rather than client data.
-- Broker behavior only needs to match per-queue.
-- A smaller amount of code (the queue implementation) must behave predictably.
-- Events only need to be serialized per queue, which allows concurrency between queues.
-
-Use a moving queue ownership protocol to agree on the order of dequeues.
-This no longer relies on identical state and lock-step behavior to produce
-identical dequeues on each broker.
-
-Each queue has an associated thread context. Events for a queue are executed
-in that queue's context, in parallel with events for other queues.
-
-*** Requirements
-
-The cluster must provide these delivery guarantees:
-
-- client sends transfer: message must be replicated and not lost even if the local broker crashes.
-- client acquires a message: message must not be delivered on another broker while acquired.
-- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
-- client releases message: message must be re-queued on cluster and not lost.
-- client rejects message: message must be dead-lettered or discarded and forgotten.
-- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.
-
-Each guarantee takes effect when the client receives a *completion*
-for the associated command (transfer, acquire, reject, accept).
-
-*** Broker receiving messages
-
-On receiving a message transfer, in the connection thread we:
-- multicast a message-received event.
-- enqueue and complete the transfer when it is self-delivered.
-
-Other brokers enqueue the message when they receive the message-received event.
-
-Enqueues are queued up with other queue operations to be executed in the
-thread context associated with the queue.
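-
-A sketch of the two paths; the types and members below are minimal stand-ins
-for this sketch only, not existing broker classes:
-
-#include <functional>
-#include <map>
-#include <queue>
-#include <string>
-
-struct Event { int sender; std::string queue; std::string body; };
-typedef std::function<void()> Work;
-
-struct ClusterSketch {
-    int self;                                            // this member's node id
-    std::function<void(const Event&)> mcast;             // hand event to CPG
-    std::map<std::string, std::queue<Work> > perQueueWork; // per-queue contexts
-    std::function<void(const Event&)> localEnqueue;       // the real enqueue
-    std::function<void(const Event&)> completeTransfer;   // completes the client command
-
-    // Connection thread, on the broker that received the transfer:
-    void onTransfer(const std::string& queue, const std::string& body) {
-        // Multicast only; the enqueue and the completion wait for self-delivery,
-        // so every member enqueues in the same (total) order.
-        Event e = {self, queue, body};
-        mcast(e);
-    }
-
-    // CPG deliver thread, on every member (including the sender):
-    void onDeliver(const Event& e) {
-        // Queue the enqueue on the per-queue context: different queues proceed
-        // in parallel, events for one queue stay serialized.
-        ClusterSketch* me = this;
-        perQueueWork[e.queue].push([me, e]() {
-            me->localEnqueue(e);
-            if (e.sender == me->self) me->completeTransfer(e);
-        });
-    }
-};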
-
-*** Broker sending messages: moving queue ownership
-
-Each queue is *owned* by at most one cluster broker at a time. Only
-that broker may acquire or dequeue messages. The owner multicasts
-notification of messages it acquires/dequeues to the cluster.
-Periodically the owner hands over ownership to another interested
-broker, providing time-shared access to the queue among all interested
-brokers.
-
-We assume the same IO-driven dequeuing algorithm as the standalone
-broker with one modification: queues can be "locked". A locked queue
-is not available for dequeuing messages and will be skipped by the
-output algorithm.
-
-At any given time only those queues owned by the local broker will be
-unlocked.
-
-As messages are acquired/dequeued from unlocked queues by the IO threads
-the broker multicasts acquire/dequeue events to the cluster.
-
-When an unlocked queue has no more consumers with credit, or when a
-time limit expires, the broker relinquishes ownership by multicasting
-a release-queue event, allowing another interested broker to take
-ownership.
-
-*** Asynchronous completion of accept
-In acknowledged mode a message is not forgotten until it is accepted,
-to allow for requeue on rejection or crash. The accept should not be
-completed till the message has been forgotten.
-
-On receiving an accept the broker:
-- dequeues the message from the local queue
-- multicasts an "accept" event
-- completes the accept asynchronously when the dequeue event is self delivered.
-
-NOTE: The message store does not currently implement asynchronous
-completion of accept; this is a bug.
-
-** Inconsistent errors.
-
-The new design eliminates most sources of inconsistent errors
-(connections, sessions, security, management etc.) The only points
-where inconsistent errors can occur are at enqueue and dequeue (most
-likely store-related errors.)
-
-The new design can use the existing error-handling protocol with one
-major improvement: since brokers are no longer required to maintain
-identical state they do not have to stall processing while an error is
-being resolved.
-
-#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
-
-** Updating new members
-
-When a new member (the updatee) joins a cluster it needs to be brought
-up to date with the rest of the cluster. An existing member (the
-updater) sends an "update".
-
-In the old cluster design the update is a snapshot of the entire
-broker state. To ensure consistency of the snapshot both the updatee
-and the updater "stall" at the start of the update, i.e. they stop
-processing multicast events and queue them up for processing when the
-update is complete. This creates a backlog of work to get through,
-which leaves them lagging behind the rest of the cluster until they
-catch up (which is not guaranteed to happen in a bounded time).
-
-With the new cluster design only exchanges, queues, bindings and
-messages need to be replicated.
-
-Update of wiring (exchanges, queues, bindings) is the same as current
-design.
-
-Update of messages is different:
-- per-queue rather than per-broker, separate queues can be updated in parallel.
-- updates queues in reverse order to eliminate unbounded catch-up
-- does not require updater & updatee to stall during update.
-
-Replication events, multicast to cluster:
-- enqueue(q,m): push message m onto the back of queue q.
-- acquire(q,m): mark m acquired
-- dequeue(q,m): forget m.
-Messages sent on update connection:
-- update_front(q,m): during update, receiver pushes m to *front* of q
-- update_done(q): during update, update of q is complete.
-
-Updater:
-- when updatee joins set iterator i = q.end()
-- while i != q.begin(): --i; send update_front(q,*i) to updatee
-- send update_done(q) to updatee
-
-Updatee:
-- q initially in locked state, can't dequeue locally.
-- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
-- receive update_front(q,m): q.push_front(m)
-- receive update_done(q): q can be unlocked for local dequeuing.
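-
-A self-contained sketch of the update_front scheme, showing that normal
-replicated enqueues at the back plus update messages pushed at the front
-reproduce the updater's order (std::deque stands in for the broker queue):
-
-#include <cassert>
-#include <deque>
-#include <string>
-
-int main() {
-    // Messages on the updater's queue when the updatee joins (front..back).
-    std::deque<std::string> snapshot = {"m1", "m2", "m3"};
-    std::deque<std::string> updater(snapshot);
-    std::deque<std::string> updatee;   // starts empty and locked
-
-    // A message arriving during the update is replicated as a normal
-    // enqueue on both brokers (pushed on the back).
-    updater.push_back("m4");
-    updatee.push_back("m4");
-
-    // The updater walks the snapshot from back to front sending
-    // update_front(q,m); the updatee pushes each one to the *front*.
-    for (auto i = snapshot.rbegin(); i != snapshot.rend(); ++i)
-        updatee.push_front(*i);
-    // update_done(q) is sent here and the updatee unlocks the queue.
-
-    assert(updatee == updater);   // m1 m2 m3 m4 on both, no stall, bounded work
-    return 0;
-}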
-
-Benefits:
-- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
-- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
-- During update consumers actually help by removing messages before they need to be updated.
-- Needs no separate "work to do" queue, only the broker queues themselves.
-
-# TODO how can we recover from updater crashing before update complete?
-# Clear queues that are not updated & send request for updates on those queues?
-
-# TODO updatee may receive a dequeue for a message it has not yet seen, needs
-# to hold on to that so it can drop the message when it is seen.
-# Similar problem exists for wiring?
-
-** Cluster API
-
-The new cluster API is similar to the MessageStore interface.
-(Initially I thought it would be an extension of the MessageStore interface,
-but as the design develops it seems better to make it a separate interface.)
-
-The cluster interface captures these events:
-- wiring changes: queue/exchange declare/bind
-- message enqueued/acquired/released/rejected/dequeued.
-
-The cluster will require some extensions to the Queue:
-- Queues can be "locked", locked queues are ignored by IO-driven output.
-- Cluster must be able to apply queue events from the cluster to a queue.
- These appear to fit into existing queue operations.
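-
-One possible shape of such an interface (a sketch only; the names and exact
-signatures are illustrative, not the actual broker::Cluster API):
-
-#include <string>
-
-class Queue;          // broker queue
-class QueuedMessage;  // message + queue + position
-
-// Intercept points called by broker code; a cluster plug-in implements them
-// to multicast the corresponding events. A no-op implementation gives
-// standalone behaviour.
-class Cluster {
-  public:
-    virtual ~Cluster() {}
-    // wiring
-    virtual void create(const Queue&) = 0;
-    virtual void destroy(const Queue&) = 0;
-    virtual void bind(const std::string& exchange, const Queue&,
-                      const std::string& key) = 0;
-    // messages
-    virtual void enqueue(const QueuedMessage&) = 0;
-    virtual void acquire(const QueuedMessage&) = 0;
-    virtual void release(const QueuedMessage&) = 0;
-    virtual void reject(const QueuedMessage&) = 0;
-    virtual void dequeue(const QueuedMessage&) = 0;
-};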
-
-** Maintainability
-
-This design gives us more robust code with clear and explicit interfaces.
-
-The cluster depends on specific events clearly defined by an explicit
-interface. Provided the semantics of this interface are not violated,
-the cluster will not be broken by changes to broker code.
-
-The cluster no longer requires identical processing of the entire
-broker stack on each broker. It is not affected by the details of how
-the broker allocates messages. It is independent of the
-protocol-specific state of connections and sessions and so is
-protected from future protocol changes (e.g. AMQP 1.0)
-
-A number of specific ways the code will be simplified:
-- drop code to replicate management model.
-- drop timer workarounds for TTL, management, heartbeats.
-- drop "cluster-safe assertions" in broker code.
-- drop connections, sessions, management from cluster update.
-- drop security workarounds: cluster code now operates after message decoding.
-- drop connection tracking in cluster code.
-- simpler inconsistent-error handling code, no need to stall.
-
-** Performance
-
-The only way to verify the relative performance of the new design is
-to prototype & profile. The following points suggest the new design
-may scale/perform better:
-
-Some work moved from virtual synchrony thread to connection threads:
-- All connection/session logic moves to connection thread.
-- Exchange routing logic moves to connection thread.
-- On local broker dequeueing is done in connection thread
-- Local broker dequeue is IO driven as for a standalone broker.
-
-For queues with all consumers on a single node, dequeuing is entirely
-IO-driven in the connection thread. We only pay for time-sharing if a queue
-has consumers on multiple brokers.
-
-Doing work for different queues in parallel scales on multi-core boxes when
-there are multiple queues.
-
-One difference works against performance: there is an extra
-encode/decode. The old design multicasts raw client data and decodes
-it in the virtual synchrony thread. The new design would decode
-messages in the connection thread, re-encode them for multicast, and
-decode (on non-local brokers) in the virtual synchrony thread. There
-is extra work here, but only in the *connection* thread: on a
-multi-core machine this happens in parallel for every connection, so
-it probably is not a bottleneck. There may be scope to optimize the
-decode/re-encode by re-using some of the original encoded data; this
-could also benefit the stand-alone broker.
-
-** Asynchronous queue replication
-
-The existing "asynchronous queue replication" feature maintains a
-passive backup of queues on a remote broker over a TCP
-connection.
-
-The new cluster replication protocol could be re-used to implement
-asynchronous queue replication: it's just a special case where the
-active broker is always the queue owner and the enqueue/dequeue
-messages are sent over a TCP connection rather than multicast.
-
-The new update mechanism could also work with 'asynchronous
-queue replication', allowing such replication (over a TCP connection
-on a WAN, say) to be initiated after the queue has already been created
-and been in use (one of the key missing features).
-
-** Increasing Concurrency and load sharing
-
-The current cluster is bottlenecked by processing everything in the
-CPG deliver thread. By removing the need for identical operation on
-each broker, we open up the possibility of greater concurrency.
-
-Handling multicast enqueue, acquire, accept, release etc.: concurrency
-per queue. Operations on different queues can be done in different
-threads.
-
-The new design does not force each broker to do all the work in the
-CPG thread so spreading load across cluster members should give some
-scale-up.
-
-** Misc outstanding issues & notes
-
-Replicating wiring
-- Need async completion of wiring commands?
-- qpid.sequence_counter: need extra work to support in new design, do we care?
-
-Cluster+persistence:
-- finish async completion: dequeue completion for store & cluster
-- cluster restart from store: clean stores *not* identical, pick 1, all others update.
-- need to generate cluster ids for messages recovered from store.
-
-Live updates: we don't need to stall brokers during an update!
-- update on queue-by-queue basis.
-- updatee locks queues during update, no dequeue.
-- update in reverse: don't update messages dequeued during update.
-- updatee adds update messages at the front, replicated messages at the back (as normal).
-- updater starts from back, sends "update done" when it hits front of queue.
-
-Flow control: need to throttle multicasting
-1. bound the number of outstanding multicasts.
-2. ensure the entire cluster keeps up, no unbounded "lag"
-The existing design uses read-credit to solve 1., and does not solve 2.
-The new design should stop reading on all connections while a flow-control
-condition exists?
-
-Can federation also be unified, at least in configuration?
-
-Consider queues (and exchanges?) as having "reliability" attributes:
-- persistent: is the message stored on disk.
-- backed-up (to another broker): active/passive async replication.
-- replicated (to a cluster): active/active multicast replication to cluster.
-- federated: federation link to a queue/exchange on another broker.
-
-"Reliability" seems right for the first 3 but not for federation, is
-there a better term?
-
-Clustering and scalability: new design may give us the flexibility to
-address scalability as part of cluster design. Think about
-relationship to federation and "fragmented queues" idea.
-
-* Design questions/decisions
-** Total ordering.
-Initial thinking: allow message ordering to differ between brokers.
-New thinking: use CPG total ordering, get identical ordering on all brokers.
-- Allowing variation in order introduces too much chance of unexpected behavior.
-- Using total order allows other optimizations, see Message Identifiers below.
-
-** Message identifiers.
-Initial thinking: message ID = CPG node id + 64 bit sequence number.
-This involves a lot of mapping between cluster IDs and broker messages.
-
-New thinking: message ID = queue name + queue position.
-- Removes most of the mapping and memory management for cluster code.
-- Requires total ordering of messages (see above)
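-
-A sketch of how small such an identifier can be (the types here are
-illustrative, not the actual cluster code):
-
-#include <cstdint>
-#include <string>
-
-// With total ordering, (queue, position) uniquely identifies a message on
-// every broker, so no per-broker mapping tables are needed.
-struct MessageId {
-    std::string queue;     // queue name
-    uint64_t    position;  // sequence number of the message on that queue
-};
-
-inline bool operator<(const MessageId& a, const MessageId& b) {
-    return a.queue < b.queue || (a.queue == b.queue && a.position < b.position);
-}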
-
-** Message rejection
-Initial thinking: add special reject/rejected points to cluster interface so
-rejected messages could be re-queued without multicast.
-
-New thinking: treat re-queueing after reject as entirely new message.
-- Simplifies cluster interface & implementation
-- Not on the critical path.
diff --git a/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt b/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
deleted file mode 100644
index 158403e988..0000000000
--- a/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
+++ /dev/null
@@ -1,473 +0,0 @@
--*-org-*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-Notes on new cluster implementation. See also: new-cluster-design.txt
-
-* Implementation plan.
-
-Co-existence with old cluster code and tests:
-- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
-- Double up tests with old version/new version as the new code develops.
-
-Minimal POC for message delivery & perf test.
-- no wiring replication, no updates, no failover, no persistence, no async completion.
-- just implement publish and acquire/dequeue locking protocol.
-- optimize the special case where all consumers are on the same node.
-- measure performance: compare active-passive and active-active modes of use.
-
-Full implementation of transient cluster
-- Update (based on existing update), async completion etc.
-- Passing all existing transient cluster tests.
-
-Persistent cluster
-- Make sure async completion works correctly.
-- InitialStatus protocol etc. to support persistent start-up (existing code)
-- cluster restart from store: stores not identical. Load one, update the rest.
- - assign cluster IDs to messages recovered from store, don't replicate.
-
-Improved update protocol
-- per-queue, less stalling, bounded catch-up.
-
-* Task list
-
-** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
-
-NOTE: as implementation questions arise, take the easiest option and make
-a note for later optimization/improvement.
-
-*** Tests
-- python test: 4 senders, numbered messages, 4 receivers, verify message set.
-- acquire then release messages: verify can be dequeued on any member
-- acquire then kill broker: verify can be dequeued on other members.
-- acquire then reject: verify goes on alt-exchange once only.
-
-*** DONE broker::Cluster interface and call points.
-
-Initial interface committed.
-
-*** Main classes
-
-BrokerHandler:
-- implements broker::Cluster intercept points.
-- sends mcast events to inform cluster of local actions.
-- thread safe, called in connection threads.
-
-LocalMessageMap:
-- Holds local messages while they are being enqueued.
-- thread safe: called by both BrokerHandler and MessageHandler
-
-MessageHandler:
-- handles delivered mcast messages related to messages.
-- initiates local actions in response to mcast events.
-- thread unsafe, only called in deliver thread.
-- maintains view of cluster state regarding messages.
-
-QueueOwnerHandler:
-- handles delivered mcast messages related to queue consumer ownership.
-- thread safe, called in deliver, connection and timer threads.
-- maintains view of cluster state regarding queue ownership.
-
-cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
-- thread safe: manage state used by both MessageHandler and BrokerHandler
-
-The following code sketch illustrates only the "happy path"; error handling
-is omitted.
-
-*** BrokerHandler
-Types:
-- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
-- struct
-
-NOTE:
-- Messages on queues are identified by a queue name + a position.
-- Messages being routed are identified by a sequence number.
-
-Members:
-- thread_local bool noReplicate // suppress replication.
-- thread_local bool isRouting // suppress operations while routing
-- Message localMessage[SequenceNumber] // local messages being routed.
-- thread_local SequenceNumber routingSequence
-
-NOTE: localMessage is also modified by MessageHandler.
-
-broker::Cluster intercept functions:
-
-routing(msg)
- if noReplicate: return
- # Suppress everything except enqueues while we are routing.
- # We don't want to replicate acquires & dequeues caused by an enqueue,
- # e.g. removal of messages from ring/LV queues.
- isRouting = true
-
-enqueue(qmsg):
- if noReplicate: return
- if routingSequence == 0 # thread local
- routingSequence = nextRoutingSequence()
- mcast create(encode(qmsg.msg), routingSequence)
- mcast enqueue(qmsg.q, routingSequence)
-
-routed(msg):
- if noReplicate: return
- isRouting = false
-
-acquire(qmsg):
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- if msg.id: mcast acquire(qmsg)
-
-release(QueuedMessage)
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- mcast release(qmsg)
-
-accept(QueuedMessage):
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- mcast accept(qmsg)
-
-reject(QueuedMessage):
- isRejecting = true
- mcast reject(qmsg)
-
-# FIXME no longer needed?
-drop(QueuedMessage)
- cleanup(qmsg)
-
-*** MessageHandler and mcast messages
-Types:
-- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
-- struct QueueKey { MessageId id; QueueName q; }
-- typedef map<QueueKey, QueueEntry> Queue
-- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
-
-Members:
-- QueueEntry enqueued[QueueKey]
-- Node node[NodeId]
-
-Mcast messages in Message class:
-
-create(msg,seq)
- if sender != self: node[sender].routing[seq] = decode(msg)
-
-enqueue(q,seq):
- id = (sender,seq)
- if sender == self:
- enqueued[id,q] = (localMessage[seq], acquired=None)
- else:
- msg = sender.routing[seq]
- with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
- enqueued[id,q] = (qmsg, acquired=None)
-
-routed(seq):
- if sender == self: localMessage.erase(msg.id.seq)
- else: sender.routing.erase(seq)
-
-acquire(id,q):
- enqueued[id,q].acquired = sender
- node[sender].acquired.push_back((id,q))
- if sender != self:
- with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
-
-release(id,q):
- enqueued[id,q].acquired = None
- node[sender].acquired.erase((id,q))
- if sender != self:
-  with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
-
-reject(id,q):
- sender.routing[id] = enqueued[id,q] # prepare for re-queueing
-
-rejected(id,q):
- sender.routing.erase(id)
-
-dequeue(id,q):
- entry = enqueued[id,q]
- enqueued.erase((id,q))
- node[entry.acquired].acquired.erase((id,q))
- if sender != self:
-  with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
-
-member m leaves cluster:
- for key in node[m].acquired:
- release(key.id, key.q)
- node.erase(m)
-
-*** Queue consumer locking
-
-When a queue is locked it does not deliver messages to its consumers.
-
-New broker::Queue functions:
-- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
-- startConsumers(): reset consumersStopped flag
-
-Implementation sketch, locking omitted:
-
-void Queue::stopConsumers() {
- consumersStopped = true;
- while (consumersBusy) consumersBusyMonitor.wait();
-}
-
-void Queue::startConsumers() {
- consumersStopped = false;
- listeners.notify();
-}
-
-bool Queue::dispatch(consumer) {
- if (consumersStopped) return false;
- ++consumersBusy;
- bool result = do_regular_dispatch_body();
- if (--consumersBusy == 0) consumersBusyMonitor.notify();
- return result;
-}
-
-*** QueueOwnerHandler
-
-Invariants:
-- Each queue is owned by at most one node at any time.
-- Each node is interested in a set of queues at any given time.
-- A queue is un-owned if no node is interested.
-
-The queue owner releases the queue when
-- it loses interest i.e. queue has no consumers with credit.
-- a configured time delay expires and there are other interested nodes.
-
-The owner mcasts release(q). On delivery, the new queue owner is the
-next node in node-id order (treating the nodes as a circular list,
-starting from the old owner) that is interested in the queue.
-
-Queue consumers are initially stopped and only started when we get
-ownership from the cluster.
-
-Thread safety: called by deliver, connection and timer threads, needs locking.
-
-Thread safe object per queue holding queue ownership status.
-Called by deliver, connection and timer threads.
-
-class QueueOwnership {
- bool owned;
- Timer timer;
- BrokerQueue q;
-
- drop(): # locked
- if owned:
- owned = false
- q.stopConsumers()
- mcast release(q.name, false)
- timer.stop()
-
- take(): # locked
- if not owned:
- owned = true
- q.startConsumers()
- timer.start(timeout)
-
- timer.fire(): drop()
-}
-
-Data Members, only modified/examined in deliver thread:
-- typedef set<NodeId> ConsumerSet
-- map<QueueName, ConsumerSet> consumers
-- map<QueueName, NodeId> owner
-
-Thread safe data members, accessed in connection threads (via BrokerHandler):
-- map<QueueName, QueueOwnership> ownership
-
-Multicast messages in QueueOwner class:
-
-consume(q):
- if sender==self and consumers[q].empty(): ownership[q].take()
- consumers[q].insert(sender)
-
-release(q):
- assert(owner[q] == sender and owner[q] in consumers[q])
- owner[q] = circular search from sender in consumers[q]
- if owner==self: ownership[q].take()
-
-cancel(q):
- assert(owner[q] != sender) # sender must release() before cancel()
- consumers[q].erase(sender)
-
-member-leaves:
- for q in owner: if owner[q] == left: left.release(q)
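-
-The "circular search from sender" above can be done with one lookup on an
-ordered set of node ids (a sketch; NodeId is whatever type the cluster uses):
-
-#include <cstdint>
-#include <set>
-
-typedef uint64_t NodeId;
-
-// Next interested node after 'from' in node-id order, wrapping around.
-// Precondition: interested is not empty.
-NodeId nextOwner(const std::set<NodeId>& interested, NodeId from) {
-    std::set<NodeId>::const_iterator i = interested.upper_bound(from);
-    return (i == interested.end()) ? *interested.begin() : *i;
-}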
-
-Need 2 more intercept points in broker::Cluster:
-
-consume(q,consumer,consumerCount) - Queue::consume()
- if consumerCount == 1: mcast consume(q)
-
-cancel(q,consumer,consumerCount) - Queue::cancel()
- if consumerCount == 0:
- ownership[q].drop()
- mcast cancel(q)
-
-#TODO: lifecycle, updating cluster data structures when queues are destroyed
-
-*** Increasing concurrency
-The major performance limitation of the old cluster is that it does
-everything in the single CPG deliver thread context.
-
-We can get additional concurrency by creating a thread context _per queue_
-for queue operations: enqueue, acquire, accept etc.
-
-We associate a PollableQueue of queue operations with each AMQP queue.
-The CPG deliver thread would
-- build messages and associate with cluster IDs.
-- push queue ops to the appropriate PollableQueue to be dispatched in that queue's thread.
-
-Serializing operations on the same queue avoids contention, while operations
-on separate queues remain independent and can proceed in parallel.
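-
-A simplified sketch of the per-queue work queue idea (illustrative only; the
-real code would reuse the existing PollableQueue):
-
-#include <deque>
-#include <functional>
-#include <mutex>
-
-// Producers (the CPG deliver thread) append under a lock; the queue's worker
-// swaps the whole batch out and runs it, so operations on one queue stay in
-// multicast order while different queues run on different workers.
-class QueueWork {
-  public:
-    typedef std::function<void()> Op;
-
-    void push(const Op& op) {                 // called from the CPG deliver thread
-        std::lock_guard<std::mutex> l(lock);
-        ops.push_back(op);
-    }
-
-    void runBatch() {                         // called from this queue's worker
-        std::deque<Op> batch;
-        {
-            std::lock_guard<std::mutex> l(lock);
-            batch.swap(ops);                  // take everything queued so far
-        }
-        for (std::deque<Op>::iterator i = batch.begin(); i != batch.end(); ++i)
-            (*i)();                           // run ops in order
-    }
-
-  private:
-    std::mutex lock;
-    std::deque<Op> ops;
-};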
-
-*** Re-use of existing cluster code
-- re-use Event
-- re-use Multicaster
-- re-use the same PollableQueue setup (may experiment later)
-- new Core class to replace Cluster.
-- keep design modular, keep threading rules clear.
-
-** TODO [#B] Large message replication.
-Multicast should encode messages in fixed size buffers (64k)?
-Can't assume we can send a message in one chunk.
-For 0-10 we can use channel numbers & send whole frames packed into a larger buffer.
-** TODO [#B] Batch CPG multicast messages
-The new cluster design involves a lot of small multicast messages,
-they need to be batched into larger CPG messages for efficiency.
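-
-A sketch of the kind of batching meant here (sizes and names are illustrative;
-the real code would build on the existing Multicaster/Event classes):
-
-#include <cstddef>
-#include <functional>
-#include <vector>
-
-class EventBatcher {
-  public:
-    typedef std::function<void(const char*, size_t)> McastFn;
-
-    EventBatcher(size_t maxBytes, McastFn mcast)
-        : max(maxBytes), send(mcast) { buffer.reserve(maxBytes); }
-
-    // Append one small encoded event; flush first if it would overflow the batch.
-    // Events larger than max would need the fragmentation described above.
-    void add(const char* data, size_t size) {
-        if (buffer.size() + size > max) flush();
-        buffer.insert(buffer.end(), data, data + size);
-    }
-
-    // Called when the batch is full or on an idle/timer boundary.
-    void flush() {
-        if (buffer.empty()) return;
-        send(&buffer[0], buffer.size());   // one CPG multicast for many events
-        buffer.clear();
-    }
-
-  private:
-    size_t max;
-    McastFn send;
-    std::vector<char> buffer;
-};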
-** TODO [#B] Genuine async completion
-Replace current synchronous waiting implementation with genuine async completion.
-
-Test: enhance test_store.cpp to defer enqueueComplete till special message received.
-
-Async callback uses *requestIOProcessing* to queue action on IO thread.
-
-** TODO [#B] Async completion of accept when dequeue completes.
-Interface is already there on broker::Message, just need to ensure
-that store and cluster implementations call it appropriately.
-
-** TODO [#B] Replicate wiring.
-From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
-
-** TODO [#B] New members joining - first pass
-
-Re-use update code from old cluster but don't replicate sessions &
-connections.
-
-Need to extend it to send cluster IDs with messages.
-
-Need to replicate the queue ownership data as part of the update.
-
-** TODO [#B] Persistence support.
-InitialStatus protocol etc. to support persistent start-up (existing code)
-
-Only one broker recovers from store, update to others.
-
-Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
-
-** TODO [#B] Handle other ways that messages can leave a queue.
-
-Other ways (other than via a consumer) that messages are taken off a queue.
-
-NOTE: Not controlled by queue lock, how to make them consistent?
-
-Target broker may not have all messages on other brokers for purge/destroy.
-- Queue::move() - need to wait for lock? Replicate?
-- Queue::get() - ???
-- Queue::purge() - replicate purge? Or just delete what's on the local broker?
-- Queue::destroy() - messages to alternate exchange on all brokers?
-
-Need to add callpoints & mcast messages to replicate these?
-
-** TODO [#B] Flow control for internal queues.
-
-Need to bound the size of internal queues: delivery and multicast.
-- stop polling for read on client connections when we reach a bound.
-- restart polling when we get back under it.
-
-That will stop local multicasting, but we still have to deal with remote
-multicasting (note: the existing cluster does not do this). Something like:
-- when over bounds multicast a flow-control event.
-- on delivery of flow-control all members stop polling to read client connections
-- when back under bounds send flow-control-end, all members resume
-- if the flow-controlling member dies, the others resume
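-
-A sketch of the member-local state this implies (illustrative only):
-
-// Each member tracks whether any member has declared flow control.
-// While the flag is set, the broker stops polling client connections for
-// read, which throttles the sources of new multicasts.
-struct FlowControlState {
-    bool active;
-    FlowControlState() : active(false) {}
-
-    void onFlowControl()    { active = true;  /* stop reading from clients */ }
-    void onFlowControlEnd() { active = false; /* resume reading */ }
-    void onMemberLeft(bool wasController) {
-        if (wasController) active = false;    // controlling member died: resume
-    }
-};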
-
-** TODO [#B] Integration with transactions.
-Do we want to replicate during transaction & replicate commit/rollback
-or replicate only on commit?
-No integration with DTX transactions.
-** TODO [#B] Make new cluster work with replication exchange.
-Possibly re-use some common logic. Replication exchange is like clustering
-except over TCP.
-** TODO [#B] Better concurrency, scalability on multi-cores.
-Introduce PollableQueue of operations per broker queue. Queue up mcast
-operations (enqueue, acquire, accept etc.) to be handled concurrently
-on different queues. Performance testing to verify improved scalability.
-** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
-Cluster needs to complete these asynchronously to guarantee resources
-exist across the cluster when the command completes.
-
-** TODO [#C] Allow non-replicated exchanges, queues.
-
-Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
-- save replicated status to store.
-- support in management tools.
-Replicated exchange: replicate binds to replicated queues.
-Replicated queue: replicate all messages.
-
-** TODO [#C] New members joining - improved.
-
-Replicate wiring like old cluster, stall for wiring but not for
-messages. Update messages on a per-queue basis from back to front.
-
-Updater:
-- stall & push wiring: declare exchanges, queues, bindings.
-- start update iterator thread on each queue.
-- unstall and process normally while iterator threads run.
-
-Update iterator thread:
-- starts at back of updater queue, message m.
-- send update_front(q,m) to updatee and advance towards front
-- at front: send update_done(q)
-
-Updatee:
-- stall, receive wiring, lock all queues, mark queues "updating", unstall
-- update_front(q,m): push m to *front* of q
-- update_done(q): mark queue "ready"
-
-Updatee cannot take the queue consume lock for a queue that is updating.
-Updatee *can* push messages onto a queue that is updating.
-
-TODO: Is there any way to eliminate the stall for wiring?
-
-** TODO [#C] Refactoring of common concerns.
-
-There are a bunch of things that act as "Queue observers" with intercept
-points in similar places.
-- QueuePolicy
-- QueuedEvents (async replication)
-- MessageStore
-- Cluster
-
-Look for ways to capitalize on the similarity & simplify the code.
-
-In particular QueuedEvents (async replication) strongly resembles
-cluster replication, but over TCP rather than multicast.
-** TODO [#C] Concurrency for enqueue events.
-All enqueue events are being processed in the CPG deliver thread context which
-serializes all the work. We only need ordering on a per-queue basis; can we
-enqueue in parallel on different queues, and will that improve performance?
-** TODO [#C] Handling immediate messages in a cluster
-Include remote consumers in the decision to deliver an immediate message?