summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/design_docs/new-cluster-design.txt302
-rw-r--r--qpid/cpp/design_docs/new-cluster-plan.txt563
-rw-r--r--qpid/cpp/docs/man/qpidd.14
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/posix/QpiddBroker.cpp5
-rw-r--r--qpid/cpp/src/qmf.mk9
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp14
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h7
-rw-r--r--qpid/cpp/src/qpid/RefCountedBuffer.cpp9
-rw-r--r--qpid/cpp/src/qpid/Sasl.h4
-rw-r--r--qpid/cpp/src/qpid/SaslFactory.cpp11
-rw-r--r--qpid/cpp/src/qpid/acl/AclPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp53
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h6
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp59
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp33
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp51
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp24
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/windows/SaslFactory.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--qpid/cpp/src/qpid/framing/Uuid.cpp4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h2
-rw-r--r--qpid/cpp/src/qpid/sys/SslPlugin.cpp145
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp33
-rw-r--r--qpid/cpp/src/qpid/sys/alloca.h6
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslIo.cpp20
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslIo.h16
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp159
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/SslSocket.h44
-rw-r--r--qpid/cpp/src/qpid/sys/unordered_map.h35
-rw-r--r--qpid/cpp/src/qpid/types/Uuid.cpp19
-rw-r--r--qpid/cpp/src/qpidd.cpp5
-rw-r--r--qpid/cpp/src/qpidd.h3
-rw-r--r--qpid/cpp/src/tests/BrokerOptions.cpp79
-rw-r--r--qpid/cpp/src/tests/Makefile.am12
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp50
-rwxr-xr-xqpid/cpp/src/tests/acl.py64
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py95
-rw-r--r--qpid/cpp/src/tests/qpid-perftest.cpp14
-rw-r--r--qpid/cpp/src/tests/sasl.mk5
-rwxr-xr-xqpid/cpp/src/tests/sasl_no_dir218
-rwxr-xr-xqpid/cpp/src/tests/ssl_test34
-rw-r--r--qpid/cpp/src/windows/QpiddBroker.cpp186
-rw-r--r--qpid/cpp/src/windows/SCM.cpp332
-rw-r--r--qpid/cpp/src/windows/SCM.h109
57 files changed, 1962 insertions, 965 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-design.txt b/qpid/cpp/design_docs/new-cluster-design.txt
index 7adb46fee3..936530a39a 100644
--- a/qpid/cpp/design_docs/new-cluster-design.txt
+++ b/qpid/cpp/design_docs/new-cluster-design.txt
@@ -17,7 +17,6 @@
# under the License.
* A new design for Qpid clustering.
-
** Issues with current design.
The cluster is based on virtual synchrony: each broker multicasts
@@ -84,19 +83,21 @@ context.
** A new cluster design.
-Clearly defined interface between broker code and cluster plug-in.
+1. Clearly defined interface between broker code and cluster plug-in.
-Replicate queue events rather than client data.
-- Broker behavior only needs to match per-queue.
-- Smaller amount of code (queue implementation) that must behave predictably.
-- Events only need be serialized per-queue, allows concurrency between queues
+2. Replicate queue events rather than client data.
+ - Only requires consistent enqueue order.
+ - Events only need be serialized per-queue, allows concurrency between queues
+ - Allows for replicated and non-replicated queues.
-Use a moving queue ownership protocol to agree order of dequeues.
-No longer relies on identical state and lock-step behavior to cause
-identical dequeues on each broker.
+3. Use a lock protocol to agree order of dequeues: only the broker
+ holding the lock can acqiure & dequeue. No longer relies on
+ identical state and lock-step behavior to cause identical dequeues
+ on each broker.
-Each queue has an associated thread-context. Events for a queue are executed
-in that queues context, in parallel with events for other queues.
+4. Use multiple CPG groups to process different queues in
+ parallel. Use a fixed set of groups and hash queue names to choose
+ the group for each queue.
*** Requirements
@@ -149,7 +150,7 @@ a release-queue event, allowing another interested broker to take
ownership.
*** Asynchronous completion of accept
-### HERE
+
In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.
@@ -162,19 +163,32 @@ On receiving an accept the broker:
NOTE: The message store does not currently implement asynchronous
completions of accept, this is a bug.
+*** Multiple CPG groups.
+
+The old cluster was bottlenecked by processing everything in a single
+CPG deliver thread.
+
+The new cluster uses a set of CPG groups, one per core. Queue names
+are hashed to give group indexes, so statistically queues are likely
+to be spread over the set of groups.
+
+Operations on a given queue always use the same group, so we have
+order within each queue, but operations on different queues can use
+different groups giving greater throughput sending to CPG and multiple
+handler threads to process CPG messages.
+
** Inconsistent errors.
-The new design eliminates most sources of inconsistent errors
-(connections, sessions, security, management etc.) The only points
-where inconsistent errors can occur are at enqueue and dequeue (most
-likely store-related errors.)
+An inconsistent error means that after multicasting an enqueue, accept
+or dequeue, some brokers succeed in processing it and others fail.
-The new design can use the exisiting error-handling protocol with one
-major improvement: since brokers are no longer required to maintain
-identical state they do not have to stall processing while an error is
-being resolved.
+The new design eliminates most sources of inconsistent errors in the
+old broker: connections, sessions, security, management etc. Only
+store journal errors remain.
-#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
+The new inconsistent error protocol is similar to the old one with one
+major improvement: brokers do not have to stall processing while an
+error is being resolved.
** Updating new members
@@ -193,60 +207,44 @@ catch up (which is not guaranteed to happen in a bounded time.)
With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.
-Update of wiring (exchanges, queues, bindings) is the same as current
-design.
-
-Update of messages is different:
-- per-queue rather than per-broker, separate queues can be updated in parallel.
-- updates queues in reverse order to eliminate unbounded catch-up
-- does not require updater & updatee to stall during update.
+We update individual objects (queues and exchanges) independently.
+- create queues first, then update all queues and exchanges in parallel.
+- multiple updater threads, per queue/exchange.
-Replication events, multicast to cluster:
-- enqueue(q,m): message m pushed on back of queue q .
-- acquire(q,m): mark m acquired
-- dequeue(q,m): forget m.
-Messages sent on update connection:
-- update_front(q,m): during update, receiver pushes m to *front* of q
-- update_done(q): during update, update of q is complete.
+Queue updater:
+- marks the queue position at the sync point
+- sends messages starting from the sync point working towards the head of the queue.
+- send "done" message.
-Updater:
-- when updatee joins set iterator i = q.end()
-- while i != q.begin(): --i; send update_front(q,*i) to updatee
-- send update_done(q) to updatee
+Queue updatee:
+- enqueues received from CPG: add to back of queue as normal.
+- dequeues received from CPG: apply if found, else save to check at end of update.
+- messages from updater: add to the *front* of the queue.
+- update complete: apply any saved dequeues.
-Updatee:
-- q initially in locked state, can't dequeue locally.
-- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
-- receive update_front(q,m): q.push_front(m)
-- receive update_done(q): q can be unlocked for local dequeing.
+Exchange updater:
+- updater: send snapshot of exchange as it was at the sync point.
-Benefits:
-- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
-- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
-- During update consumers actually help by removing messages before they need to be updated.
-- Needs no separate "work to do" queue, only the broker queues themselves.
+Exchange updatee:
+- queue exchange operations after the sync point.
+- when snapshot is received: apply saved operations.
-# TODO how can we recover from updater crashing before update complete?
-# Clear queues that are not updated & send request for udpates on those queues?
+Note:
+- Updater is active throughout, no stalling.
+- Consuming clients actually reduce the size of the update.
+- Updatee stalls clients until the update completes.
+ (Note: May be possible to avoid updatee stall as well, needs thought)
-# TODO updatee may receive a dequeue for a message it has not yet seen, needs
-# to hold on to that so it can drop the message when it is seen.
-# Similar problem exists for wiring?
+** Internal cluster interface
-** Cluster API
-
-The new cluster API is similar to the MessageStore interface.
-(Initially I thought it would be an extension of the MessageStore interface,
-but as the design develops it seems better to make it a separate interface.)
+The new cluster interface is similar to the MessageStore interface, but
+provides more detail (message positions) and some additional call
+points (e.g. acquire)
The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/acquired/released/rejected/dequeued.
-
-The cluster will require some extensions to the Queue:
-- Queues can be "locked", locked queues are ignored by IO-driven output.
-- Cluster must be able to apply queue events from the cluster to a queue.
- These appear to fit into existing queue operations.
+- transactional events.
** Maintainability
@@ -273,106 +271,48 @@ A number of specific ways the code will be simplified:
** Performance
-The only way to verify the relative performance of the new design is
-to prototype & profile. The following points suggest the new design
-may scale/perform better:
-
-Some work moved from virtual synchrony thread to connection threads:
-- All connection/session logic moves to connection thread.
-- Exchange routing logic moves to connection thread.
-- On local broker dequeueing is done in connection thread
-- Local broker dequeue is IO driven as for a standalone broker.
-
-For queues with all consumers on a single node dequeue is all
-IO-driven in connection thread. Pay for time-sharing only if queue has
-consumers on multiple brokers.
-
-Doing work for different queues in parallel scales on multi-core boxes when
-there are multiple queues.
-
-One difference works against performance, thre is an extra
-encode/decode. The old design multicasts raw client data and decodes
-it in the virtual synchrony thread. The new design would decode
-messages in the connection thread, re-encode them for multicast, and
-decode (on non-local brokers) in the virtual synchrony thread. There
-is extra work here, but only in the *connection* thread: on a
-multi-core machine this happens in parallel for every connection, so
-it probably is not a bottleneck. There may be scope to optimize
-decode/re-encode by re-using some of the original encoded data, this
-could also benefit the stand-alone broker.
-
-** Asynchronous queue replication
-
-The existing "asynchronous queue replication" feature maintains a
-passive backup passive backup of queues on a remote broker over a TCP
-connection.
-
-The new cluster replication protocol could be re-used to implement
-asynchronous queue replication: its just a special case where the
-active broker is always the queue owner and the enqueue/dequeue
-messages are sent over a TCP connection rather than multicast.
-
-The new update update mechanism could also work with 'asynchronous
-queue replication', allowing such replication (over a TCP connection
-on a WAN say) to be initiated after the queue had already been created
-and been in use (one of the key missing features).
-
-** Increasing Concurrency and load sharing
-
-The current cluster is bottlenecked by processing everything in the
-CPG deliver thread. By removing the need for identical operation on
-each broker, we open up the possiblility of greater concurrency.
-
-Handling multicast enqueue, acquire, accpet, release etc: concurrency
-per queue. Operatons on different queues can be done in different
-threads.
-
-The new design does not force each broker to do all the work in the
-CPG thread so spreading load across cluster members should give some
-scale-up.
-
-** Misc outstanding issues & notes
-
-Replicating wiring
-- Need async completion of wiring commands?
-- qpid.sequence_counter: need extra work to support in new design, do we care?
-
-Cluster+persistence:
-- finish async completion: dequeue completion for store & cluster
-- cluster restart from store: clean stores *not* identical, pick 1, all others update.
-- need to generate cluster ids for messages recovered from store.
-
-Live updates: we don't need to stall brokers during an update!
-- update on queue-by-queue basis.
-- updatee locks queues during update, no dequeue.
-- update in reverse: don't update messages dequeued during update.
-- updatee adds update messages at front (as normal), replicated messages at back.
-- updater starts from back, sends "update done" when it hits front of queue.
-
-Flow control: need to throttle multicasting
-1. bound the number of outstanding multicasts.
-2. ensure the entire cluster keeps up, no unbounded "lag"
-The existing design uses read-credit to solve 1., and does not solve 2.
-New design should stop reading on all connections while flow control
-condition exists?
-
-Can federation also be unified, at least in configuration?
-
-Consider queues (and exchanges?) as having "reliability" attributes:
-- persistent: is the message stored on disk.
-- backed-up (to another broker): active/passive async replication.
-- replicated (to a cluster): active/active multicast replication to cluster.
-- federated: federation link to a queue/exchange on another broker.
-
-"Reliability" seems right for the first 3 but not for federation, is
-there a better term?
-
-Clustering and scalability: new design may give us the flexibility to
-address scalability as part of cluster design. Think about
-relationship to federation and "fragmented queues" idea.
-
-* Design debates/descisions
+The standalone broker processes _connections_ concurrently, so CPU
+usage increases as you add more connections.
+
+The new cluster processes _queues_ concurrently, so CPU usage increases as you
+add more queues.
+
+In both cases, CPU usage peaks when the number of "units of
+ concurrency" (connections or queues) goes above the number of cores.
+
+When all consumers on a queue are connected to the same broker the new
+cluster uses the same messagea allocation threading/logic as a
+standalone broker, with a little extra asynchronous book-keeping.
+
+If a queue has multiple consumers connected to multiple brokers, the
+new cluster time-shares the queue which is less efficient than having
+all consumers on a queue connected to the same broker.
+** Flow control
+New design does not queue up CPG delivered messages, they are
+processed immediately in the CPG deliver thread. This means that CPG's
+flow control is sufficient for qpid.
+
+** Live upgrades
+
+Live upgrades refers to the ability to upgrade a cluster while it is
+running, with no downtime. Each brokers in the cluster is shut down,
+and then re-started with a new version of the broker code.
+
+To achieve this
+- Cluster protocl XML file has a new element <version number=N> attached
+ to each method. This is the version at which the method was added.
+- New versions can only add methods, existing methods cannot be changed.
+- The cluster handshake for new members includes the protocol version
+ at each member.
+- The cluster's version is the lowest version among its members.
+- A newer broker can join and older cluster. When it does, it must restrict
+ itself to speaking the older version protocol.
+- When the cluster version increases (because the lowest version member has left)
+ the remaining members may move up to the new version.
+
+
+* Design debates
** Active/active vs. active passive
An active-active cluster can be used in an active-passive mode. In
@@ -385,7 +325,7 @@ An active/passive implementation allows some simplifications over active/active:
- can do immediate local enqueue and still guarantee order.
Active/passive introduces a few extra requirements:
-- Exactly one broker hast to take over if primary fails.
+- Exactly one broker has to take over if primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses till they find the active member.
@@ -393,43 +333,17 @@ Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover
- Backup brokers are immediately available on failover.
-- Some load sharing: reading from client + multicast only done on direct node.
-
-Active/active drawbacks:
-- Co-ordinating message acquisition may impact performance (not tested)
-- Code may be more complex that active/passive.
+- As long as a client can connect to any broker in the cluster, it can be served.
Active/passive benefits:
-- Don't need message allocation strategy, can feed consumers at top speed.
-- Code may be simpler than active/active.
+- Don't need to replicate message allocation, can feed consumers at top speed.
Active/passive drawbacks:
- All clients on one node so a failure affects every client in the system.
- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
- Clients must find the single active node, may involve multiple connect attempts.
+- No service if a partition separates a client from the active broker,
+ even if the client can see other brokers.
-** Total ordering.
-
-Initial thinking: allow message ordering to differ between brokers.
-New thinking: use CPG total ordering, get identical ordering on all brokers.
-- Allowing variation in order introduces too much chance of unexpected behavior.
-- Usign total order allows other optimizations, see Message Identifiers below.
-
-** Message identifiers.
-
-Initial thinking: message ID = CPG node id + 64 bit sequence number.
-This involves a lot of mapping between cluster IDs and broker messsages.
-
-New thinking: message ID = queue name + queue position.
-- Removes most of the mapping and memory management for cluster code.
-- Requires total ordering of messages (see above)
-
-** Message rejection
-
-Initial thinking: add special reject/rejected points to cluster interface so
-rejected messages could be re-queued without multicast.
-New thinking: treat re-queueing after reject as entirely new message.
-- Simplifies cluster interface & implementation
-- Not on the critical path.
diff --git a/qpid/cpp/design_docs/new-cluster-plan.txt b/qpid/cpp/design_docs/new-cluster-plan.txt
index 781876e55a..626e443be7 100644
--- a/qpid/cpp/design_docs/new-cluster-plan.txt
+++ b/qpid/cpp/design_docs/new-cluster-plan.txt
@@ -17,376 +17,156 @@
# specific language governing permissions and limitations
# under the License.
+* Status of impementation
-Notes on new cluster implementation. See also: new-cluster-design.txt
+Meaning of priorities:
+[#A] Essential for basic functioning.
+[#B] Required for first release.
+[#C] Can be addressed in a later release.
-* Implementation plan.
+The existig prototype is bare bones to do performance benchmarks:
+- Implements publish and consumer locking protocol.
+- Defered delivery and asynchronous completion of message.
+- Optimize the case all consumers are on the same node.
+- No new member updates, no failover updates, no transactions, no persistence etc.
-Co-existence with old cluster code and tests:
-- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
-- Double up tests with old version/new version as the new code develops.
+Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/
-Minimal POC for message delivery & perf test.
-- no wiring replication, no updates, no failover, no persistence, no async completion.
-- just implement publish and acquire/dequeue locking protocol.
-- optimize the special case where all consumers are on the same node.
-- measure performance: compare active-passive and active-active modes of use.
+** Similarities to existing cluster.
-Full implementation of transient cluster
-- Update (based on existing update), async completion etc.
-- Passing all existing transient cluster tests.
+/Active-active/: the new cluster can be a drop-in replacement for the
+old, existing tests & customer deployment configurations are still
+valid.
-Persistent cluster
-- Make sure async completion works correctly.
-- InitialStatus protoocl etc. to support persistent start-up (existing code)
-- cluster restart from store: stores not identical. Load one, update the rest.
- - assign cluster ID's to messages recovered from store, don't replicate.
+/Virtual synchrony/: Uses corosync to co-ordinate activity of members.
-Improved update protocol
-- per-queue, less stalling, bounded catch-up.
+/XML controls/: Uses XML to define the primitives multicast to the
+cluster.
-* Task list
+** Differences with existing cluster.
-** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
+/Report rather than predict consumption/: brokers explicitly tell each
+other which messages have been acquired or dequeued. This removes the
+major cause of bugs in the existing cluster.
-NOTE: as implementation questions arise, take the easiest option and make
-a note for later optimization/improvement.
+/Queue consumer locking/: to avoid duplicates only one broker can acquire or
+dequeue messages at a time - while has the consume-lock on the
+queue. If multiple brokers are consuming from the same queue the lock
+is passed around to time-share access to the queue.
-*** Tests
-- python test: 4 senders, numbered messages, 4 receivers, verify message set.
-- acquire then release messages: verify can be dequeued on any member
-- acquire then kill broker: verify can be dequeued other members.
-- acquire then reject: verify goes on alt-exchange once only.
+/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting
+the concurrency of the host) to allow concurrent processing on
+different queues. Queues are hashed onto the groups.
-*** DONE broker::Cluster interface and call points.
+* Completed tasks
+** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
+ CLOSED: [2011-10-05 Wed 16:03]
-Initial interface commited.
+Defines broker::Cluster interface and call points.
+Initial interface commite
-*** Main classes
+Main classes
+Core: central object holding cluster classes together (replaces cluster::Cluster)
+BrokerContext: implements broker::Cluster interface.
+QueueContext: Attached to a broker::Queue, holds cluster status.
+MessageHolder:holds local messages while they are being enqueued.
-BrokerHandler:
-- implements broker::Cluster intercept points.
-- sends mcast events to inform cluster of local actions.
-- thread safe, called in connection threads.
+Implements multiple CPG groups for better concurrency.
-LocalMessageMap:
-- Holds local messages while they are being enqueued.
-- thread safe: called by both BrokerHandler and MessageHandler
-
-MessageHandler:
-- handles delivered mcast messages related to messages.
-- initiates local actions in response to mcast events.
-- thread unsafe, only called in deliver thread.
-- maintains view of cluster state regarding messages.
+** DONE [#A] Large message replication.
+ CLOSED: [2011-10-05 Wed 17:22]
+Multicast using fixed-size (64k) buffers, allow fragmetation of messages across buffers (frame by frame)
-QueueOwnerHandler:
-- handles delivered mcast messages related to queue consumer ownership.
-- thread safe, called in deliver, connection and timer threads.
-- maintains view of cluster state regarding queue ownership.
-
-cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
-- thread safe: manage state used by both MessageHandler and BrokerHandler
-
-The following code sketch illustrates only the "happy path" error handling
-is omitted.
-
-*** BrokerHandler
-Types:
-- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
-- struct
-
-NOTE:
-- Messages on queues are identified by a queue name + a position.
-- Messages being routed are identified by a sequence number.
-
-Members:
-- thread_local bool noReplicate // suppress replication.
-- thread_local bool isRouting // suppress operations while routing
-- Message localMessage[SequenceNumber] // local messages being routed.
-- thread_local SequenceNumber routingSequence
-
-NOTE: localMessage is also modified by MessageHandler.
-
-broker::Cluster intercept functions:
-
-routing(msg)
- if noReplicate: return
- # Supress everything except enqueues while we are routing.
- # We don't want to replicate acquires & dequeues caused by an enqueu,
- # e.g. removal of messages from ring/LV queues.
- isRouting = true
-
-enqueue(qmsg):
- if noReplicate: return
- if routingSequence == 0 # thread local
- routingSequence = nextRoutingSequence()
- mcast create(encode(qmsg.msg),routingSeq)
- mcast enqueue(qmsg.q,routingSeq)
-
-routed(msg):
- if noReplicate: return
- isRouting = false
-
-acquire(qmsg):
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- if msg.id: mcast acquire(qmsg)
-
-release(QueuedMessage)
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- mcast release(qmsg)
-
-accept(QueuedMessage):
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- mcast accept(qmsg)
-
-reject(QueuedMessage):
- isRejecting = true
- mcast reject(qmsg)
-
-# FIXME no longer needed?
-drop(QueuedMessage)
- cleanup(qmsg)
-
-*** MessageHandler and mcast messages
-Types:
-- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
-- struct QueueKey { MessageId id; QueueName q; }
-- typedef map<QueueKey, QueueEntry> Queue
-- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
-
-Members:
-- QueueEntry enqueued[QueueKey]
-- Node node[NodeId]
-
-Mcast messages in Message class:
-
-create(msg,seq)
- if sender != self: node[sender].routing[seq] = decode(msg)
-
-enqueue(q,seq):
- id = (sender,seq)
- if sender == self:
- enqueued[id,q] = (localMessage[seq], acquired=None)
- else:
- msg = sender.routing[seq]
- enqueued[id,q] = (qmsg, acquired=None)
- with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
-
-routed(seq):
- if sender == self: localMessage.erase(msg.id.seq)
- else: sender.routing.erase(seq)
-
-acquire(id,q):
- enqueued[id,q].acquired = sender
- node[sender].acquired.push_back((id,q))
- if sender != self:
- with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
-
-release(id,q)
- enqueued[id,q].acquired = None
- node[sender].acquired.erase((id,q))
- if sender != self
- with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
-
-reject(id,q):
- sender.routing[id] = enqueued[id,q] # prepare for re-queueing
-
-rejected(id,q)
- sender.routing.erase[id]
-
-dequeue(id,q)
- entry = enqueued[id,q]
- enqueued.erase[id,q]
- node[entry.acquired].acquired.erase(id,q)
- if sender != self:
- with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
-
-member m leaves cluster:
- for key in node[m].acquired:
- release(key.id, key.q)
- node.erase(m)
-
-*** Queue consumer locking
-
-When a queue is locked it does not deliver messages to its consumers.
-
-New broker::Queue functions:
-- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
-- startConsumers(): reset consumersStopped flag
-
-Implementation sketch, locking omitted:
-
-void Queue::stopConsumers() {
- consumersStopped = true;
- while (consumersBusy) consumersBusyMonitor.wait();
-}
-
-void Queue::startConsumers() {
- consumersStopped = false;
- listeners.notify();
-}
-
-bool Queue::dispatch(consumer) {
- if (consumersStopped) return false;
- ++consumersBusy;
- do_regular_dispatch_body()
- if (--consumersBusy == 0) consumersBusyMonitor.notify();
-}
-
-*** QueueOwnerHandler
-
-Invariants:
-- Each queue is owned by at most one node at any time.
-- Each node is interested in a set of queues at any given time.
-- A queue is un-owned if no node is interested.
-
-The queue owner releases the queue when
-- it loses interest i.e. queue has no consumers with credit.
-- a configured time delay expires and there are other interested nodes.
-
-The owner mcasts release(q). On delivery the new queue owner is the
-next node in node-id order (treating nodes as a circular list)
-starting from the old owner that is interested in the queue.
-
-Queue consumers initially are stopped, only started when we get
-ownership from the cluster.
-
-Thread safety: called by deliver, connection and timer threads, needs locking.
-
-Thread safe object per queue holding queue ownership status.
-Called by deliver, connection and timer threads.
-
-class QueueOwnership {
- bool owned;
- Timer timer;
- BrokerQueue q;
-
- drop(): # locked
- if owned:
- owned = false
- q.stopConsumers()
- mcast release(q.name, false)
- timer.stop()
-
- take(): # locked
- if not owned:
- owned = true
- q.startConsumers()
- timer.start(timeout)
-
- timer.fire(): drop()
-}
-
-Data Members, only modified/examined in deliver thread:
-- typedef set<NodeId> ConsumerSet
-- map<QueueName, ConsumerSet> consumers
-- map<QueueName, NodeId> owner
+* Open questions
-Thread safe data members, accessed in connection threads (via BrokerHandler):
-- map<QueueName, QueueOwnership> ownership
+** TODO [#A] Queue sequence numbers vs. independant message IDs.
+ SCHEDULED: <2011-10-07 Fri>
-Multicast messages in QueueOwner class:
+Current prototype uses queue sequence numbers to identify
+message. This is tricky for updating new members as the sequence
+numbers are only known on delivery.
-consume(q):
- if sender==self and consumers[q].empty(): ownership[q].take()
- consumers[q].insert(sender)
+Independent message IDs that can be generated and sent with the message simplify
+this and potentially allow performance benefits by relaxing total ordering.
+However they imply additional map lookups that might hurt performance.
-release(q):
- asssert(owner[q] == sender and owner[q] in consumers[q])
- owner[q] = circular search from sender in consumers[q]
- if owner==self: ownership[q].take()
+- [X] Prototype independent message IDs, check performance.
+Throughput worse by 30% in contented case, 10% in uncontended.
+Sticking with queue sequence numbers.
-cancel(q):
- assert(queue[q].owner != sender) # sender must release() before cancel()
- consumers[q].erase(sender)
+* Outstanding Tasks
+** TODO [#A] Defer and async completion of wiring commands.
-member-leaves:
- for q in queue: if owner[q] = left: left.release(q)
+Testing requirement: Many tests assume wiring changes are visible
+across the cluster once the commad completes.
-Need 2 more intercept points in broker::Cluster:
+Name clashes: need to avoid race if same name queue/exchange declared
+on 2 brokers simultaneously
-consume(q,consumer,consumerCount) - Queue::consume()
- if consumerCount == 1: mcast consume(q)
+** TODO [#A] Passing all existing cluster tests.
-cancel(q,consumer,consumerCount) - Queue::cancel()
- if consumerCount == 0:
- ownership[q].drop()
- mcast cancel(q)
+The new cluster should be a drop-in replacement for the old, so it
+should be able to pass all the existing tests.
-#TODO: lifecycle, updating cluster data structures when queues are destroyed
-
-*** Increasing concurrency
-The major performance limitation of the old cluster is that it does
-everything in the single CPG deliver thread context.
-
-We can get additional concurrency by creating a thread context _per queue_
-for queue operations: enqueue, acquire, accept etc.
-
-We associate a PollableQueue of queue operations with each AMQP queue.
-The CPG deliver thread would
-- build messages and associate with cluster IDs.
-- push queue ops to the appropriate PollableQueue to be dispatched the queues thread.
-
-Serializing operations on the same queue avoids contention, but takes advantage
-of the independence of operations on separate queues.
+** TODO [#A] Update to new members joining.
-*** Re-use of existing cluster code
-- re-use Event
-- re-use Multicaster
-- re-use same PollableQueueSetup (may experiment later)
-- new Core class to replace Cluster.
-- keep design modular, keep threading rules clear.
+Need to resolve [[Queue sequence numbers vs. independant message IDs]] first.
+- implicit sequence numbers are more tricky to replicate to new member.
-** TODO [#B] Large message replication.
-Multicast should encode messages in fixed size buffers (64k)?
-Can't assume we can send message in one chunk.
-For 0-10 can use channel numbers & send whole frames packed into larger buffer.
-** TODO [#B] Transaction support.
-Extend broker::Cluster interface to capture transaction context and completion.
-Sequence number to generate per-node tx IDs.
-Replicate transaction completion.
-** TODO [#B] Batch CPG multicast messages
-The new cluster design involves a lot of small multicast messages,
-they need to be batched into larger CPG messages for efficiency.
-** TODO [#B] Genuine async completion
-Replace current synchronous waiting implementation with genuine async completion.
+Update individual objects (queues and exchanges) independently.
+- create queues first, then update all queues and exchanges in parallel.
+- multiple updater threads, per queue/exchange.
+- updater sends messages to special exchange(s) (not using extended AMQP controls)
+
+Queue updater:
+- marks the queue position at the sync point
+- sends messages starting from the sync point working towards the head of the queue.
+- send "done" message.
+Note: updater remains active throughout, consuming clients actually reduce the
+size of the update.
-Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+Queue updatee:
+- enqueues received from CPG: add to back of queue as normal.
+- dequeues received from CPG: apply if found, else save to check at end of update.
+- messages from updater: add to the *front* of the queue.
+- update complete: apply any saved dequeues.
-Async callback uses *requestIOProcessing* to queue action on IO thread.
+Exchange updater:
+- updater: send snapshot of exchange as it was at the sync point.
-** TODO [#B] Async completion of accept when dequeue completes.
-Interface is already there on broker::Message, just need to ensure
-that store and cluster implementations call it appropriately.
+Exchange updatee:
+- queue exchange operations after the sync point.
+- when snapshot is received: apply saved operations.
-** TODO [#B] Replicate wiring.
-From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
+Updater remains active throughout.
+Updatee stalls clients until the update completes.
-** TODO [#B] New members joining - first pass
+Updating queue/exchange/binding objects is via the same encode/decode
+that is used by the store. Updatee to use recovery interfaces to
+recover?
-Re-use update code from old cluster but don't replicate sessions &
-connections.
+** TODO [#A] Failover updates to client.
+Implement the amq.failover exchange to notify clients of membership.
-Need to extend it to send cluster IDs with messages.
+** TODO [#B] Initial status protocol.
+Handshake to give status of each broker member to new members joining.
+Status includes
+- persistent store state (clean, dirty)
+- cluster protocol version.
-Need to replicate the queue ownership data as part of the update.
+** TODO [#B] Replace boost::hash with our own hash function.
+The hash function is effectively part of the interface so
+we need to be sure it doesn't change underneath us.
-** TODO [#B] Persistence support.
-InitialStatus protoocl etc. to support persistent start-up (existing code)
+** TODO [#B] Persistent cluster support.
+Initial status protoocl to support persistent start-up (see existing code)
Only one broker recovers from store, update to others.
Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
-** TODO [#B] Handle other ways that messages can leave a queue.
-
-Other ways (other than via a consumer) that messages are take off a queue.
-
-NOTE: Not controlled by queue lock, how to make them consistent?
-
+** TODO [#B] Management support
+Replicate management methods that modify queues - e.g. move, purge.
Target broker may not have all messages on other brokers for purge/destroy.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
@@ -395,66 +175,48 @@ Target broker may not have all messages on other brokers for purge/destroy.
Need to add callpoints & mcast messages to replicate these?
-** TODO [#B] Flow control for internal queues.
-
-Need to bound the size of internal queues: delivery and multicast.
-- stop polling for read on client connections when we reach a bound.
-- restart polling when we get back under it.
-
-That will stop local multicasting, we still have to deal with remote
-multicasting (note existing cluster does not do this.) Something like:
-- when over bounds multicast a flow-control event.
-- on delivery of flow-control all members stop polling to read client connections
-- when back under bounds send flow-control-end, all members resume
-- if flow-controling member dies others resume
-
-** TODO [#B] Integration with transactions.
-Do we want to replicate during transaction & replicate commit/rollback
-or replicate only on commit?
-No integration with DTX transactions.
-** TODO [#B] Make new cluster work with replication exchange.
-Possibly re-use some common logic. Replication exchange is like clustering
-except over TCP.
-** TODO [#B] Better concurrency, scalabiility on multi-cores.
-Introduce PollableQueue of operations per broker queue. Queue up mcast
-operations (enqueue, acquire, accept etc.) to be handled concurrently
-on different queue. Performance testing to verify improved scalability.
-** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
-Cluster needs to complete these asynchronously to guarantee resources
-exist across the cluster when the command completes.
+** TODO [#B] TX transaction support.
+Extend broker::Cluster interface to capture transaction context and completion.
+Running brokers exchange TX information.
+New broker update includes TX information.
-** TODO [#C] Allow non-replicated exchanges, queues.
+ // FIXME aconway 2010-10-18: As things stand the cluster is not
+ // compatible with transactions
+ // - enqueues occur after routing is complete
+ // - no call to Cluster::enqueue, should be in Queue::process?
+ // - no transaction context associated with messages in the Cluster interface.
+ // - no call to Cluster::accept in Queue::dequeueCommitted
-Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
-- save replicated status to store.
-- support in management tools.
-Replicated exchange: replicate binds to replicated queues.
-Replicated queue: replicate all messages.
+** TODO [#B] DTX transaction support.
+Extend broker::Cluster interface to capture transaction context and completion.
+Running brokers exchange DTX information.
+New broker update includes DTX information.
-** TODO [#C] New members joining - improved.
+** TODO [#B] Async completion of accept.
+When this is fixed in the standalone broker, it should be fixed for cluster.
-Replicate wiring like old cluster, stall for wiring but not for
-messages. Update messages on a per-queue basis from back to front.
+** TODO [#B] Network partitions and quorum.
+Re-use existing implementation.
-Updater:
-- stall & push wiring: declare exchanges, queues, bindings.
-- start update iterator thread on each queue.
-- unstall and process normally while iterator threads run.
+** TODO [#B] Review error handling, put in a consitent model.
+- [ ] Review all asserts, for possible throw.
+- [ ] Decide on fatal vs. non-fatal errors.
-Update iterator thread:
-- starts at back of updater queue, message m.
-- send update_front(q,m) to updatee and advance towards front
-- at front: send update_done(q)
+** TODO [#B] Implement inconsistent error handling policy.
+What to do if a message is enqueued sucessfully on the local broker,
+but fails on one or more backups - e.g. due to store limits?
+- we have more flexibility, we don't *have* to crash
+- but we've loste some of our redundancy guarantee, how should we inform client?
-Updatee:
-- stall, receive wiring, lock all queues, mark queues "updating", unstall
-- update_front(q,m): push m to *front* of q
-- update_done(q): mark queue "ready"
+** TODO [#C] Allow non-replicated exchanges, queues.
-Updatee cannot take the queue consume lock for a queue that is updating.
-Updatee *can* push messages onto a queue that is updating.
+Set qpid.replicate=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated queue: replicate all messages.
+Replicated exchange: replicate bindings to replicated queues only.
-TODO: Is there any way to eliminate the stall for wiring?
+Configurable default? Defaults to true.
** TODO [#C] Refactoring of common concerns.
@@ -469,9 +231,46 @@ Look for ways to capitalize on the similarity & simplify the code.
In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
-** TODO [#C] Concurrency for enqueue events.
-All enqueue events are being processed in the CPG deliver thread context which
-serializes all the work. We only need ordering on a per queue basis, can we
-enqueue in parallel on different queues and will that improve performance?
+
** TODO [#C] Handling immediate messages in a cluster
Include remote consumers in descision to deliver an immediate message?
+** TODO [#C] Remove old cluster hacks and workarounds
+The old cluster has workarounds in the broker code that can be removed.
+- [ ] drop code to replicate management model.
+- [ ] drop timer workarounds for TTL, management, heartbeats.
+- [ ] drop "cluster-safe assertions" in broker code.
+- [ ] drop connections, sessions, management from cluster update.
+- [ ] drop security workarounds: cluster code now operates after message decoding.
+- [ ] drop connection tracking in cluster code.
+- [ ] simpler inconsistent-error handling code, no need to stall.
+
+** TODO [#C] Support for live upgrades.
+
+Allow brokers in a running cluster to be replaced one-by-one with a new version.
+(see new-cluster-design for design notes.)
+
+The old cluster protocol was unstable because any changes in broker
+state caused changes to the cluster protocol.The new design should be
+much more stable.
+
+Points to implement in anticipation of live upgrade:
+- Prefix each CPG message with a version number and length.
+ Version number determines how to decode the message.
+- Brokers ignore messages that have a higher version number than they understand.
+- Protocol version XML element in cluster.xml, on each control.
+- Initial status protocol to include protocol version number.
+
+New member udpates: use the store encode/decode for updates, use the
+same backward compatibility strategy as the store. This allows for
+adding new elements to the end of structures but not changing or
+removing new elements.
+
+** TODO [#C] Support for AMQP 1.0.
+
+* Testing
+** TODO [#A] Pass all existing cluster tests.
+Requires [[Defer and async completion of wiring commands.]]
+** TODO [#A] New cluster tests.
+Stress tests & performance benchmarks focused on changes in new cluster:
+- concurrency by queues rather than connections.
+- different handling shared queues when consuemrs are on different brokers.
diff --git a/qpid/cpp/docs/man/qpidd.1 b/qpid/cpp/docs/man/qpidd.1
index 1e2d3aabee..8ba6f9ebf7 100644
--- a/qpid/cpp/docs/man/qpidd.1
+++ b/qpid/cpp/docs/man/qpidd.1
@@ -132,6 +132,10 @@ at which an event will be raised
Group identifier to assign to messages
delivered to a message group queue that
do not contain an identifier.
+.TP
+\fB\-\-enable\-timestamp\fR yes|no (0)
+Add current time to each received
+message.
.SS "Logging options:"
.TP
\fB\-t\fR [ \fB\-\-trace\fR ]
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index da1f539108..bb46f1258b 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -665,6 +665,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
set (qpidd_platform_SOURCES
windows/QpiddBroker.cpp
+ windows/SCM.cpp
)
set (qpidmessaging_platform_SOURCES
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 6230a8f6f6..e87cd3b4e0 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -54,6 +54,8 @@ windows_dist = \
qpid/sys/windows/uuid.cpp \
qpid/sys/windows/uuid.h \
windows/QpiddBroker.cpp \
+ windows/SCM.h \
+ windows/SCM.cpp \
qpid/broker/windows/BrokerDefaults.cpp \
qpid/broker/windows/SaslAuthenticator.cpp \
qpid/broker/windows/SslProtocolFactory.cpp \
@@ -497,6 +499,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/Waitable.h \
qpid/sys/alloca.h \
qpid/sys/uuid.h \
+ qpid/sys/unordered_map.h \
qpid/amqp_0_10/Codecs.cpp
if HAVE_SASL
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index 1270b57252..1cebcfc3ac 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -196,3 +196,8 @@ int QpiddBroker::execute (QpiddOptions *options) {
}
return 0;
}
+
+int main(int argc, char* argv[])
+{
+ return run_broker(argc, argv);
+}
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index 4da8470f2f..3b6583bfaf 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -93,6 +93,7 @@ libqmf2_la_SOURCES = \
qmf/AgentEventImpl.h \
qmf/AgentImpl.h \
qmf/AgentSession.cpp \
+ qmf/AgentSessionImpl.h \
qmf/AgentSubscription.cpp \
qmf/AgentSubscription.h \
qmf/ConsoleEvent.cpp \
@@ -106,19 +107,21 @@ libqmf2_la_SOURCES = \
qmf/Data.cpp \
qmf/DataImpl.h \
qmf/EventNotifierImpl.cpp \
- qmf/PosixEventNotifier.cpp \
- qmf/PosixEventNotifierImpl.cpp \
+ qmf/EventNotifierImpl.h \
qmf/exceptions.cpp \
qmf/Expression.cpp \
qmf/Expression.h \
qmf/Hash.cpp \
qmf/Hash.h \
+ qmf/PosixEventNotifier.cpp \
+ qmf/PosixEventNotifierImpl.cpp \
+ qmf/PosixEventNotifierImpl.h \
qmf/PrivateImplRef.h \
qmf/Query.cpp \
qmf/QueryImpl.h \
- qmf/Schema.cpp \
qmf/SchemaCache.cpp \
qmf/SchemaCache.h \
+ qmf/Schema.cpp \
qmf/SchemaId.cpp \
qmf/SchemaIdImpl.h \
qmf/SchemaImpl.h \
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
index f75663e131..9d363d3012 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -35,17 +35,17 @@ using qpid::framing::Uuid;
SchemaHash::SchemaHash()
{
for (int idx = 0; idx < 16; idx++)
- hash[idx] = 0x5A;
+ hash.b[idx] = 0x5A;
}
void SchemaHash::encode(Buffer& buffer) const
{
- buffer.putBin128(hash);
+ buffer.putBin128(hash.b);
}
void SchemaHash::decode(Buffer& buffer)
{
- buffer.getBin128(hash);
+ buffer.getBin128(hash.b);
}
void SchemaHash::update(uint8_t data)
@@ -55,12 +55,8 @@ void SchemaHash::update(uint8_t data)
void SchemaHash::update(const char* data, uint32_t len)
{
- union h {
- uint8_t b[16];
- uint64_t q[2];
- }* h = reinterpret_cast<union h*>(&hash[0]);
- uint64_t* first = &h->q[0];
- uint64_t* second = &h->q[1];
+ uint64_t* first = &hash.q[0];
+ uint64_t* second = &hash.q[1];
for (uint32_t idx = 0; idx < len; idx++) {
*first = *first ^ (uint64_t) data[idx];
*second = *second << 1;
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
index 8b079a5ec6..683fb6f8f0 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -35,7 +35,10 @@ namespace engine {
// they've been registered.
class SchemaHash {
- uint8_t hash[16];
+ union h {
+ uint8_t b[16];
+ uint64_t q[2];
+ } hash;
public:
SchemaHash();
void encode(qpid::framing::Buffer& buffer) const;
@@ -47,7 +50,7 @@ namespace engine {
void update(Direction d) { update((uint8_t) d); }
void update(Access a) { update((uint8_t) a); }
void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
- const uint8_t* get() const { return hash; }
+ const uint8_t* get() const { return hash.b; }
bool operator==(const SchemaHash& other) const;
bool operator<(const SchemaHash& other) const;
bool operator>(const SchemaHash& other) const;
diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.cpp b/qpid/cpp/src/qpid/RefCountedBuffer.cpp
index 40d620f7ad..a82e1a02ab 100644
--- a/qpid/cpp/src/qpid/RefCountedBuffer.cpp
+++ b/qpid/cpp/src/qpid/RefCountedBuffer.cpp
@@ -20,19 +20,22 @@
*/
#include "qpid/RefCountedBuffer.h"
+#include <stdlib.h>
#include <new>
namespace qpid {
void RefCountedBuffer::released() const {
this->~RefCountedBuffer();
- ::delete[] reinterpret_cast<const char*>(this);
+ ::free (reinterpret_cast<void *>(const_cast<RefCountedBuffer *>(this)));
}
BufferRef RefCountedBuffer::create(size_t n) {
- char* store=::new char[n+sizeof(RefCountedBuffer)];
+ void* store=::malloc (n + sizeof(RefCountedBuffer));
+ if (NULL == store)
+ throw std::bad_alloc();
new(store) RefCountedBuffer;
- char* start = store+sizeof(RefCountedBuffer);
+ char* start = reinterpret_cast<char *>(store) + sizeof(RefCountedBuffer);
return BufferRef(
boost::intrusive_ptr<RefCounted>(reinterpret_cast<RefCountedBuffer*>(store)),
start, start+n);
diff --git a/qpid/cpp/src/qpid/Sasl.h b/qpid/cpp/src/qpid/Sasl.h
index 9a9d61b037..4d579fa051 100644
--- a/qpid/cpp/src/qpid/Sasl.h
+++ b/qpid/cpp/src/qpid/Sasl.h
@@ -47,8 +47,8 @@ class Sasl
* client supports.
* @param externalSecuritySettings security related details from the underlying transport
*/
- virtual std::string start(const std::string& mechanisms,
- const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
+ virtual bool start(const std::string& mechanisms, std::string& response,
+ const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
virtual std::string step(const std::string& challenge) = 0;
virtual std::string getMechanism() = 0;
virtual std::string getUserId() = 0;
diff --git a/qpid/cpp/src/qpid/SaslFactory.cpp b/qpid/cpp/src/qpid/SaslFactory.cpp
index f117404028..a8d1f94c1e 100644
--- a/qpid/cpp/src/qpid/SaslFactory.cpp
+++ b/qpid/cpp/src/qpid/SaslFactory.cpp
@@ -112,7 +112,7 @@ class CyrusSasl : public Sasl
public:
CyrusSasl(const std::string & username, const std::string & password, const std::string & serviceName, const std::string & hostName, int minSsf, int maxSsf, bool allowInteraction);
~CyrusSasl();
- std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
+ bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings);
std::string step(const std::string& challenge);
std::string getMechanism();
std::string getUserId();
@@ -210,7 +210,7 @@ namespace {
const std::string SSL("ssl");
}
-std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettings* externalSettings)
+bool CyrusSasl::start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings)
{
QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")");
int result = sasl_client_new(settings.service.c_str(),
@@ -283,7 +283,12 @@ std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettin
mechanism = std::string(chosenMechanism);
QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << "): selected "
<< mechanism << " response: '" << std::string(out, outlen) << "'");
- return std::string(out, outlen);
+ if (out) {
+ response = std::string(out, outlen);
+ return true;
+ } else {
+ return false;
+ }
}
std::string CyrusSasl::step(const std::string& challenge)
diff --git a/qpid/cpp/src/qpid/acl/AclPlugin.cpp b/qpid/cpp/src/qpid/acl/AclPlugin.cpp
index e4d721ea44..d611797c49 100644
--- a/qpid/cpp/src/qpid/acl/AclPlugin.cpp
+++ b/qpid/cpp/src/qpid/acl/AclPlugin.cpp
@@ -69,7 +69,7 @@ struct AclPlugin : public Plugin {
}
acl = new Acl(values, b);
- b.setAcl(acl.get());
+ b.setAcl(acl.get());
b.addFinalizer(boost::bind(&AclPlugin::shutdown, this));
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index bd94582d10..ec3cf9d340 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -43,6 +43,8 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -125,7 +127,8 @@ Broker::Options::Options(const std::string& name) :
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
- defaultMsgGroup("qpid.no-group")
+ defaultMsgGroup("qpid.no-group"),
+ timestampRcvMsgs(false) // set the 0.10 timestamp delivery property
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -162,7 +165,8 @@ Broker::Options::Options(const std::string& name) :
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
- ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
+ ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
}
const std::string empty;
@@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) :
else
QPID_LOG(info, "Management not enabled");
+ // this feature affects performance, so let's be sure that gets logged!
+ if (conf.timestampRcvMsgs) {
+ QPID_LOG(notice, "Receive message timestamping is ENABLED.");
+ }
+
/**
* SASL setup, can fail and terminate startup
*/
@@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
_qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
- status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
+ status = getTimestampConfig(a.o_receive, getManagementExecutionContext());
+ break;
+ }
+ case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
+ status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchange-type");
const std::string QUEUE_NAME("queue");
const std::string EXCHANGE_NAME("exchange");
+const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
+
const std::string _TRUE("true");
const std::string _FALSE("false");
}
@@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
return Manageable::STATUS_OK;;
}
+Manageable::status_t Broker::getTimestampConfig(bool& receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
+ }
+ receive = config.timestampRcvMsgs;
+ return Manageable::STATUS_OK;
+}
+
+Manageable::status_t Broker::setTimestampConfig(const bool receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
+ }
+ config.timestampRcvMsgs = receive;
+ QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
+ return Manageable::STATUS_OK;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 8b347db3c0..b3b751be98 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -122,6 +122,7 @@ public:
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
std::string defaultMsgGroup;
+ bool timestampRcvMsgs;
private:
std::string getHome();
@@ -164,6 +165,10 @@ public:
const std::string& userId,
const std::string& connectionId,
qpid::types::Variant::Map& results);
+ Manageable::status_t getTimestampConfig(bool& receive,
+ const ConnectionState* context);
+ Manageable::status_t setTimestampConfig(const bool receive,
+ const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -315,6 +320,7 @@ public:
const boost::intrusive_ptr<Message>& msg)> deferDelivery;
bool isAuthenticating ( ) { return config.auth; }
+ bool isTimestamping() { return config.timestampRcvMsgs; }
typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 015002a70c..7cd91ae539 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/SecureConnection.h"
#include "qpid/Url.h"
#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ConnectionStartOkBody.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/SecurityLayer.h"
@@ -63,13 +64,24 @@ void ConnectionHandler::heartbeat()
handler->proxy.heartbeat();
}
+bool ConnectionHandler::handle(const framing::AMQMethodBody& method)
+{
+ //Need special handling for start-ok, in order to distinguish
+ //between null and empty response
+ if (method.isA<ConnectionStartOkBody>()) {
+ handler->startOk(dynamic_cast<const ConnectionStartOkBody&>(method));
+ return true;
+ } else {
+ return invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), method);
+ }
+}
+
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
try{
- if (method && invoke(
- static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), *method)) {
+ if (method && handle(*method)) {
// This is a connection control frame, nothing more to do.
} else if (isOpen()) {
handler->connection.getChannel(frame.getChannel()).in(frame);
@@ -96,13 +108,10 @@ ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool
ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
proxy(c.getOutput()),
- connection(c), serverMode(!isClient), acl(0), secured(0),
+ connection(c), serverMode(!isClient), secured(0),
isOpen(false)
{
if (serverMode) {
-
- acl = connection.getBroker().getAcl();
-
FieldTable properties;
Array mechanisms(0x95);
@@ -125,13 +134,20 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow)
ConnectionHandler::Handler::~Handler() {}
-void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProperties,
- const string& mechanism,
- const string& response,
+void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
+ const string& /*response*/,
const string& /*locale*/)
{
+ //Need special handling for start-ok, in order to distinguish
+ //between null and empty response -> should never use this method
+ assert(false);
+}
+
+void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
+{
try {
- authenticator->start(mechanism, response);
+ authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
} catch (std::exception& /*e*/) {
management::ManagementAgent* agent = connection.getAgent();
if (agent) {
@@ -143,11 +159,14 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper
}
throw;
}
+ const framing::FieldTable& clientProperties = body.getClientProperties();
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
if (clientProperties.isSet(QPID_FED_TAG)) {
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
}
if (connection.isFederationLink()) {
+ AclModule* acl = connection.getBroker().getAcl();
+ FieldTable properties;
if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
return;
@@ -308,11 +327,21 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
string response;
if (sasl.get()) {
const qpid::sys::SecuritySettings& ss = connection.getExternalSecuritySettings();
- response = sasl->start ( requestedMechanism.empty()
- ? supportedMechanismsList
- : requestedMechanism,
- & ss );
- proxy.startOk ( ft, sasl->getMechanism(), response, en_US );
+ if (sasl->start ( requestedMechanism.empty()
+ ? supportedMechanismsList
+ : requestedMechanism,
+ response,
+ & ss )) {
+ proxy.startOk ( ft, sasl->getMechanism(), response, en_US );
+ } else {
+ //response was null
+ ConnectionStartOkBody body;
+ body.setClientProperties(ft);
+ body.setMechanism(sasl->getMechanism());
+ //Don't set response, as none was given
+ body.setLocale(en_US);
+ proxy.send(body);
+ }
}
else {
response = ((char)0) + username + ((char)0) + password;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index b32167669e..05c5f00c57 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -26,8 +26,10 @@
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AMQP_AllProxy.h"
+#include "qpid/framing/ConnectionStartOkBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
@@ -57,12 +59,12 @@ class ConnectionHandler : public framing::FrameHandler
Connection& connection;
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
- AclModule* acl;
SecureConnection* secured;
bool isOpen;
Handler(Connection& connection, bool isClient, bool isShadow=false);
~Handler();
+ void startOk(const qpid::framing::ConnectionStartOkBody& body);
void startOk(const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
const std::string& locale);
@@ -96,7 +98,7 @@ class ConnectionHandler : public framing::FrameHandler
};
std::auto_ptr<Handler> handler;
-
+ bool handle(const qpid::framing::AMQMethodBody& method);
public:
ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false );
void close(framing::connection::CloseCode code, const std::string& text);
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 5ea7143366..d13109dad1 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -377,7 +377,15 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp()
+{
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+ time_t now = ::time(0);
+ props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch
+}
+
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
sys::Mutex::ScopedLock l(lock);
DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 2a23a25d06..dda45d73e6 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -81,7 +81,8 @@ public:
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
- QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ /** determine msg expiration time using the TTL value if present */
+ QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
@@ -93,6 +94,8 @@ public:
QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
void setExchange(const std::string&);
void clearApplicationHeadersFlag();
+ /** set the timestamp delivery property to the current time-of-day */
+ QPID_BROKER_EXTERN void setTimestamp();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3d878d02a8..4627b1409a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -952,6 +952,31 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri
}
}
+bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return false;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>() != 0;
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ if (s == "True") return true;
+ if (s == "true") return true;
+ if (s == "False") return false;
+ if (s == "false") return false;
+ try {
+ return boost::lexical_cast<bool>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << s);
+ return false;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *v);
+ return false;
+ }
+}
+
void Queue::configure(const FieldTable& _settings)
{
settings = _settings;
@@ -983,7 +1008,7 @@ void Queue::configureImpl(const FieldTable& _settings)
}
//set this regardless of owner to allow use of no-local with exclusive consumers also
- noLocal = _settings.get(qpidNoLocal);
+ noLocal = getBoolSetting(_settings, qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
@@ -991,11 +1016,11 @@ void Queue::configureImpl(const FieldTable& _settings)
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+ } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
- } else if (_settings.get(qpidLastValueQueue)) {
+ } else if (getBoolSetting(_settings, qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
@@ -1015,7 +1040,7 @@ void Queue::configureImpl(const FieldTable& _settings)
}
}
- persistLastNode= _settings.get(qpidPersistLastNode);
+ persistLastNode = getBoolSetting(_settings, qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
traceId = _settings.getAsString(qpidTraceIdentity);
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 12a13ccfe6..d7adbd68ab 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -30,6 +30,7 @@
#include <boost/format.hpp>
#if HAVE_SASL
+#include <sys/stat.h>
#include <sasl/sasl.h>
#include "qpid/sys/cyrus/CyrusSecurityLayer.h"
using qpid::sys::cyrus::CyrusSecurityLayer;
@@ -57,7 +58,7 @@ public:
NullAuthenticator(Connection& connection, bool encrypt);
~NullAuthenticator();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string& response);
+ void start(const std::string& mechanism, const std::string* response);
void step(const std::string&) {}
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
@@ -81,7 +82,7 @@ public:
~CyrusAuthenticator();
void init();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string& response);
+ void start(const std::string& mechanism, const std::string* response);
void step(const std::string& response);
void getError(std::string& error);
void getUid(std::string& uid) { getUsername(uid); }
@@ -98,11 +99,33 @@ void SaslAuthenticator::init(const std::string& saslName, std::string const & sa
// Check if we have a version of SASL that supports sasl_set_path()
#if (SASL_VERSION_FULL >= ((2<<16)|(1<<8)|22))
// If we are not given a sasl path, do nothing and allow the default to be used.
- if ( ! saslConfigPath.empty() ) {
- int code = sasl_set_path(SASL_PATH_TYPE_CONFIG,
- const_cast<char *>(saslConfigPath.c_str()));
+ if ( saslConfigPath.empty() ) {
+ QPID_LOG ( info, "SASL: no config path set - using default." );
+ }
+ else {
+ struct stat st;
+
+ // Make sure the directory exists and we can read up to it.
+ if ( ::stat ( saslConfigPath.c_str(), & st) ) {
+ // Note: not using strerror() here because I think its messages are a little too hazy.
+ if ( errno == ENOENT )
+ throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: no such directory: " << saslConfigPath ) );
+ if ( errno == EACCES )
+ throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot read parent of: " << saslConfigPath ) );
+ // catch-all stat failure
+ throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot stat: " << saslConfigPath ) );
+ }
+
+ // Make sure the directory is readable.
+ if ( ::access ( saslConfigPath.c_str(), R_OK ) ) {
+ throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: directory not readable:" << saslConfigPath ) );
+ }
+
+ // This shouldn't fail now, but check anyway.
+ int code = sasl_set_path(SASL_PATH_TYPE_CONFIG, const_cast<char *>(saslConfigPath.c_str()));
if(SASL_OK != code)
throw Exception(QPID_MSG("SASL: sasl_set_path failed [" << code << "] " ));
+
QPID_LOG(info, "SASL: config path set to " << saslConfigPath );
}
#endif
@@ -164,7 +187,7 @@ void NullAuthenticator::getMechanisms(Array& mechanisms)
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing
}
-void NullAuthenticator::start(const string& mechanism, const string& response)
+void NullAuthenticator::start(const string& mechanism, const string* response)
{
if (encrypt) {
#if HAVE_SASL
@@ -180,16 +203,16 @@ void NullAuthenticator::start(const string& mechanism, const string& response)
}
}
if (mechanism == "PLAIN") { // Old behavior
- if (response.size() > 0) {
+ if (response && response->size() > 0) {
string uid;
- string::size_type i = response.find((char)0);
- if (i == 0 && response.size() > 1) {
+ string::size_type i = response->find((char)0);
+ if (i == 0 && response->size() > 1) {
//no authorization id; use authentication id
- i = response.find((char)0, 1);
- if (i != string::npos) uid = response.substr(1, i-1);
+ i = response->find((char)0, 1);
+ if (i != string::npos) uid = response->substr(1, i-1);
} else if (i != string::npos) {
//authorization id is first null delimited field
- uid = response.substr(0, i);
+ uid = response->substr(0, i);
}//else not a valid SASL PLAIN response, throw error?
if (!uid.empty()) {
//append realm if it has not already been added
@@ -376,7 +399,7 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms)
}
}
-void CyrusAuthenticator::start(const string& mechanism, const string& response)
+void CyrusAuthenticator::start(const string& mechanism, const string* response)
{
const char *challenge;
unsigned int challenge_len;
@@ -385,7 +408,7 @@ void CyrusAuthenticator::start(const string& mechanism, const string& response)
QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
int code = sasl_server_start(sasl_conn,
mechanism.c_str(),
- response.size() ? response.c_str() : 0, response.length(),
+ (response ? response->c_str() : 0), (response ? response->size() : 0),
&challenge, &challenge_len);
processAuthenticationStep(code, challenge, challenge_len);
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
index cfbe1a0cd1..4e5d43214c 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
@@ -41,7 +41,7 @@ class SaslAuthenticator
public:
virtual ~SaslAuthenticator() {}
virtual void getMechanisms(framing::Array& mechanisms) = 0;
- virtual void start(const std::string& mechanism, const std::string& response) = 0;
+ virtual void start(const std::string& mechanism, const std::string* response) = 0;
virtual void step(const std::string& response) = 0;
virtual void getUid(std::string&) {}
virtual bool getUsername(std::string&) { return false; };
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 94d0cc87f7..fbcb21eab9 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -75,9 +75,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
closeComplete(false)
-{
- acl = getSession().getBroker().getAcl();
-}
+{}
SemanticState::~SemanticState() {
closed();
@@ -348,7 +346,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- record.accept( 0 /*no ctxt*/ );
+ queue->dequeue(0 /*ctxt*/, msg);
+ record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -471,7 +470,7 @@ const std::string nullstring;
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+ msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
@@ -487,6 +486,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id));
}
+ AclModule* acl = getSession().getBroker().getAcl();
if (acl && acl->doTransferAcl())
{
if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 12ccc75f11..6d88dd56d9 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -165,7 +165,6 @@ class SemanticState : private boost::noncopyable {
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
boost::shared_ptr<Exchange> cacheExchange;
- AclModule* acl;
const bool authMsg;
const std::string userID;
const std::string userName;
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index ddd6ae3f5b..1ab17e9893 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
header.setEof(false);
msg->getFrames().append(header);
}
+ if (broker.isTimestamping())
+ msg->setTimestamp();
msg->setPublisher(&getConnection());
msg->getIngressCompletion().begin();
semanticState.handle(msg);
diff --git a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
index d26b370632..2acc09cded 100644
--- a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
@@ -42,7 +42,7 @@ public:
NullAuthenticator(Connection& connection);
~NullAuthenticator();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string& response);
+ void start(const std::string& mechanism, const std::string* response);
void step(const std::string&) {}
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
@@ -57,7 +57,7 @@ public:
SspiAuthenticator(Connection& connection);
~SspiAuthenticator();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string& response);
+ void start(const std::string& mechanism, const std::string* response);
void step(const std::string& response);
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
@@ -96,12 +96,12 @@ void NullAuthenticator::getMechanisms(Array& mechanisms)
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));
}
-void NullAuthenticator::start(const string& mechanism, const string& response)
+void NullAuthenticator::start(const string& mechanism, const string* response)
{
QPID_LOG(warning, "SASL: No Authentication Performed");
if (mechanism == "PLAIN") { // Old behavior
- if (response.size() > 0 && response[0] == (char) 0) {
- string temp = response.substr(1);
+ if (response && response->size() > 0 && (*response).c_str()[0] == (char) 0) {
+ string temp = response->substr(1);
string::size_type i = temp.find((char)0);
string uid = temp.substr(0, i);
string pwd = temp.substr(i + 1);
@@ -139,7 +139,7 @@ void SspiAuthenticator::getMechanisms(Array& mechanisms)
QPID_LOG(info, "SASL: Mechanism list: ANONYMOUS PLAIN");
}
-void SspiAuthenticator::start(const string& mechanism, const string& response)
+void SspiAuthenticator::start(const string& mechanism, const string* response)
{
QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
if (mechanism == "ANONYMOUS") {
@@ -152,14 +152,14 @@ void SspiAuthenticator::start(const string& mechanism, const string& response)
// PLAIN's response is composed of 3 strings separated by 0 bytes:
// authorization id, authentication id (user), clear-text password.
- if (response.size() == 0)
+ if (!response || response->size() == 0)
throw ConnectionForcedException("Authentication failed");
- string::size_type i = response.find((char)0);
- string auth = response.substr(0, i);
- string::size_type j = response.find((char)0, i+1);
- string uid = response.substr(i+1, j-1);
- string pwd = response.substr(j+1);
+ string::size_type i = response->find((char)0);
+ string auth = response->substr(0, i);
+ string::size_type j = response->find((char)0, i+1);
+ string uid = response->substr(i+1, j-1);
+ string pwd = response->substr(j+1);
string dot(".");
int error = 0;
if (!LogonUser(const_cast<char*>(uid.c_str()),
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index 801fe38051..ab0d8e0700 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -254,8 +254,18 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me
}
if (sasl.get()) {
- string response = sasl->start(join(mechlist), getSecuritySettings ? getSecuritySettings() : 0);
- proxy.startOk(properties, sasl->getMechanism(), response, locale);
+ string response;
+ if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) {
+ proxy.startOk(properties, sasl->getMechanism(), response, locale);
+ } else {
+ //response was null
+ ConnectionStartOkBody body;
+ body.setClientProperties(properties);
+ body.setMechanism(sasl->getMechanism());
+ //Don't set response, as none was given
+ body.setLocale(locale);
+ proxy.send(body);
+ }
} else {
//TODO: verify that desired mechanism and locale are supported
string response = ((char)0) + username + ((char)0) + password;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 5cf20c92eb..3badaf40ba 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject");
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
}
void populateHeaders(qpid::messaging::Message& message,
@@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message,
if (messageProperties->hasContentEncoding()) {
message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
}
- // routing-key, others?
+ // routing-key, timestamp, others?
if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
}
+ if (deliveryProperties && deliveryProperties->hasTimestamp()) {
+ message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
+ }
}
}
diff --git a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
index d1ae762f1b..53d825771b 100644
--- a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
+++ b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
@@ -71,7 +71,7 @@ class WindowsSasl : public Sasl
public:
WindowsSasl( const std::string &, const std::string &, const std::string &, const std::string &, int, int );
~WindowsSasl();
- std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
+ bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings);
std::string step(const std::string& challenge);
std::string getMechanism();
std::string getUserId();
@@ -121,8 +121,8 @@ WindowsSasl::~WindowsSasl()
{
}
-std::string WindowsSasl::start(const std::string& mechanisms,
- const SecuritySettings* /*externalSettings*/)
+bool WindowsSasl::start(const std::string& mechanisms, std::string& response,
+ const SecuritySettings* /*externalSettings*/)
{
QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")");
@@ -142,15 +142,15 @@ std::string WindowsSasl::start(const std::string& mechanisms,
if (!haveAnon && !havePlain)
throw InternalErrorException(QPID_MSG("Sasl error: no common mechanism"));
- std::string resp = "";
if (havePlain) {
mechanism = PLAIN;
- resp = ((char)0) + settings.username + ((char)0) + settings.password;
+ response = ((char)0) + settings.username + ((char)0) + settings.password;
}
else {
mechanism = ANONYMOUS;
+ response = "";
}
- return resp;
+ return true;
}
std::string WindowsSasl::step(const std::string& /*challenge*/)
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 7432fbbc33..e6e3de64f2 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -278,7 +278,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
lastBroker(false),
updateRetracted(false),
updateClosed(false),
- error(*this)
+ error(*this),
+ acl(0)
{
broker.setInCluster(true);
@@ -856,6 +857,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
else if (updatee == self && url) {
assert(state == JOINER);
state = UPDATEE;
+ acl = broker.getAcl();
+ broker.setAcl(0); // Disable ACL during update
QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
@@ -956,6 +959,7 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
+ broker.setAcl(acl); // Restore ACL
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index da5781b7a9..ccec4948e6 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -56,6 +56,7 @@ namespace qpid {
namespace broker {
class Message;
+class AclModule;
}
namespace framing {
@@ -312,6 +313,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
sys::Timer clockTimer;
sys::AbsTime clusterTime;
sys::Duration clusterTimeOffset;
+ broker::AclModule* acl;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend struct ClusterDispatcher;
diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp
index 945c0a4d24..b3d1e2e1e4 100644
--- a/qpid/cpp/src/qpid/framing/Uuid.cpp
+++ b/qpid/cpp/src/qpid/framing/Uuid.cpp
@@ -59,7 +59,9 @@ void Uuid::clear() {
// Force int 0/!0 to false/true; avoids compile warnings.
bool Uuid::isNull() const {
- return !!uuid_is_null(data());
+ // This const cast is for Solaris which has a
+ // uuid_is_null that takes a non const argument
+ return !!uuid_is_null(const_cast<uint8_t*>(data()));
}
void Uuid::encode(Buffer& buf) const {
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 50fdc82ee0..5799a1adca 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const string& data,
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->setTimestamp(broker->getExpiryPolicy());
+ msg->computeExpiration(broker->getExpiryPolicy());
}
msg->getFrames().append(content);
msg->setIsManagementMessage(true);
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index 25f1c5fb9d..defec4879c 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -95,9 +95,11 @@ private:
/** Create socket */
void createSocket(const SocketAddress&) const;
+public:
/** Construct socket with existing handle */
Socket(IOHandlePrivate*);
+protected:
mutable std::string localname;
mutable std::string peername;
mutable bool nonblocking;
diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
index 471a0cef60..ab15785492 100644
--- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp
@@ -25,6 +25,8 @@
#include "qpid/sys/ssl/check.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
#include "qpid/broker/Broker.h"
@@ -37,15 +39,19 @@
namespace qpid {
namespace sys {
+using namespace qpid::sys::ssl;
+
struct SslServerOptions : ssl::SslOptions
{
uint16_t port;
bool clientAuth;
bool nodict;
+ bool multiplex;
SslServerOptions() : port(5671),
clientAuth(false),
- nodict(false)
+ nodict(false),
+ multiplex(false)
{
addOptions()
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
@@ -56,15 +62,20 @@ struct SslServerOptions : ssl::SslOptions
}
};
-class SslProtocolFactory : public ProtocolFactory {
+template <class T>
+class SslProtocolFactoryTmpl : public ProtocolFactory {
+ private:
+
+ typedef SslAcceptorTmpl<T> SslAcceptor;
+
const bool tcpNoDelay;
- qpid::sys::ssl::SslSocket listener;
+ T listener;
const uint16_t listeningPort;
- std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor;
+ std::auto_ptr<SslAcceptor> acceptor;
bool nodict;
public:
- SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -74,10 +85,14 @@ class SslProtocolFactory : public ProtocolFactory {
bool supports(const std::string& capability);
private:
- void established(Poller::shared_ptr, const qpid::sys::ssl::SslSocket&, ConnectionCodec::Factory*,
+ void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
bool isClient);
};
+typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
+typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
+
+
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
SslServerOptions options;
@@ -86,10 +101,26 @@ static struct SslPlugin : public Plugin {
~SslPlugin() { ssl::shutdownNSS(); }
- void earlyInitialize(Target&) {
+ void earlyInitialize(Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && !options.certDbPath.empty()) {
+ const broker::Broker::Options& opts = broker->getOptions();
+
+ if (opts.port == options.port && // AMQP & AMQPS ports are the same
+ opts.port != 0) {
+ // The presence of this option is used to signal to the TCP
+ // plugin not to start listening on the shared port. The actual
+ // value cannot be configured through the command line or config
+ // file (other than by setting the ports to the same value)
+ // because we are only adding it after option parsing.
+ options.multiplex = true;
+ options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port");
+ }
+ }
}
void initialize(Target& target) {
+ QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
@@ -100,10 +131,18 @@ static struct SslPlugin : public Plugin {
ssl::initNSS(options, true);
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
+
+ ProtocolFactory::shared_ptr protocol(options.multiplex ?
+ static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)) :
+ static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)));
+ QPID_LOG(notice, "Listening for " <<
+ (options.multiplex ? "SSL or TCP" : "SSL") <<
+ " connections on TCP port " <<
+ protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -113,13 +152,15 @@ static struct SslPlugin : public Plugin {
}
} sslPlugin;
-SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) :
+template <class T>
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
nodict(options.nodict)
{}
-void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
- ConnectionCodec::Factory* f, bool isClient) {
+void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
+ ConnectionCodec::Factory* f, bool isClient,
+ bool tcpNoDelay, bool nodict) {
qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
@@ -127,8 +168,10 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys:
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient)
+ if (isClient) {
async->setClient();
+ }
+
qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
@@ -141,19 +184,64 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys:
aio->start(poller);
}
-uint16_t SslProtocolFactory::getPort() const {
+template <>
+void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+}
+
+template <class T>
+uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
+template <class T>
+void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
acceptor.reset(
- new qpid::sys::ssl::SslAcceptor(listener,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ new SslAcceptor(listener,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established,
+ this, poller, _1, fact, false)));
acceptor->start(poller);
}
-void SslProtocolFactory::connect(
+template <>
+void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ if (sslSock) {
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ return;
+ }
+
+ AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
+
+ if (tcpNoDelay) {
+ s.setTcpNoDelay();
+ QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ }
+
+ if (isClient) {
+ async->setClient();
+ }
+ AsynchIO* aio = AsynchIO::create
+ (s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+
+ async->init(aio, 4);
+ aio->start(poller);
+}
+
+template <class T>
+void SslProtocolFactoryTmpl<T>::connect(
Poller::shared_ptr poller,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
@@ -166,9 +254,9 @@ void SslProtocolFactory::connect(
// is no longer needed.
qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
- new qpid::sys::ssl::SslConnector (*socket, poller, host, port,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, true),
- failed);
+ new SslConnector(*socket, poller, host, port,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
+ failed);
}
namespace
@@ -176,6 +264,7 @@ namespace
const std::string SSL = "ssl";
}
+template <>
bool SslProtocolFactory::supports(const std::string& capability)
{
std::string s = capability;
@@ -183,4 +272,12 @@ bool SslProtocolFactory::supports(const std::string& capability)
return s == SSL;
}
+template <>
+bool SslMuxProtocolFactory::supports(const std::string& capability)
+{
+ std::string s = capability;
+ transform(s.begin(), s.end(), s.begin(), tolower);
+ return s == SSL || s == "tcp";
+}
+
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 85d8c1db87..8a99d8db71 100644
--- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -43,7 +43,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
uint16_t listeningPort;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -57,6 +57,20 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
+static bool sslMultiplexEnabled(void)
+{
+ Options o;
+ Plugin::addOptions(o);
+
+ if (o.find_nothrow("ssl-multiplex", false)) {
+ // This option is added by the SSL plugin when the SSL port
+ // is configured to be the same as the main port.
+ QPID_LOG(notice, "SSL multiplexing enabled");
+ return true;
+ }
+ return false;
+}
+
// Static instance to initialise plugin
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
@@ -67,20 +81,31 @@ static class TCPIOPlugin : public Plugin {
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
+
+ // Check for SSL on the same port
+ bool shouldListen = !sslMultiplexEnabled();
+
ProtocolFactory::shared_ptr protocolt(
new AsynchIOProtocolFactory(
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
+ opts.tcpNoDelay,
+ shouldListen));
+ if (shouldListen) {
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
+ }
broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
tcpNoDelay(nodelay)
{
+ if (!shouldListen) {
+ return;
+ }
+
SocketAddress sa(host, port);
// We must have at least one resolved address
diff --git a/qpid/cpp/src/qpid/sys/alloca.h b/qpid/cpp/src/qpid/sys/alloca.h
index 0f58920908..b3f59b7c3f 100644
--- a/qpid/cpp/src/qpid/sys/alloca.h
+++ b/qpid/cpp/src/qpid/sys/alloca.h
@@ -34,9 +34,9 @@
# endif
# define alloca _alloca
# endif
-# if !defined _WINDOWS && !defined WIN32
-# include <alloca.h>
-# endif
+#endif
+#if !defined _WINDOWS && !defined WIN32
+# include <alloca.h>
#endif
#endif /*!QPID_SYS_ALLOCA_H*/
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
index 734ebb483a..4a59819183 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -68,29 +68,33 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
* Asynch Acceptor
*/
-SslAcceptor::SslAcceptor(const SslSocket& s, Callback callback) :
+template <class T>
+SslAcceptorTmpl<T>::SslAcceptorTmpl(const T& s, Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&SslAcceptor::readable, this, _1), 0, 0),
+ handle(s, boost::bind(&SslAcceptorTmpl<T>::readable, this, _1), 0, 0),
socket(s) {
s.setNonblocking();
ignoreSigpipe();
}
-SslAcceptor::~SslAcceptor()
+template <class T>
+SslAcceptorTmpl<T>::~SslAcceptorTmpl()
{
handle.stopWatch();
}
-void SslAcceptor::start(Poller::shared_ptr poller) {
+template <class T>
+void SslAcceptorTmpl<T>::start(Poller::shared_ptr poller) {
handle.startWatch(poller);
}
/*
* We keep on accepting as long as there is something to accept
*/
-void SslAcceptor::readable(DispatchHandle& h) {
- SslSocket* s;
+template <class T>
+void SslAcceptorTmpl<T>::readable(DispatchHandle& h) {
+ Socket* s;
do {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
@@ -110,6 +114,10 @@ void SslAcceptor::readable(DispatchHandle& h) {
h.rewatch();
}
+// Explicitly instantiate the templates we need
+template class SslAcceptorTmpl<SslSocket>;
+template class SslAcceptorTmpl<SslMuxSocket>;
+
/*
* Asynch Connector
*/
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.h b/qpid/cpp/src/qpid/sys/ssl/SslIo.h
index 8785852c24..c980d73831 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslIo.h
+++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.h
@@ -29,26 +29,30 @@
namespace qpid {
namespace sys {
+
+class Socket;
+
namespace ssl {
-
+
class SslSocket;
/*
* Asynchronous ssl acceptor: accepts connections then does a callback
* with the accepted fd
*/
-class SslAcceptor {
+template <class T>
+class SslAcceptorTmpl {
public:
- typedef boost::function1<void, const SslSocket&> Callback;
+ typedef boost::function1<void, const Socket&> Callback;
private:
Callback acceptedCallback;
qpid::sys::DispatchHandle handle;
- const SslSocket& socket;
+ const T& socket;
public:
- SslAcceptor(const SslSocket& s, Callback callback);
- ~SslAcceptor();
+ SslAcceptorTmpl(const T& s, Callback callback);
+ ~SslAcceptorTmpl();
void start(qpid::sys::Poller::shared_ptr poller);
private:
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
index f7483a220c..30234bb686 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
+++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
@@ -25,11 +25,13 @@
#include "qpid/Exception.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/errno.h>
+#include <poll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
@@ -50,36 +52,6 @@ namespace sys {
namespace ssl {
namespace {
-std::string getName(int fd, bool local, bool includeService = false)
-{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
- if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
- } else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
- }
-
- QPID_POSIX_CHECK(result);
-
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return std::string(dispName) + ":" + std::string(servName);
-
- } else {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
- throw QPID_POSIX_ERROR(rc);
- return dispName;
- }
-}
-
std::string getService(int fd, bool local)
{
::sockaddr_storage name; // big enough for any socket address
@@ -132,7 +104,7 @@ std::string getDomainFromSubject(std::string subject)
}
-SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
+SslSocket::SslSocket() : socket(0), prototype(0)
{
impl->fd = ::socket (PF_INET, SOCK_STREAM, 0);
if (impl->fd < 0) throw QPID_POSIX_ERROR(errno);
@@ -144,7 +116,7 @@ SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0
* returned from accept. Because we use posix accept rather than
* PR_Accept, we have to reset the handshake.
*/
-SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : IOHandle(ioph), socket(0), prototype(0)
+SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), socket(0), prototype(0)
{
socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
NSS_CHECK(SSL_ResetHandshake(socket, true));
@@ -238,6 +210,7 @@ int SslSocket::listen(uint16_t port, int backlog, const std::string& certName, b
SslSocket* SslSocket::accept() const
{
+ QPID_LOG(trace, "Accepting SSL connection.");
int afd = ::accept(impl->fd, 0, 0);
if ( afd >= 0) {
return new SslSocket(new IOHandlePrivate(afd), prototype);
@@ -248,36 +221,109 @@ SslSocket* SslSocket::accept() const
}
}
-int SslSocket::read(void *buf, size_t count) const
-{
- return PR_Read(socket, buf, count);
-}
+#define SSL_STREAM_MAX_WAIT_ms 20
+#define SSL_STREAM_MAX_RETRIES 2
-int SslSocket::write(const void *buf, size_t count) const
-{
- return PR_Write(socket, buf, count);
-}
+static bool isSslStream(int afd) {
+ int retries = SSL_STREAM_MAX_RETRIES;
+ unsigned char buf[5] = {};
-std::string SslSocket::getSockname() const
-{
- return getName(impl->fd, true);
+ do {
+ struct pollfd fd = {afd, POLLIN, 0};
+
+ /*
+ * Note that this is blocking the accept thread, so connections that
+ * send no data can limit the rate at which we can accept new
+ * connections.
+ */
+ if (::poll(&fd, 1, SSL_STREAM_MAX_WAIT_ms) > 0) {
+ errno = 0;
+ int result = recv(afd, buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
+ if (result == sizeof(buf)) {
+ break;
+ }
+ if (errno && errno != EAGAIN) {
+ int err = errno;
+ ::close(afd);
+ throw QPID_POSIX_ERROR(err);
+ }
+ }
+ } while (retries-- > 0);
+
+ if (retries < 0) {
+ return false;
+ }
+
+ /*
+ * SSLv2 Client Hello format
+ * http://www.mozilla.org/projects/security/pki/nss/ssl/draft02.html
+ *
+ * Bytes 0-1: RECORD-LENGTH
+ * Byte 2: MSG-CLIENT-HELLO (1)
+ * Byte 3: CLIENT-VERSION-MSB
+ * Byte 4: CLIENT-VERSION-LSB
+ *
+ * Allowed versions:
+ * 2.0 - SSLv2
+ * 3.0 - SSLv3
+ * 3.1 - TLS 1.0
+ * 3.2 - TLS 1.1
+ * 3.3 - TLS 1.2
+ *
+ * The version sent in the Client-Hello is the latest version supported by
+ * the client. NSS may send version 3.x in an SSLv2 header for
+ * maximum compatibility.
+ */
+ bool isSSL2Handshake = buf[2] == 1 && // MSG-CLIENT-HELLO
+ ((buf[3] == 3 && buf[4] <= 3) || // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+ (buf[3] == 2 && buf[4] == 0)); // SSL 2
+
+ /*
+ * SSLv3/TLS Client Hello format
+ * RFC 2246
+ *
+ * Byte 0: ContentType (handshake - 22)
+ * Bytes 1-2: ProtocolVersion {major, minor}
+ *
+ * Allowed versions:
+ * 3.0 - SSLv3
+ * 3.1 - TLS 1.0
+ * 3.2 - TLS 1.1
+ * 3.3 - TLS 1.2
+ */
+ bool isSSL3Handshake = buf[0] == 22 && // handshake
+ (buf[1] == 3 && buf[2] <= 3); // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+
+ return isSSL2Handshake || isSSL3Handshake;
}
-std::string SslSocket::getPeername() const
+Socket* SslMuxSocket::accept() const
{
- return getName(impl->fd, false);
+ int afd = ::accept(impl->fd, 0, 0);
+ if (afd >= 0) {
+ QPID_LOG(trace, "Accepting connection with optional SSL wrapper.");
+ if (isSslStream(afd)) {
+ QPID_LOG(trace, "Accepted SSL connection.");
+ return new SslSocket(new IOHandlePrivate(afd), prototype);
+ } else {
+ QPID_LOG(trace, "Accepted Plaintext connection.");
+ return new Socket(new IOHandlePrivate(afd));
+ }
+ } else if (errno == EAGAIN) {
+ return 0;
+ } else {
+ throw QPID_POSIX_ERROR(errno);
+ }
}
-std::string SslSocket::getPeerAddress() const
+int SslSocket::read(void *buf, size_t count) const
{
- if (!connectname.empty())
- return connectname;
- return getName(impl->fd, false, true);
+ return PR_Read(socket, buf, count);
}
-std::string SslSocket::getLocalAddress() const
+int SslSocket::write(const void *buf, size_t count) const
{
- return getName(impl->fd, true, true);
+ return PR_Write(socket, buf, count);
}
uint16_t SslSocket::getLocalPort() const
@@ -290,17 +336,6 @@ uint16_t SslSocket::getRemotePort() const
return atoi(getService(impl->fd, true).c_str());
}
-int SslSocket::getError() const
-{
- int result;
- socklen_t rSize = sizeof (result);
-
- if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return result;
-}
-
void SslSocket::setTcpNoDelay(bool nodelay) const
{
if (nodelay) {
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
index 993859495b..eabadcbe23 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
+++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Socket.h"
#include <nspr.h>
#include <string>
@@ -36,7 +37,7 @@ class Duration;
namespace ssl {
-class SslSocket : public qpid::sys::IOHandle
+class SslSocket : public qpid::sys::Socket
{
public:
/** Create a socket wrapper for descriptor. */
@@ -75,45 +76,13 @@ public:
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- /** Returns the "socket name" ie the address bound to
- * the near end of the socket
- */
- std::string getSockname() const;
-
- /** Returns the "peer name" ie the address bound to
- * the remote end of the socket
- */
- std::string getPeername() const;
-
- /**
- * Returns an address (host and port) for the remote end of the
- * socket
- */
- std::string getPeerAddress() const;
- /**
- * Returns an address (host and port) for the local end of the
- * socket
- */
- std::string getLocalAddress() const;
-
- /**
- * Returns the full address of the connection: local and remote host and port.
- */
- std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
-
uint16_t getLocalPort() const;
uint16_t getRemotePort() const;
- /**
- * Returns the error code stored in the socket. This may be used
- * to determine the result of a non-blocking connect.
- */
- int getError() const;
-
int getKeyLen() const;
std::string getClientAuthId() const;
-private:
+protected:
mutable std::string connectname;
mutable PRFileDesc* socket;
std::string certname;
@@ -126,6 +95,13 @@ private:
mutable PRFileDesc* prototype;
SslSocket(IOHandlePrivate* ioph, PRFileDesc* model);
+ friend class SslMuxSocket;
+};
+
+class SslMuxSocket : public SslSocket
+{
+public:
+ Socket* accept() const;
};
}}}
diff --git a/qpid/cpp/src/qpid/sys/unordered_map.h b/qpid/cpp/src/qpid/sys/unordered_map.h
new file mode 100644
index 0000000000..5f7f9567c5
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/unordered_map.h
@@ -0,0 +1,35 @@
+#ifndef _sys_unordered_map_h
+#define _sys_unordered_map_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// unordered_map include path is platform specific
+
+#ifdef _MSC_VER
+# include <unordered_map>
+#else
+# include <tr1/unordered_map>
+#endif /* _MSC_VER */
+namespace qpid {
+namespace sys {
+ using std::tr1::unordered_map;
+}}
+
+
+#endif /* _sys_unordered_map_h */
diff --git a/qpid/cpp/src/qpid/types/Uuid.cpp b/qpid/cpp/src/qpid/types/Uuid.cpp
index 9face4e5d2..9862fa8946 100644
--- a/qpid/cpp/src/qpid/types/Uuid.cpp
+++ b/qpid/cpp/src/qpid/types/Uuid.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/types/Uuid.h"
#include "qpid/sys/uuid.h"
+#include "qpid/sys/IntegerTypes.h"
#include <sstream>
#include <iostream>
#include <string.h>
@@ -71,7 +72,8 @@ void Uuid::clear()
// Force int 0/!0 to false/true; avoids compile warnings.
bool Uuid::isNull() const
{
- return !!uuid_is_null(bytes);
+ // This const cast is for Solaris which has non const arguments
+ return !!uuid_is_null(const_cast<uint8_t*>(bytes));
}
Uuid::operator bool() const { return !isNull(); }
@@ -86,7 +88,8 @@ const unsigned char* Uuid::data() const
bool operator==(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) == 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) == 0;
}
bool operator!=(const Uuid& a, const Uuid& b)
@@ -96,22 +99,26 @@ bool operator!=(const Uuid& a, const Uuid& b)
bool operator<(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) < 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) < 0;
}
bool operator>(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) > 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) > 0;
}
bool operator<=(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) <= 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) <= 0;
}
bool operator>=(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) >= 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) >= 0;
}
ostream& operator<<(ostream& out, Uuid uuid)
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index a7c1dbe8a6..a0e329ca9d 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -31,7 +31,8 @@ using namespace std;
auto_ptr<QpiddOptions> options;
-int main(int argc, char* argv[])
+// Broker real entry; various system-invoked entrypoints call here.
+int run_broker(int argc, char *argv[], bool hidden)
{
try
{
@@ -43,6 +44,8 @@ int main(int argc, char* argv[])
// module-supplied options.
try {
bootOptions.parse (argc, argv, bootOptions.common.config, true);
+ if (hidden)
+ bootOptions.log.sinkOptions->detached();
qpid::log::Logger::instance().configure(bootOptions.log);
} catch (const std::exception& e) {
// Couldn't configure logging so write the message direct to stderr.
diff --git a/qpid/cpp/src/qpidd.h b/qpid/cpp/src/qpidd.h
index c702270e80..a3150a2737 100644
--- a/qpid/cpp/src/qpidd.h
+++ b/qpid/cpp/src/qpidd.h
@@ -67,4 +67,7 @@ public:
int execute (QpiddOptions *options);
};
+// Broker real entry; various system-invoked entrypoints call here.
+int run_broker(int argc, char *argv[], bool hidden = false);
+
#endif /*!QPID_H*/
diff --git a/qpid/cpp/src/tests/BrokerOptions.cpp b/qpid/cpp/src/tests/BrokerOptions.cpp
new file mode 100644
index 0000000000..b36d96916a
--- /dev/null
+++ b/qpid/cpp/src/tests/BrokerOptions.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Unit tests for various broker configuration options **/
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "MessagingFixture.h"
+
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(BrokerOptionsTestSuite)
+
+using namespace qpid::broker;
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace qpid;
+
+QPID_AUTO_TEST_CASE(testDisabledTimestamp)
+{
+ // by default, there should be no timestamp added by the broker
+ MessagingFixture fix;
+
+ Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+ messaging::Message msg("hi");
+ sender.send(msg);
+
+ Receiver receiver = fix.session.createReceiver("test-q");
+ messaging::Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+ Variant::Map props = in.getProperties();
+ BOOST_CHECK(props.find("x-amqp-0-10.timestamp") == props.end());
+}
+
+QPID_AUTO_TEST_CASE(testEnabledTimestamp)
+{
+ // when enabled, the 0.10 timestamp is added by the broker
+ Broker::Options opts;
+ opts.timestampRcvMsgs = true;
+ MessagingFixture fix(opts, true);
+
+ Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+ messaging::Message msg("one");
+ sender.send(msg);
+
+ Receiver receiver = fix.session.createReceiver("test-q");
+ messaging::Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+ Variant::Map props = in.getProperties();
+ BOOST_CHECK(props.find("x-amqp-0-10.timestamp") != props.end());
+ BOOST_CHECK(props["x-amqp-0-10.timestamp"]);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}}
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 78ac6db5f1..3c9ca1b70f 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -75,7 +75,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
MessagingThreadTests.cpp \
MessagingFixture.h \
ClientSessionTest.cpp \
- BrokerFixture.h SocketProxy.h \
+ BrokerFixture.h \
exception_test.cpp \
RefCounted.cpp \
SessionState.cpp logging.cpp \
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp
+ Qmf2.cpp \
+ BrokerOptions.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -349,7 +350,8 @@ EXTRA_DIST += \
stop_broker.ps1 \
topictest.ps1 \
run_queue_flow_limit_tests \
- run_msg_group_tests
+ run_msg_group_tests \
+ ipv6_test
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -362,7 +364,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers)
LONG_TESTS+=start_broker \
fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
- run_msg_groups_tests_soak \
+ run_msg_group_tests_soak \
stop_broker \
run_long_federation_sys_tests \
run_failover_soak reliable_replication_test \
@@ -377,7 +379,7 @@ EXTRA_DIST+= \
reliable_replication_test \
federated_cluster_test_with_node_failure \
sasl_test_setup.sh \
- run_msg_groups_tests_soak
+ run_msg_group_tests_soak
check-long:
$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 7bf061ff54..aaa2721021 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -258,26 +258,26 @@ QPID_AUTO_TEST_CASE(testBound){
QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
client::QueueOptions args;
- args.setPersistLastNode();
+ args.setPersistLastNode();
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
- //change mode
- queue->setLastNodeFailure();
+ //change mode
+ queue->setLastNodeFailure();
- //enqueue 1 message
+ //enqueue 1 message
queue->deliver(msg3);
- //check all have persistent ids.
+ //check all have persistent ids.
BOOST_CHECK(msg1->isPersistent());
BOOST_CHECK(msg2->isPersistent());
BOOST_CHECK(msg3->isPersistent());
@@ -287,13 +287,13 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
QPID_AUTO_TEST_CASE(testSeek){
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
@@ -313,13 +313,13 @@ QPID_AUTO_TEST_CASE(testSeek){
QPID_AUTO_TEST_CASE(testSearch){
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
intrusive_ptr<Message> msg3 = create_message("e", "C");
- //enqueue 2 messages
+ //enqueue 2 messages
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
@@ -424,10 +424,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
client::QueueOptions args;
// set queue mode
- args.setOrdering(client::LVQ);
+ args.setOrdering(client::LVQ);
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
intrusive_ptr<Message> msg1 = create_message("e", "A");
intrusive_ptr<Message> msg2 = create_message("e", "B");
@@ -438,16 +438,16 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
//set deliever match for LVQ a,b,c,a
string key;
- args.getLVQKey(key);
+ args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"b");
- msg3->insertCustomProperty(key,"c");
- msg4->insertCustomProperty(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"b");
+ msg3->insertCustomProperty(key,"c");
+ msg4->insertCustomProperty(key,"a");
- //enqueue 4 message
+ //enqueue 4 message
queue->deliver(msg1);
queue->deliver(msg2);
queue->deliver(msg3);
@@ -467,9 +467,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
intrusive_ptr<Message> msg5 = create_message("e", "A");
intrusive_ptr<Message> msg6 = create_message("e", "B");
intrusive_ptr<Message> msg7 = create_message("e", "C");
- msg5->insertCustomProperty(key,"a");
- msg6->insertCustomProperty(key,"b");
- msg7->insertCustomProperty(key,"c");
+ msg5->insertCustomProperty(key,"a");
+ msg6->insertCustomProperty(key,"b");
+ msg7->insertCustomProperty(key,"c");
queue->deliver(msg5);
queue->deliver(msg6);
queue->deliver(msg7);
@@ -652,7 +652,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
msg1->insertCustomProperty(key,"a");
msg2->insertCustomProperty(key,"a");
- // 3
+ // 3
queue1->deliver(msg1);
// 4
queue1->setLastNodeFailure();
@@ -672,7 +672,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt
{
for (uint i = 0; i < count; i++) {
intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
- m->setTimestamp(new broker::ExpiryPolicy);
+ m->computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 5e9a150d8f..65d5242e51 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -1030,6 +1030,64 @@ class ACLTests(TestBase010):
if (403 == e.args[0].error_code):
self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1");
+ #=====================================
+ # ACL broker configuration tests
+ #=====================================
+
+ def test_broker_timestamp_config(self):
+ """
+ Test ACL control of the broker timestamp configuration
+ """
+ aclf = self.get_acl_file()
+ # enable lots of stuff to allow QMF to work
+ aclf.write('acl allow all create exchange\n')
+ aclf.write('acl allow all access exchange\n')
+ aclf.write('acl allow all bind exchange\n')
+ aclf.write('acl allow all publish exchange\n')
+ aclf.write('acl allow all create queue\n')
+ aclf.write('acl allow all access queue\n')
+ aclf.write('acl allow all delete queue\n')
+ aclf.write('acl allow all consume queue\n')
+ aclf.write('acl allow all access method\n')
+ # this should let bob access the timestamp configuration
+ aclf.write('acl allow bob@QPID access broker\n')
+ aclf.write('acl allow admin@QPID all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result.text.find("format error",0,len(result.text)) != -1):
+ self.fail(result)
+
+ ts = None
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+ ts = bob.get_timestamp_cfg() #should work
+ bob.set_timestamp_cfg(ts); #should work
+
+ obo = BrokerAdmin(self.config.broker, "obo", "obo")
+ try:
+ ts = obo.get_timestamp_cfg() #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ try:
+ obo.set_timestamp_cfg(ts) #should fail
+ failed = False
+ except Exception, e:
+ failed = True
+ self.assertEqual(7,e.args[0]["error_code"])
+ assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ assert(failed)
+
+ admin = BrokerAdmin(self.config.broker, "admin", "admin")
+ ts = admin.get_timestamp_cfg() #should pass
+ admin.set_timestamp_cfg(ts) #should pass
+
+
class BrokerAdmin:
def __init__(self, broker, username=None, password=None):
self.connection = qpid.messaging.Connection(broker)
@@ -1075,3 +1133,9 @@ class BrokerAdmin:
def delete_queue(self, name):
self.invoke("delete", {"type": "queue", "name":name})
+
+ def get_timestamp_cfg(self):
+ return self.invoke("getTimestampConfig", {})
+
+ def set_timestamp_cfg(self, receive):
+ return self.invoke("getTimestampConfig", {"receive":receive})
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index d217f9fbde..0e80e06d34 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -115,63 +115,22 @@ class ShortTests(BrokerTest):
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
aclf.write("""
-acl deny zag@QPID create queue
-acl allow all all
+acl allow zig@QPID all all
+acl deny all all
""")
aclf.close()
- cluster = self.cluster(2, args=["--auth", "yes",
+ cluster = self.cluster(1, args=["--auth", "yes",
"--sasl-config", sasl_config,
"--load-module", os.getenv("ACL_LIB"),
"--acl-file", acl])
# Valid user/password, ensure queue is created.
c = cluster[0].connect(username="zig", password="zig")
- c.session().sender("ziggy;{create:always}")
- c.close()
- c = cluster[1].connect(username="zig", password="zig")
- c.session().receiver("ziggy;{assert:always}")
+ c.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}")
c.close()
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Valid user, bad password
- try:
- cluster[0].connect(username="zig", password="foo").close()
- self.fail("Expected exception")
- except messaging.exceptions.ConnectionError: pass
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Bad user ID
- try:
- cluster[0].connect(username="foo", password="bar").close()
- self.fail("Expected exception")
- except messaging.exceptions.ConnectionError: pass
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Action disallowed by ACL
- c = cluster[0].connect(username="zag", password="zag")
- try:
- s = c.session()
- s.sender("zaggy;{create:always}")
- s.close()
- self.fail("Expected exception")
- except messaging.exceptions.UnauthorizedAccess: pass
- # make sure the queue was not created at the other node.
- c = cluster[0].connect(username="zag", password="zag")
- try:
- s = c.session()
- s.sender("zaggy;{assert:always}")
- s.close()
- self.fail("Expected exception")
- except messaging.exceptions.NotFound: pass
-
- def test_sasl_join(self):
- """Verify SASL authentication between brokers when joining a cluster."""
- sasl_config=os.path.join(self.rootdir, "sasl_config")
+ cluster.start() # Start second node.
- # Valid user/password, ensure queue is created.
- c = cluster[0].connect(username="zig", password="zig")
- c.session().sender("ziggy;{create:always}")
- c.close()
+ # Check queue is created on second node.
c = cluster[1].connect(username="zig", password="zig")
c.session().receiver("ziggy;{assert:always}")
c.close()
@@ -200,49 +159,7 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.UnauthorizedAccess: pass
# make sure the queue was not created at the other node.
- c = cluster[0].connect(username="zag", password="zag")
- try:
- s = c.session()
- s.sender("zaggy;{assert:always}")
- s.close()
- self.fail("Expected exception")
- except messaging.exceptions.NotFound: pass
-
- def test_sasl_join(self):
- """Verify SASL authentication between brokers when joining a cluster."""
- # Valid user/password, ensure queue is created.
- c = cluster[0].connect(username="zig", password="zig")
- c.session().sender("ziggy;{create:always}")
- c.close()
c = cluster[1].connect(username="zig", password="zig")
- c.session().receiver("ziggy;{assert:always}")
- c.close()
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Valid user, bad password
- try:
- cluster[0].connect(username="zig", password="foo").close()
- self.fail("Expected exception")
- except messaging.exceptions.ConnectionError: pass
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Bad user ID
- try:
- cluster[0].connect(username="foo", password="bar").close()
- self.fail("Expected exception")
- except messaging.exceptions.ConnectionError: pass
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Action disallowed by ACL
- c = cluster[0].connect(username="zag", password="zag")
- try:
- s = c.session()
- s.sender("zaggy;{create:always}")
- s.close()
- self.fail("Expected exception")
- except messaging.exceptions.UnauthorizedAccess: pass
- # make sure the queue was not created at the other node.
- c = cluster[0].connect(username="zag", password="zag")
try:
s = c.session()
s.sender("zaggy;{assert:always}")
diff --git a/qpid/cpp/src/tests/qpid-perftest.cpp b/qpid/cpp/src/tests/qpid-perftest.cpp
index 3aff742c62..664f0cf877 100644
--- a/qpid/cpp/src/tests/qpid-perftest.cpp
+++ b/qpid/cpp/src/tests/qpid-perftest.cpp
@@ -173,7 +173,7 @@ struct Opts : public TestOptions {
if (count % subs) {
count += subs - (count % subs);
cout << "WARNING: Adjusted --count to " << count
- << " the nearest multiple of --nsubs" << endl;
+ << " the next multiple of --nsubs" << endl;
}
totalPubs = pubs*qt;
totalSubs = subs*qt;
@@ -413,7 +413,7 @@ struct Controller : public Client {
AbsTime start=now();
send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers
if (j) {
- send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration
+ send(opts.totalSubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration
}
Stats pubRates;
@@ -546,9 +546,9 @@ struct PublishThread : public Client {
if (opts.confirm) session.sync();
AbsTime end=now();
double time=secs(start,end);
- if (time <= 0.0) {
- throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count.");
- }
+ if (time <= 0.0) {
+ throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count.");
+ }
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), fqn("pub_done"));
@@ -644,7 +644,9 @@ struct SubscribeThread : public Client {
//
// For now verify order only for a single publisher.
size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0;
- size_t n = *reinterpret_cast<const size_t*>(msg.getData().data() + offset);
+ size_t n;
+ memcpy (&n, reinterpret_cast<const char*>(msg.getData().data() + offset),
+ sizeof(n));
if (opts.pubs == 1) {
if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n);
else verify(n>=expect, ">=", expect, n);
diff --git a/qpid/cpp/src/tests/sasl.mk b/qpid/cpp/src/tests/sasl.mk
index 20eaa7c7a5..69b24c3f8a 100644
--- a/qpid/cpp/src/tests/sasl.mk
+++ b/qpid/cpp/src/tests/sasl.mk
@@ -30,7 +30,7 @@ check_PROGRAMS+=sasl_version
sasl_version_SOURCES=sasl_version.cpp
sasl_version_LDADD=$(lib_client)
-TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster
+TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster sasl_no_dir
LONG_TESTS += run_cluster_authentication_soak
EXTRA_DIST += run_cluster_authentication_test \
sasl_fed \
@@ -43,7 +43,8 @@ EXTRA_DIST += run_cluster_authentication_test \
sasl_fed_ex_dynamic_cluster \
sasl_fed_ex_link_cluster \
sasl_fed_ex_queue_cluster \
- sasl_fed_ex_route_cluster
+ sasl_fed_ex_route_cluster \
+ sasl_no_dir
endif # HAVE_SASL
diff --git a/qpid/cpp/src/tests/sasl_no_dir b/qpid/cpp/src/tests/sasl_no_dir
new file mode 100755
index 0000000000..15a36014bb
--- /dev/null
+++ b/qpid/cpp/src/tests/sasl_no_dir
@@ -0,0 +1,218 @@
+#! /bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+source ./test_env.sh
+
+script_name=`basename $0`
+
+# This minimum value corresponds to sasl version 2.1.22
+minimum_sasl_version=131350
+
+sasl_version=`$QPID_TEST_EXEC_DIR/sasl_version`
+
+# This test is necessary because this sasl version is the first one that permits
+# redirection of the sasl config file path.
+if [ "$sasl_version" -lt "$minimum_sasl_version" ]; then
+ echo "sasl_fed: must have sasl version 2.1.22 or greater. ( Integer value: $minimum_sasl_version ) Version is: $sasl_version"
+ exit 0
+fi
+
+
+sasl_config_dir=$builddir/sasl_config
+
+
+# Debugging print. --------------------------
+debug=
+function print {
+ if [ "$debug" ]; then
+ echo "${script_name}: $1"
+ fi
+}
+
+
+my_random_number=$RANDOM
+tmp_root=/tmp/sasl_fed_$my_random_number
+mkdir -p $tmp_root
+
+
+LOG_FILE=$tmp_root/qpidd.log
+
+# If you want to see this test fail, just comment out this 'mv' command.
+print "Moving sasl configuration dir."
+mv ${sasl_config_dir} ${sasl_config_dir}-
+
+
+#--------------------------------------------------
+print " Starting broker"
+#--------------------------------------------------
+$QPIDD_EXEC \
+ -p 0 \
+ --no-data-dir \
+ --auth=yes \
+ --mgmt-enable=yes \
+ --log-enable info+ \
+ --log-source yes \
+ --log-to-file ${LOG_FILE} \
+ --sasl-config=$sasl_config_dir \
+ -d 2> /dev/null 1> $tmp_root/broker_port
+
+
+
+# If it works right, the output will look something like this: ( two lines long )
+# Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112)
+# 2011-10-13 14:07:00 critical qpidd.cpp:83: Unexpected error: Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112)
+
+result=`cat ${LOG_FILE} | grep "sasl_set_path failed: no such directory" | wc -l `
+
+#--------------------------------------------------
+print "Restore the Sasl config dir to its original place."
+#--------------------------------------------------
+mv ${sasl_config_dir}- ${sasl_config_dir}
+
+if [ "2" -eq ${result} ]; then
+ print "result: success"
+ rm -rf $tmp_root
+ exit 0
+fi
+
+
+# If this test fails, the broker is still alive.
+# Kill it.
+broker_port=`cat $tmp_root/broker_port`
+#--------------------------------------------------
+print "Asking broker to quit."
+#--------------------------------------------------
+$QPIDD_EXEC --port $broker_port --quit
+
+rm -rf $tmp_root
+
+print "result: fail"
+exit 1
+
+#! /bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+source ./test_env.sh
+
+script_name=`basename $0`
+
+# This minimum value corresponds to sasl version 2.1.22
+minimum_sasl_version=131350
+
+sasl_version=`$QPID_TEST_EXEC_DIR/sasl_version`
+
+# This test is necessary because this sasl version is the first one that permits
+# redirection of the sasl config file path.
+if [ "$sasl_version" -lt "$minimum_sasl_version" ]; then
+ echo "sasl_fed: must have sasl version 2.1.22 or greater. ( Integer value: $minimum_sasl_version ) Version is: $sasl_version"
+ exit 0
+fi
+
+
+sasl_config_dir=$builddir/sasl_config
+
+
+# Debugging print. --------------------------
+debug=
+function print {
+ if [ "$debug" ]; then
+ echo "${script_name}: $1"
+ fi
+}
+
+
+my_random_number=$RANDOM
+tmp_root=/tmp/sasl_fed_$my_random_number
+mkdir -p $tmp_root
+
+
+LOG_FILE=$tmp_root/qpidd.log
+
+# If you want to see this test fail, just comment out this 'mv' command.
+print "Moving sasl configuration dir."
+mv ${sasl_config_dir} ${sasl_config_dir}-
+
+
+#--------------------------------------------------
+print " Starting broker"
+#--------------------------------------------------
+$QPIDD_EXEC \
+ -p 0 \
+ --no-data-dir \
+ --auth=yes \
+ --mgmt-enable=yes \
+ --log-enable info+ \
+ --log-source yes \
+ --log-to-file ${LOG_FILE} \
+ --sasl-config=$sasl_config_dir \
+ -d 2> /dev/null 1> $tmp_root/broker_port
+
+
+
+# If it works right, the output will look something like this: ( two lines long )
+# Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112)
+# 2011-10-13 14:07:00 critical qpidd.cpp:83: Unexpected error: Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112)
+
+result=`cat ${LOG_FILE} | grep "sasl_set_path failed: no such directory" | wc -l `
+
+#--------------------------------------------------
+print "Restore the Sasl config dir to its original place."
+#--------------------------------------------------
+mv ${sasl_config_dir}- ${sasl_config_dir}
+
+if [ "2" -eq ${result} ]; then
+ print "result: success"
+ rm -rf $tmp_root
+ exit 0
+fi
+
+
+# If this test fails, the broker is still alive.
+# Kill it.
+broker_port=`cat $tmp_root/broker_port`
+#--------------------------------------------------
+print "Asking broker to quit."
+#--------------------------------------------------
+$QPIDD_EXEC --port $broker_port --quit
+
+rm -rf $tmp_root
+
+print "result: fail"
+exit 1
+
diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test
index cbf75eb237..6c056f4288 100755
--- a/qpid/cpp/src/tests/ssl_test
+++ b/qpid/cpp/src/tests/ssl_test
@@ -47,9 +47,13 @@ delete_certs() {
fi
}
-COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption"
+COMMON_OPTS="--daemon --no-data-dir --no-module-dir --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME"
start_broker() { # $1 = extra opts
- ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS $1;
+ ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --auth no $1;
+}
+
+start_authenticating_broker() {
+ ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --ssl-sasl-no-dict --ssl-require-client-authentication --auth yes;
}
stop_brokers() {
@@ -64,6 +68,13 @@ cleanup() {
delete_certs
}
+pick_port() {
+ # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
+ PICK=`../qpidd --no-module-dir -dp0`
+ ../qpidd --no-module-dir -qp $PICK
+ echo $PICK
+}
+
CERTUTIL=$(type -p certutil)
if [[ !(-x $CERTUTIL) ]] ; then
echo "No certutil, skipping ssl test";
@@ -93,7 +104,7 @@ test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; }
#### Client Authentication tests
-PORT2=`start_broker --ssl-require-client-authentication` || error "Could not start broker"
+PORT2=`start_authenticating_broker` || error "Could not start broker"
echo "Running SSL client authentication test on port $PORT2"
URL=amqp:ssl:$TEST_HOSTNAME:$PORT2
@@ -109,19 +120,22 @@ test "$MSG3" = "" || { echo "receive succeeded without valid ssl cert '$MSG3' !=
stop_brokers
+#Test multiplexed connection where SSL and plain TCP are served by the same port
+PORT=`pick_port`; ../qpidd --port $PORT --ssl-port $PORT $COMMON_OPTS --transport ssl --auth no
+echo "Running multiplexed SSL/TCP test on $PORT"
+
+./qpid-perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary || { echo "SSL on multiplexed connection failed!"; exit 1; }
+./qpid-perftest --count ${COUNT} --port ${PORT} -P tcp -b $TEST_HOSTNAME --summary || { echo "Plain TCP on multiplexed connection failed!"; exit 1; }
+
+stop_brokers
+
test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported.
## Test failover in a cluster using SSL only
. $srcdir/ais_check # Will exit if clustering not enabled.
-pick_port() {
- # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
- PICK=`../qpidd --no-module-dir -dp0`
- ../qpidd --no-module-dir -qp $PICK
- echo $PICK
-}
ssl_cluster_broker() { # $1 = port
- ../qpidd $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
+ ../qpidd $COMMON_OPTS --require-encryption --auth no --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
# Wait for broker to be ready
qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
echo "Running SSL cluster broker on port $1"
diff --git a/qpid/cpp/src/windows/QpiddBroker.cpp b/qpid/cpp/src/windows/QpiddBroker.cpp
index 50bb45979c..42ba97bdb1 100644
--- a/qpid/cpp/src/windows/QpiddBroker.cpp
+++ b/qpid/cpp/src/windows/QpiddBroker.cpp
@@ -19,17 +19,9 @@
*
*/
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#else
-// These need to be made something sensible, like reading a value from
-// the registry. But for now, get things going with a local definition.
-namespace {
-const char *QPIDD_CONF_FILE = "qpid_broker.conf";
-const char *QPIDD_MODULE_DIR = ".";
-}
-#endif
+#include "config.h"
#include "qpidd.h"
+#include "SCM.h"
#include "qpid/Exception.h"
#include "qpid/Options.h"
#include "qpid/Plugin.h"
@@ -205,8 +197,56 @@ struct BrokerInfo {
DWORD pid;
};
+// Service-related items. Only involved when running the broker as a Windows
+// service.
+
+const std::string svcName = "qpidd";
+SERVICE_STATUS svcStatus;
+SERVICE_STATUS_HANDLE svcStatusHandle = 0;
+
+// This function is only called when the broker is run as a Windows
+// service. It receives control requests from Windows.
+VOID WINAPI SvcCtrlHandler(DWORD control)
+{
+ switch(control) {
+ case SERVICE_CONTROL_STOP:
+ svcStatus.dwCurrentState = SERVICE_STOP_PENDING;
+ svcStatus.dwControlsAccepted = 0;
+ svcStatus.dwCheckPoint = 1;
+ svcStatus.dwWaitHint = 5000; // 5 secs.
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ CtrlHandler(CTRL_C_EVENT);
+ break;
+
+ case SERVICE_CONTROL_INTERROGATE:
+ break;
+
+ default:
+ break;
+ }
+}
+
+VOID WINAPI ServiceMain(DWORD argc, LPTSTR *argv)
+{
+ ::memset(&svcStatus, 0, sizeof(svcStatus));
+ svcStatusHandle = ::RegisterServiceCtrlHandler(svcName.c_str(),
+ SvcCtrlHandler);
+ svcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
+ svcStatus.dwCheckPoint = 1;
+ svcStatus.dwWaitHint = 10000; // 10 secs.
+ svcStatus.dwCurrentState = SERVICE_START_PENDING;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ // QpiddBroker class resets state to running.
+ svcStatus.dwWin32ExitCode = run_broker(argc, argv, true);
+ svcStatus.dwCurrentState = SERVICE_STOPPED;
+ svcStatus.dwCheckPoint = 0;
+ svcStatus.dwWaitHint = 0;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
}
+} // namespace
+
+
struct ProcessControlOptions : public qpid::Options {
bool quit;
bool check;
@@ -225,9 +265,49 @@ struct ProcessControlOptions : public qpid::Options {
}
};
+struct ServiceOptions : public qpid::Options {
+ bool install;
+ bool start;
+ bool stop;
+ bool uninstall;
+ bool daemon;
+ std::string startType;
+ std::string startArgs;
+ std::string account;
+ std::string password;
+ std::string depends;
+
+ ServiceOptions()
+ : qpid::Options("Service options"),
+ install(false),
+ start(false),
+ stop(false),
+ uninstall(false),
+ daemon(false),
+ startType("demand"),
+ startArgs(""),
+ account("NT AUTHORITY\\LocalService"),
+ password(""),
+ depends("")
+ {
+ addOptions()
+ ("install", qpid::optValue(install), "Install as service")
+ ("start-type", qpid::optValue(startType, "auto|demand|disabled"), "Service start type\nApplied at install time only.")
+ ("arguments", qpid::optValue(startArgs, "COMMAND LINE ARGS"), "Arguments to pass when service auto-starts")
+ ("account", qpid::optValue(account, "(LocalService)"), "Account to run as, default is LocalService\nApplied at install time only.")
+ ("password", qpid::optValue(password, "PASSWORD"), "Account password, if needed\nApplied at install time only.")
+ ("depends", qpid::optValue(depends, "(comma delimited list)"), "Names of services that must start before this service\nApplied at install time only.")
+ ("start", qpid::optValue(start), "Start the service.")
+ ("stop", qpid::optValue(stop), "Stop the service.")
+ ("uninstall", qpid::optValue(uninstall), "Uninstall the service.");
+ }
+};
+
struct QpiddWindowsOptions : public QpiddOptionsPrivate {
ProcessControlOptions control;
+ ServiceOptions service;
QpiddWindowsOptions(QpiddOptions *parent) : QpiddOptionsPrivate(parent) {
+ parent->add(service);
parent->add(control);
}
};
@@ -253,12 +333,63 @@ void QpiddOptions::usage() const {
}
int QpiddBroker::execute (QpiddOptions *options) {
+
+ // If running as a service, bump the status checkpoint to let SCM know
+ // we're still making progress.
+ if (svcStatusHandle != 0) {
+ svcStatus.dwCheckPoint++;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ }
+
// Options that affect a running daemon.
QpiddWindowsOptions *myOptions =
- reinterpret_cast<QpiddWindowsOptions *>(options->platform.get());
+ reinterpret_cast<QpiddWindowsOptions *>(options->platform.get());
if (myOptions == 0)
throw qpid::Exception("Internal error obtaining platform options");
+ if (myOptions->service.install) {
+ // Handle start type
+ DWORD startType;
+ if (myOptions->service.startType.compare("demand") == 0)
+ startType = SERVICE_DEMAND_START;
+ else if (myOptions->service.startType.compare("auto") == 0)
+ startType = SERVICE_AUTO_START;
+ else if (myOptions->service.startType.compare("disabled") == 0)
+ startType = SERVICE_DISABLED;
+ else if (!myOptions->service.startType.empty())
+ throw qpid::Exception("Invalid service start type: " +
+ myOptions->service.startType);
+
+ // Install service and exit
+ qpid::windows::SCM manager;
+ manager.install(svcName,
+ "Apache Qpid Message Broker",
+ myOptions->service.startArgs,
+ startType,
+ myOptions->service.account,
+ myOptions->service.password,
+ myOptions->service.depends);
+ return 0;
+ }
+
+ if (myOptions->service.start) {
+ qpid::windows::SCM manager;
+ manager.start(svcName);
+ return 0;
+ }
+
+ if (myOptions->service.stop) {
+ qpid::windows::SCM manager;
+ manager.stop(svcName);
+ return 0;
+ }
+
+ if (myOptions->service.uninstall) {
+ qpid::windows::SCM manager;
+ manager.uninstall(svcName);
+ return 0;
+ }
+
if (myOptions->control.check || myOptions->control.quit) {
// Relies on port number being set via --port or QPID_PORT env variable.
NamedSharedMemory<BrokerInfo> info(brokerInfoName(options->broker.port));
@@ -301,10 +432,41 @@ int QpiddBroker::execute (QpiddOptions *options) {
::SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE);
brokerPtr->accept();
std::cout << options->broker.port << std::endl;
+
+ // If running as a service, tell SCM we're up. There's still a chance
+ // that store recovery will drag out the time before the broker actually
+ // responds to requests, but integrating that mechanism with the SCM
+ // updating is probably more work than it's worth.
+ if (svcStatusHandle != 0) {
+ svcStatus.dwCheckPoint = 0;
+ svcStatus.dwWaitHint = 0;
+ svcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP;
+ svcStatus.dwCurrentState = SERVICE_RUNNING;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ }
+
brokerPtr->run();
waitShut.signal(); // In case we shut down some other way
waitThr.join();
+ return 0;
+}
+
- // CloseHandle(h);
+int main(int argc, char* argv[])
+{
+ // If started as a service, notify the SCM we're up. Else just run.
+ // If as a service, StartServiceControlDispatcher doesn't return until
+ // the service is stopped.
+ SERVICE_TABLE_ENTRY dispatchTable[] =
+ {
+ { "", (LPSERVICE_MAIN_FUNCTION)ServiceMain },
+ { NULL, NULL }
+ };
+ if (!StartServiceCtrlDispatcher(dispatchTable)) {
+ DWORD err = ::GetLastError();
+ if (err == ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) // Run as console
+ return run_broker(argc, argv);
+ throw QPID_WINDOWS_ERROR(err);
+ }
return 0;
}
diff --git a/qpid/cpp/src/windows/SCM.cpp b/qpid/cpp/src/windows/SCM.cpp
new file mode 100644
index 0000000000..232bb04c17
--- /dev/null
+++ b/qpid/cpp/src/windows/SCM.cpp
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/sys/windows/check.h"
+#include "SCM.h"
+
+#pragma comment(lib, "advapi32.lib")
+
+namespace {
+
+// Container that will close a SC_HANDLE upon destruction.
+class AutoServiceHandle {
+public:
+ AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {}
+ ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); }
+ void release() { h = NULL; }
+ void reset(SC_HANDLE newHandle)
+ {
+ if (h != NULL)
+ ::CloseServiceHandle(h);
+ h = newHandle;
+ }
+ operator SC_HANDLE() const { return h; }
+
+private:
+ SC_HANDLE h;
+};
+
+}
+
+namespace qpid {
+namespace windows {
+
+SCM::SCM() : scmHandle(NULL)
+{
+}
+
+SCM::~SCM()
+{
+ if (NULL != scmHandle)
+ ::CloseServiceHandle(scmHandle);
+}
+
+/**
+ * Install this executable as a service
+ */
+void SCM::install(const string& serviceName,
+ const string& serviceDesc,
+ const string& args,
+ DWORD startType,
+ const string& account,
+ const string& password,
+ const string& depends)
+{
+ // Handle dependent service name list; Windows wants a set of nul-separated
+ // names ending with a double nul.
+ string depends2 = depends;
+ if (!depends2.empty()) {
+ // CDL to null delimiter w/ trailing double null
+ size_t p = 0;
+ while ((p = depends2.find_first_of( ',', p)) != string::npos)
+ depends2.replace(p, 1, 1, '\0');
+ depends2.push_back('\0');
+ depends2.push_back('\0');
+ }
+
+#if 0
+ // I'm nervous about adding a user/password check here. Is this a
+ // potential attack vector, letting users check passwords without
+ // control? -Steve Huston, Feb 24, 2011
+
+ // Validate account, password
+ HANDLE hToken = NULL;
+ bool logStatus = false;
+ if (!account.empty() && !password.empty() &&
+ !(logStatus = ::LogonUserA(account.c_str(),
+ "",
+ password.c_str(),
+ LOGON32_LOGON_NETWORK,
+ LOGON32_PROVIDER_DEFAULT,
+ &hToken ) != 0))
+ std::cout << "warning: supplied account & password failed with LogonUser." << std::endl;
+ if (logStatus)
+ ::CloseHandle(hToken);
+#endif
+
+ // Get fully qualified .exe name
+ char myPath[MAX_PATH];
+ DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH);
+ QPID_WINDOWS_CHECK_NOT(myPathLength, 0);
+ string imagePath(myPath, myPathLength);
+ if (!args.empty())
+ imagePath += " " + args;
+
+ // Ensure there's a handle to the SCM database.
+ openSvcManager();
+
+ // Create the service
+ SC_HANDLE svcHandle;
+ svcHandle = ::CreateService(scmHandle, // SCM database
+ serviceName.c_str(), // name of service
+ serviceDesc.c_str(), // name to display
+ SERVICE_ALL_ACCESS, // desired access
+ SERVICE_WIN32_OWN_PROCESS, // service type
+ startType, // start type
+ SERVICE_ERROR_NORMAL, // error cntrl type
+ imagePath.c_str(), // path to service's binary w/ optional arguments
+ NULL, // no load ordering group
+ NULL, // no tag identifier
+ depends2.empty() ? NULL : depends2.c_str(),
+ account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem
+ password.empty() ? NULL : password.c_str()); // password, or NULL for none
+ QPID_WINDOWS_CHECK_NULL(svcHandle);
+ ::CloseServiceHandle(svcHandle);
+ QPID_LOG(info, "Service installed successfully");
+}
+
+/**
+ *
+ */
+void SCM::uninstall(const string& serviceName)
+{
+ // Ensure there's a handle to the SCM database.
+ openSvcManager();
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ DELETE));
+ QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc);
+ QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0);
+ QPID_LOG(info, "Service deleted successfully.");
+}
+
+/**
+ * Attempt to start the service.
+ */
+void SCM::start(const string& serviceName)
+{
+ // Ensure we have a handle to the SCM database.
+ openSvcManager();
+
+ // Get a handle to the service.
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ SERVICE_ALL_ACCESS));
+ QPID_WINDOWS_CHECK_NULL(svc);
+
+ // Check the status in case the service is not stopped.
+ DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOP_PENDING)
+ throw qpid::Exception("Timed out waiting for running service to stop.");
+
+ // Attempt to start the service.
+ QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0);
+
+ QPID_LOG(info, "Service start pending...");
+
+ // Check the status until the service is no longer start pending.
+ state = waitForStateChangeFrom(svc, SERVICE_START_PENDING);
+ // Determine whether the service is running.
+ if (state == SERVICE_RUNNING) {
+ QPID_LOG(info, "Service started successfully");
+ }
+ else {
+ throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state));
+ }
+}
+
+/**
+ *
+ */
+void SCM::stop(const string& serviceName)
+{
+ // Ensure a handle to the SCM database.
+ openSvcManager();
+
+ // Get a handle to the service.
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ SERVICE_STOP | SERVICE_QUERY_STATUS |
+ SERVICE_ENUMERATE_DEPENDENTS));
+ QPID_WINDOWS_CHECK_NULL(svc);
+
+ // Make sure the service is not already stopped; if it's stop-pending,
+ // wait for it to finalize.
+ DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOPPED) {
+ QPID_LOG(info, "Service is already stopped");
+ return;
+ }
+
+ // If the service is running, dependencies must be stopped first.
+ std::auto_ptr<ENUM_SERVICE_STATUS> deps;
+ DWORD numDeps = getDependentServices(svc, deps);
+ for (DWORD i = 0; i < numDeps; i++)
+ stop(deps.get()[i].lpServiceName);
+
+ // Dependents stopped; send a stop code to the service.
+ SERVICE_STATUS_PROCESS ssp;
+ if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp))
+ throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " <<
+ qpid::sys::strError(::GetLastError())));
+
+ // Wait for the service to stop.
+ state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOPPED)
+ QPID_LOG(info, QPID_MSG("Service " << serviceName <<
+ " stopped successfully."));
+}
+
+/**
+ *
+ */
+void SCM::openSvcManager()
+{
+ if (NULL != scmHandle)
+ return;
+
+ scmHandle = ::OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // Rights
+ QPID_WINDOWS_CHECK_NULL(scmHandle);
+}
+
+DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState)
+{
+ SERVICE_STATUS_PROCESS ssStatus;
+ DWORD bytesNeeded;
+ DWORD waitTime;
+ if (!::QueryServiceStatusEx(svc, // handle to service
+ SC_STATUS_PROCESS_INFO, // information level
+ (LPBYTE)&ssStatus, // address of structure
+ sizeof(ssStatus), // size of structure
+ &bytesNeeded)) // size needed if buffer is too small
+ throw QPID_WINDOWS_ERROR(::GetLastError());
+
+ // Save the tick count and initial checkpoint.
+ DWORD startTickCount = ::GetTickCount();
+ DWORD oldCheckPoint = ssStatus.dwCheckPoint;
+
+ // Wait for the service to change out of the noted state.
+ while (ssStatus.dwCurrentState == originalState) {
+ // Do not wait longer than the wait hint. A good interval is
+ // one-tenth of the wait hint but not less than 1 second
+ // and not more than 10 seconds.
+ waitTime = ssStatus.dwWaitHint / 10;
+ if (waitTime < 1000)
+ waitTime = 1000;
+ else if (waitTime > 10000)
+ waitTime = 10000;
+
+ ::Sleep(waitTime);
+
+ // Check the status until the service is no longer stop pending.
+ if (!::QueryServiceStatusEx(svc,
+ SC_STATUS_PROCESS_INFO,
+ (LPBYTE) &ssStatus,
+ sizeof(ssStatus),
+ &bytesNeeded))
+ throw QPID_WINDOWS_ERROR(::GetLastError());
+
+ if (ssStatus.dwCheckPoint > oldCheckPoint) {
+ // Continue to wait and check.
+ startTickCount = ::GetTickCount();
+ oldCheckPoint = ssStatus.dwCheckPoint;
+ } else {
+ if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint)
+ break;
+ }
+ }
+ return ssStatus.dwCurrentState;
+}
+
+/**
+ * Get the services that depend on @arg svc. All dependent service info
+ * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps.
+ *
+ * @retval The number of dependent services.
+ */
+DWORD SCM::getDependentServices(SC_HANDLE svc,
+ std::auto_ptr<ENUM_SERVICE_STATUS>& deps)
+{
+ DWORD bytesNeeded;
+ DWORD numEntries;
+
+ // Pass a zero-length buffer to get the required buffer size.
+ if (::EnumDependentServices(svc,
+ SERVICE_ACTIVE,
+ 0,
+ 0,
+ &bytesNeeded,
+ &numEntries)) {
+ // If the Enum call succeeds, then there are no dependent
+ // services, so do nothing.
+ return 0;
+ }
+
+ if (::GetLastError() != ERROR_MORE_DATA)
+ throw QPID_WINDOWS_ERROR((::GetLastError()));
+
+ // Allocate a buffer for the dependencies.
+ deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded]));
+ // Enumerate the dependencies.
+ if (!::EnumDependentServices(svc,
+ SERVICE_ACTIVE,
+ deps.get(),
+ bytesNeeded,
+ &bytesNeeded,
+ &numEntries))
+ throw QPID_WINDOWS_ERROR((::GetLastError()));
+ return numEntries;
+}
+
+} } // namespace qpid::windows
diff --git a/qpid/cpp/src/windows/SCM.h b/qpid/cpp/src/windows/SCM.h
new file mode 100644
index 0000000000..bdc73bc210
--- /dev/null
+++ b/qpid/cpp/src/windows/SCM.h
@@ -0,0 +1,109 @@
+#ifndef WINDOWS_SCM_H
+#define WINDOWS_SCM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <string>
+using std::string;
+
+#ifdef UNICODE
+#undef UNICODE
+#endif
+
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+
+#include <windows.h>
+
+namespace qpid {
+namespace windows {
+
+/**
+ * @class SCM
+ *
+ * Access the Windows Service Control Manager.
+ */
+class SCM
+{
+public:
+ SCM();
+ ~SCM();
+
+ /**
+ * Install this executable as a service
+ *
+ * @param serviceName The name of the service
+ * @param serviceDesc Description of the service's purpose
+ * @param args The argument list to pass into the service
+ * @param startType The start type: SERVICE_DEMAND_START,
+ * SERVICE_AUTO_START, SERVICE_DISABLED
+ * @param account If not empty, the account name to install this
+ * service under
+ * @param password If not empty, the account password to install this
+ * service with
+ * @param depends If not empty, a comma delimited list of services
+ * that must start before this one
+ */
+ void install(const string& serviceName,
+ const string& serviceDesc,
+ const string& args,
+ DWORD startType = SERVICE_DEMAND_START,
+ const string& account = "NT AUTHORITY\\LocalSystem",
+ const string& password = "",
+ const string& depends = "");
+
+ /**
+ * Uninstall this executable as a service
+ *
+ * @param serviceName the name of the service
+ */
+ void uninstall(const string& serviceName);
+
+ /**
+ * Start the specified service
+ *
+ * @param serviceName the name of the service
+ */
+ void start(const string& serviceName);
+
+ /**
+ * Stop the specified service
+ *
+ * @param serviceName the name of the service
+ */
+ void stop(const string &serviceName);
+
+private:
+ SC_HANDLE scmHandle;
+
+ void openSvcManager();
+ DWORD waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState);
+ DWORD getDependentServices(SC_HANDLE svc,
+ std::auto_ptr<ENUM_SERVICE_STATUS>& deps);
+
+};
+
+}} // namespace qpid::windows
+
+#endif /* #ifndef WINDOWS_SCM_H */