author     Alan Conway <aconway@apache.org>  2010-11-01 14:10:29 +0000
committer  Alan Conway <aconway@apache.org>  2010-11-01 14:10:29 +0000
commit     131419d3a24c1ad93ae794c272e26b10cac9c415 (patch)
tree       ef88339e7484c7f2d924b6f1c1dd9c70785246c8
parent     e4c64780f0ba4281d96cdfb018684d50942f039a (diff)
download   qpid-python-131419d3a24c1ad93ae794c272e26b10cac9c415.tar.gz
Updates to new cluster design docs.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1029670 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--  qpid/cpp/src/qpid/cluster/new-cluster-design.txt | 144
-rw-r--r--  qpid/cpp/src/qpid/cluster/new-cluster-plan.txt   |  77
2 files changed, 113 insertions, 108 deletions
diff --git a/qpid/cpp/src/qpid/cluster/new-cluster-design.txt b/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
index 8d9f72ac02..6350ae12e0 100644
--- a/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
+++ b/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
@@ -76,22 +76,27 @@ broker code.
Virtual synchrony delivers all data from all clients in a single
stream to each broker. The cluster must play this data through the full
broker code stack: connections, sessions etc. in a single thread
-context. The cluster has a pipelined design to get some concurrency
-but this is a severe limitation on scalability in multi-core hosts
-compared to the standalone broker which processes each connection
-in a separate thread context.
+context in order to get identical behavior on each broker. The cluster
+has a pipelined design to get some concurrency but this is a severe
+limitation on scalability in multi-core hosts compared to the
+standalone broker which processes each connection in a separate thread
+context.
** A new cluster design.
-
-Maintain /equivalent/ state not /identical/ state on each member.
-Messages from different sources need not be ordered identically on all members.
+Clearly defined interface between broker code and cluster plug-in.
+
+Replicate queue events rather than client data.
+- Broker behavior only needs to match per-queue.
+- Smaller amount of code (queue implementation) that must behave predictably.
+- Events only need to be serialized per-queue, which allows concurrency between queues.
-Use a moving queue ownership protocol to agree order of dequeues, rather
-than relying on identical state and lock-step behavior to cause
+Use a moving queue ownership protocol to agree order of dequeues.
+No longer relies on identical state and lock-step behavior to cause
identical dequeues on each broker.
-Clearly defined interface between broker code and cluster plug-in.
+Each queue has an associated thread-context. Events for a queue are executed
+in that queue's context, in parallel with events for other queues.
*** Requirements
@@ -104,35 +109,19 @@ The cluster must provide these delivery guarantees:
- client rejects message: message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.
-
Each guarantee takes effect when the client receives a *completion*
for the associated command (transfer, acquire, reject, accept)
-The cluster must provide this ordering guarantee:
-
-- messages from the same publisher received by the same subscriber
- must be received in the order they were sent (except in the case of
- re-queued messages.)
-
*** Broker receiving messages
On receiving a message transfer, in the connection thread we:
- multicast a message-received event.
-- enqueue the message on the local queue.
-- asynchronously complete the transfer when the message-received is self-delivered.
-
-This like asynchronous completion in the MessageStore: the cluster
-"stores" a message by multicast. We send a completion to the client
-asynchronously when the multicast "completes" by self-delivery. This
-satisfies the "client sends transfer" guarantee, but makes the message
-available on the queue immediately, avoiding the multicast latency.
+- enqueue and complete the transfer when it is self-delivered.
-It also moves most of the work to the client connection thread. The
-only work in the virtual synchrony deliver thread is sending the client
-completion.
+Other brokers enqueue the message when they receive the message-received event.
-Other brokers enqueue the message when they recieve the
-message-received event, in the virtual synchrony deliver thread.
+Enqueues are queued up with other queue operations to be executed in the
+thread context associated with the queue.
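
A minimal sketch of this flow, with CPG self-delivery collapsed into a
synchronous callback. Multicaster, ClusterQueue and onTransfer are
illustrative names, not the real qpid::cluster classes:

  // Sketch only: the multicaster immediately "self-delivers" to every
  // subscriber, standing in for CPG total-order delivery.
  #include <functional>
  #include <iostream>
  #include <string>
  #include <utility>
  #include <vector>

  struct Message { std::string content; };

  class Multicaster {
    public:
      using Handler = std::function<void(const std::string&, const Message&)>;
      void subscribe(Handler h) { handlers.push_back(std::move(h)); }
      void mcastMessageReceived(const std::string& queue, const Message& m) {
          for (auto& h : handlers) h(queue, m);  // delivered to all, sender included
      }
    private:
      std::vector<Handler> handlers;
  };

  class ClusterQueue {
    public:
      explicit ClusterQueue(std::string n) : name(std::move(n)) {}
      void enqueue(const Message& m) { messages.push_back(m); }
      std::string name;
      std::vector<Message> messages;
  };

  // Connection thread on the receiving broker: multicast only.
  void onTransfer(Multicaster& mcast, const std::string& queue, const Message& m) {
      mcast.mcastMessageReceived(queue, m);
  }

  int main() {
      Multicaster mcast;
      ClusterQueue local("q"), remote("q");
      // Local broker: enqueue and send the client completion on self-delivery.
      mcast.subscribe([&](const std::string&, const Message& m) {
          local.enqueue(m);
          std::cout << "transfer completed: " << m.content << "\n";
      });
      // Other brokers: just enqueue on delivery of the message-received event.
      mcast.subscribe([&](const std::string&, const Message& m) { remote.enqueue(m); });
      onTransfer(mcast, "q", Message{"hello"});
  }

The point is only that the connection thread does nothing but multicast;
all enqueuing, local and remote, happens on delivery of the event, in the
queue's thread context.
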
*** Broker sending messages: moving queue ownership
@@ -143,10 +132,6 @@ Periodically the owner hands over ownership to another interested
broker, providing time-shared access to the queue among all interested
brokers.
-This means we no longer need identical message ordering on all brokers
-to get consistent dequeuing. Messages from different sources can be
-ordered differently on different brokers.
-
We assume the same IO-driven dequeuing algorithm as the standalone
broker with one modification: queues can be "locked". A locked queue
is not available for dequeuing messages and will be skipped by the
@@ -164,14 +149,14 @@ a release-queue event, allowing another interested broker to take
ownership.
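
A rough sketch of the intended lock/own/release life-cycle, using a
message-count budget to stand in for the time share. QueueOwnership and
its methods are assumed names, not the actual implementation:

  #include <cassert>
  #include <cstdint>

  class QueueOwnership {
    public:
      enum class State { UNSUBSCRIBED, LOCKED, OWNED };

      // A broker with local consumers asks for ownership; until it is
      // granted the queue stays locked and is skipped by IO-driven output.
      void requestOwnership() {
          if (state == State::UNSUBSCRIBED) state = State::LOCKED;
      }

      // Delivery of the grant: the queue is unlocked for dequeuing here.
      void grantOwnership(uint64_t budget) {
          state = State::OWNED;
          credit = budget;
      }

      // Called per dequeue; when the budget runs out the owner would
      // multicast a release-queue event so another broker can take over.
      bool dequeueAllowed() {
          if (state != State::OWNED || credit == 0) return false;
          if (--credit == 0) state = State::LOCKED;  // time share used up
          return true;
      }

      State state = State::UNSUBSCRIBED;
      uint64_t credit = 0;
  };

  int main() {
      QueueOwnership q;
      q.requestOwnership();
      q.grantOwnership(2);
      assert(q.dequeueAllowed());
      assert(q.dequeueAllowed());
      assert(!q.dequeueAllowed());  // ownership released after the time share
  }
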
*** Asynchronous completion of accept
-
In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.
On receiving an accept the broker:
- dequeues the message from the local queue
-- multicasts a "dequeue" event
+- multicasts an "accept" event
- completes the accept asynchronously when the dequeue event is self delivered.
NOTE: The message store does not currently implement asynchronous
@@ -191,16 +176,6 @@ being resolved.
#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
-When a new member (the updatee) joins a cluster it needs to be brought up to date.
-The old cluster design an existing member (the updater) sends a state snapshot.
-
-To ensure consistency of the snapshot both the updatee and the updater
-"stall" at the point of the update. They stop processing multicast
-events and queue them up for processing when the update is
-complete. This creates a back-log of work for each to get through,
-which leaves them lagging behind the rest of the cluster till they
-catch up (which is not guaranteed to happen in a bounded time.)
-
** Updating new members
When a new member (the updatee) joins a cluster it needs to be brought
@@ -270,8 +245,8 @@ The cluster interface captures these events:
The cluster will require some extensions to the Queue:
- Queues can be "locked", locked queues are ignored by IO-driven output.
-- Messages carry a cluster-message-id.
-- messages can be dequeued by cluster-message-id
+- Cluster must be able to apply queue events from the cluster to a queue.
+ These appear to fit into existing queue operations.
** Maintainability
@@ -302,27 +277,27 @@ The only way to verify the relative performance of the new design is
to prototype & profile. The following points suggest the new design
may scale/perform better:
-Moving work from virtual synchrony thread to connection threads where
-it can be done in parallel:
+Some work is moved from the virtual synchrony thread to connection threads:
- All connection/session logic moves to connection thread.
- Exchange routing logic moves to connection thread.
-- Local broker does all enqueue/dequeue work in connection thread
-- Enqueue/dequeue are IO driven as for a standalone broker.
-
-Optimizes common cases (pay for what you use):
-- Publisher/subscriber on same broker: replication is fully asynchronous, no extra latency.
-- Unshared queue: dequeue is all IO-driven in connection thread.
-- Time sharing: pay for time-sharing only if queue has consumers on multiple brokers.
-
-#TODO Not clear how the time sharing algorithm would compare with the existing cluster delivery algorithm.
-
-Extra decode/encode: The old design multicasts raw client data and
-decodes it in the virtual synchrony thread. The new design would
-decode messages in the connection thread, re-encode them for
-multicast, and decode (on non-local brokers) in the virtual synchrony
-thread. There is extra work here, but only in the *connection* thread:
-on a multi-core machine this happens in parallel for every connection,
-so it probably is not a bottleneck. There may be scope to optimize
+- On the local broker, dequeueing is done in the connection thread.
+- Local broker dequeue is IO-driven, as for a standalone broker.
+
+For queues with all consumers on a single node, dequeue is all
+IO-driven in the connection thread. Pay for time-sharing only if a queue
+has consumers on multiple brokers.
+
+Doing work for different queues in parallel scales on multi-core boxes when
+there are multiple queues.
+
+One difference works against performance: there is an extra
+encode/decode. The old design multicasts raw client data and decodes
+it in the virtual synchrony thread. The new design would decode
+messages in the connection thread, re-encode them for multicast, and
+decode (on non-local brokers) in the virtual synchrony thread. There
+is extra work here, but only in the *connection* thread: on a
+multi-core machine this happens in parallel for every connection, so
+it probably is not a bottleneck. There may be scope to optimize
decode/re-encode by re-using some of the original encoded data; this
could also benefit the stand-alone broker.
@@ -342,20 +317,15 @@ queue replication', allowing such replication (over a TCP connection
on a WAN say) to be initiated after the queue had already been created
and been in use (one of the key missing features).
-** Optimizing the active-passive special case.
-
-In the case where all consumers of a queue are on the same broker, we
-can get better performance because we don't need to transfer ownership
-or information about acquisition. We need to optimize this case to
-perform like an active-passive mode of replication.
-
** Increasing Concurrency and load sharing
+
The current cluster is bottlenecked by processing everything in the
CPG deliver thread. By removing the need for identical operation on
each broker, we open up the possibility of greater concurrency.
-Handling multicast enqueue, acquire, accpet, release etc: concurrency per queue.
-Operatons on different queues can be done in different threads.
+Handling multicast enqueue, acquire, accept, release etc.: concurrency
+per queue. Operations on different queues can be done in different
+threads.
The new design does not force each broker to do all the work in the
CPG thread so spreading load across cluster members should give some
@@ -401,3 +371,25 @@ Clustering and scalability: new design may give us the flexibility to
address scalability as part of cluster design. Think about
relationship to federation and "fragmented queues" idea.
+* Design questions/decisions
+** Total ordering.
+Initial thinking: allow message ordering to differ between brokers.
+New thinking: use CPG total ordering, get identical ordering on all brokers.
+- Allowing variation in order introduces too much chance of unexpected behavior.
+- Using total order allows other optimizations; see Message Identifiers below.
+
+** Message identifiers.
+Initial thinking: message ID = CPG node id + 64 bit sequence number.
+This involves a lot of mapping between cluster IDs and broker messages.
+
+New thinking: message ID = queue name + queue position.
+- Removes most of the mapping and memory management for cluster code.
+- Requires total ordering of messages (see above)
+
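
A sketch of what the queue-name + position identifier could look like
(names here are illustrative):

  #include <cstdint>
  #include <string>
  #include <tuple>

  struct QueuedMessageId {
      std::string queue;     // name of the queue the message sits on
      uint64_t    position;  // position in that queue; needs total ordering
  };

  inline bool operator<(const QueuedMessageId& a, const QueuedMessageId& b) {
      return std::tie(a.queue, a.position) < std::tie(b.queue, b.position);
  }
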
+** Message rejection
+Initial thinking: add special reject/rejected points to cluster interface so
+rejected messages could be re-queued without multicast.
+
+New thinking: treat re-queueing after reject as an entirely new message.
+- Simplifies cluster interface & implementation
+- Not on the critical path.
diff --git a/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt b/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
index 35f35288cc..158403e988 100644
--- a/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
+++ b/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
@@ -92,18 +92,18 @@ is omitted.
*** BrokerHandler
Types:
-- struct QueuedMessage { Message msg; QueueName q; Position pos; }
-- SequenceNumber 64 bit sequence number to identify messages.
-- NodeId 64 bit CPG node-id, identifies member of the cluster.
-- struct MessageId { NodeId node; SequenceNumber seq; }
+- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
+- struct
-NOTE: Message ID's identify a QueuedMessage, i.e. a position on a queue.
+NOTE:
+- Messages on queues are identified by a queue name + a position.
+- Messages being routed are identified by a sequence number.
Members:
-- atomic<SequenceNumber> sequence // sequence number for message IDs.
- thread_local bool noReplicate // suppress replication.
- thread_local bool isRouting // suppress operations while routing
-- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
+- Message localMessage[SequenceNumber] // local messages being routed.
+- thread_local SequenceNumber routingSequence
NOTE: localMessage is also modified by MessageHandler.
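
A rough C++ sketch of the members listed above. thread_local data members
must be static in C++; the class shape and type names are otherwise
assumptions, not the actual BrokerHandler:

  #include <cstdint>
  #include <map>
  #include <string>

  using SequenceNumber = uint64_t;
  struct Message { std::string encoded; };

  class BrokerHandler {
    public:
      static thread_local bool noReplicate;               // suppress replication
      static thread_local bool isRouting;                 // suppress ops while routing
      static thread_local SequenceNumber routingSequence; // current routing sequence

      // Local messages being routed, keyed by routing sequence number.
      // Also modified by MessageHandler, so the real thing needs locking.
      std::map<SequenceNumber, Message> localMessage;
  };

  thread_local bool BrokerHandler::noReplicate = false;
  thread_local bool BrokerHandler::isRouting = false;
  thread_local SequenceNumber BrokerHandler::routingSequence = 0;
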
@@ -118,47 +118,37 @@ routing(msg)
enqueue(qmsg):
if noReplicate: return
- if !qmsg.msg.id:
- seq = sequence++
- qmsg.msg.id = (self,seq)
- localMessage[seq] = qmsg
- mcast create(encode(qmsg.msg),seq)
- mcast enqueue(qmsg.q,qmsg.msg.id.seq)
+ if routingSequence == 0 # thread local
+ routingSequence = nextRoutingSequence()
+ mcast create(encode(qmsg.msg),routingSequence)
+ mcast enqueue(qmsg.q,routingSequence)
routed(msg):
if noReplicate: return
- if msg.id: mcast routed(msg.id.seq)
isRouting = false
acquire(qmsg):
if noReplicate: return
if isRouting: return # Ignore while we are routing a message.
- if msg.id: mcast acquire(msg.id, q)
+ if msg.id: mcast acquire(qmsg)
release(QueuedMessage)
if noReplicate: return
if isRouting: return # Ignore while we are routing a message.
- if msg.id: mcast release(id, q)
+ mcast release(qmsg)
accept(QueuedMessage):
if noReplicate: return
if isRouting: return # Ignore while we are routing a message.
- if msg.id: mcast dequeue(msg.id, msg.q)
+ mcast accept(qmsg)
reject(QueuedMessage):
isRejecting = true
- if msg.id: mcast reject(msg.id, msg.q)
+ mcast reject(qmsg)
-rejected(QueuedMessage):
- isRejecting = false
- mcast dequeue(msg.id, msg.q)
-
-dequeue(QueuedMessage)
- # No mcast in dequeue, only used for local cleanup of resources.
- # E.g. messages that are replaced on an LVQ are dequeued without being
- # accepted or rejected. dequeue is called with the queue lock held
- # FIXME revisit - move it out of the queue lock.
- cleanup(msg)
+# FIXME no longer needed?
+drop(QueuedMessage)
+ cleanup(qmsg)
*** MessageHandler and mcast messages
Types:
@@ -328,6 +318,21 @@ cancel(q,consumer,consumerCount) - Queue::cancel()
#TODO: lifecycle, updating cluster data structures when queues are destroyed
+*** Increasing concurrency
+The major performance limitation of the old cluster is that it does
+everything in the single CPG deliver thread context.
+
+We can get additional concurrency by creating a thread context _per queue_
+for queue operations: enqueue, acquire, accept etc.
+
+We associate a PollableQueue of queue operations with each AMQP queue.
+The CPG deliver thread would:
+- build messages and associate them with cluster IDs.
+- push queue ops to the appropriate PollableQueue, to be dispatched in the queue's thread.
+
+Serializing operations on the same queue avoids contention, while taking advantage
+of the independence of operations on separate queues.
+
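
A minimal sketch of the per-queue serialization idea; QueueContext and
Dispatcher are assumed names and this is not the real
qpid::sys::PollableQueue. Operations for one queue run in order on one
thread, while different queues run in parallel:

  #include <condition_variable>
  #include <deque>
  #include <functional>
  #include <map>
  #include <memory>
  #include <mutex>
  #include <string>
  #include <thread>

  // Runs queued operations one at a time, in order, on its own thread.
  class QueueContext {
    public:
      QueueContext() : worker([this] { run(); }) {}
      ~QueueContext() {
          { std::lock_guard<std::mutex> l(lock); stopped = true; }
          cv.notify_one();
          worker.join();
      }
      void push(std::function<void()> op) {
          { std::lock_guard<std::mutex> l(lock); ops.push_back(std::move(op)); }
          cv.notify_one();
      }
    private:
      void run() {
          std::unique_lock<std::mutex> l(lock);
          while (true) {
              cv.wait(l, [this] { return stopped || !ops.empty(); });
              if (ops.empty()) return;          // stopped and fully drained
              auto op = std::move(ops.front());
              ops.pop_front();
              l.unlock();
              op();                             // serialized per queue
              l.lock();
          }
      }
      std::mutex lock;
      std::condition_variable cv;
      std::deque<std::function<void()>> ops;
      bool stopped = false;
      std::thread worker;                       // declared last: starts after the rest
  };

  // The CPG deliver thread only dispatches; the per-queue threads do the work.
  class Dispatcher {
    public:
      void dispatch(const std::string& queue, std::function<void()> op) {
          std::lock_guard<std::mutex> l(lock);
          auto& ctx = contexts[queue];
          if (!ctx) ctx.reset(new QueueContext);
          ctx->push(std::move(op));
      }
    private:
      std::mutex lock;
      std::map<std::string, std::unique_ptr<QueueContext>> contexts;
  };
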
*** Re-use of existing cluster code
- re-use Event
- re-use Multicaster
@@ -387,10 +392,18 @@ Target broker may not have all messages on other brokers for purge/destroy.
Need to add callpoints & mcast messages to replicate these?
** TODO [#B] Flow control for internal queues.
-
-Need to bound the size of the internal queues holding cluster events & frames.
-- stop polling when we reach bound.
-- start polling when we get back under it.
+
+Need to bound the size of internal queues: delivery and multicast.
+- stop polling for read on client connections when we reach a bound.
+- restart polling when we get back under it.
+
+That will stop local multicasting; we still have to deal with remote
+multicasting (note the existing cluster does not do this.) Something like:
+- when over bounds multicast a flow-control event.
+- on delivery of flow-control all members stop polling to read client connections
+- when back under bounds send flow-control-end, all members resume
+- if the flow-controlling member dies, others resume
+
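
A sketch of the local bound-checking part only; the multicast
flow-control events above are not shown, and FlowControl plus the
stop/resume callbacks are assumed names:

  #include <cstddef>
  #include <functional>
  #include <utility>

  class FlowControl {
    public:
      FlowControl(size_t stopAt, size_t resumeAt,
                  std::function<void()> stopPolling,
                  std::function<void()> resumePolling)
          : stopBound(stopAt), resumeBound(resumeAt),
            stop(std::move(stopPolling)), resume(std::move(resumePolling)) {}

      // Called when an event/frame is pushed onto an internal queue.
      void onEnqueue() {
          if (++depth >= stopBound && !stopped) { stopped = true; stop(); }
      }
      // Called when it has been processed.
      void onDequeue() {
          if (--depth <= resumeBound && stopped) { stopped = false; resume(); }
      }
    private:
      size_t depth = 0;
      const size_t stopBound, resumeBound;
      bool stopped = false;
      std::function<void()> stop, resume;
  };

Using two bounds gives hysteresis, so polling is not toggled on every
message once the internal queue hovers near a single limit.
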
** TODO [#B] Integration with transactions.
Do we want to replicate during transaction & replicate commit/rollback
or replicate only on commit?