diff options
Diffstat (limited to 'qpid/cpp')
57 files changed, 1962 insertions, 965 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-design.txt b/qpid/cpp/design_docs/new-cluster-design.txt index 7adb46fee3..936530a39a 100644 --- a/qpid/cpp/design_docs/new-cluster-design.txt +++ b/qpid/cpp/design_docs/new-cluster-design.txt @@ -17,7 +17,6 @@ # under the License. * A new design for Qpid clustering. - ** Issues with current design. The cluster is based on virtual synchrony: each broker multicasts @@ -84,19 +83,21 @@ context. ** A new cluster design. -Clearly defined interface between broker code and cluster plug-in. +1. Clearly defined interface between broker code and cluster plug-in. -Replicate queue events rather than client data. -- Broker behavior only needs to match per-queue. -- Smaller amount of code (queue implementation) that must behave predictably. -- Events only need be serialized per-queue, allows concurrency between queues +2. Replicate queue events rather than client data. + - Only requires consistent enqueue order. + - Events only need be serialized per-queue, allows concurrency between queues + - Allows for replicated and non-replicated queues. -Use a moving queue ownership protocol to agree order of dequeues. -No longer relies on identical state and lock-step behavior to cause -identical dequeues on each broker. +3. Use a lock protocol to agree order of dequeues: only the broker + holding the lock can acqiure & dequeue. No longer relies on + identical state and lock-step behavior to cause identical dequeues + on each broker. -Each queue has an associated thread-context. Events for a queue are executed -in that queues context, in parallel with events for other queues. +4. Use multiple CPG groups to process different queues in + parallel. Use a fixed set of groups and hash queue names to choose + the group for each queue. *** Requirements @@ -149,7 +150,7 @@ a release-queue event, allowing another interested broker to take ownership. *** Asynchronous completion of accept -### HERE + In acknowledged mode a message is not forgotten until it is accepted, to allow for requeue on rejection or crash. The accept should not be completed till the message has been forgotten. @@ -162,19 +163,32 @@ On receiving an accept the broker: NOTE: The message store does not currently implement asynchronous completions of accept, this is a bug. +*** Multiple CPG groups. + +The old cluster was bottlenecked by processing everything in a single +CPG deliver thread. + +The new cluster uses a set of CPG groups, one per core. Queue names +are hashed to give group indexes, so statistically queues are likely +to be spread over the set of groups. + +Operations on a given queue always use the same group, so we have +order within each queue, but operations on different queues can use +different groups giving greater throughput sending to CPG and multiple +handler threads to process CPG messages. + ** Inconsistent errors. -The new design eliminates most sources of inconsistent errors -(connections, sessions, security, management etc.) The only points -where inconsistent errors can occur are at enqueue and dequeue (most -likely store-related errors.) +An inconsistent error means that after multicasting an enqueue, accept +or dequeue, some brokers succeed in processing it and others fail. -The new design can use the exisiting error-handling protocol with one -major improvement: since brokers are no longer required to maintain -identical state they do not have to stall processing while an error is -being resolved. +The new design eliminates most sources of inconsistent errors in the +old broker: connections, sessions, security, management etc. Only +store journal errors remain. -#TODO: The only source of dequeue errors is probably an unrecoverable journal failure. +The new inconsistent error protocol is similar to the old one with one +major improvement: brokers do not have to stall processing while an +error is being resolved. ** Updating new members @@ -193,60 +207,44 @@ catch up (which is not guaranteed to happen in a bounded time.) With the new cluster design only exchanges, queues, bindings and messages need to be replicated. -Update of wiring (exchanges, queues, bindings) is the same as current -design. - -Update of messages is different: -- per-queue rather than per-broker, separate queues can be updated in parallel. -- updates queues in reverse order to eliminate unbounded catch-up -- does not require updater & updatee to stall during update. +We update individual objects (queues and exchanges) independently. +- create queues first, then update all queues and exchanges in parallel. +- multiple updater threads, per queue/exchange. -Replication events, multicast to cluster: -- enqueue(q,m): message m pushed on back of queue q . -- acquire(q,m): mark m acquired -- dequeue(q,m): forget m. -Messages sent on update connection: -- update_front(q,m): during update, receiver pushes m to *front* of q -- update_done(q): during update, update of q is complete. +Queue updater: +- marks the queue position at the sync point +- sends messages starting from the sync point working towards the head of the queue. +- send "done" message. -Updater: -- when updatee joins set iterator i = q.end() -- while i != q.begin(): --i; send update_front(q,*i) to updatee -- send update_done(q) to updatee +Queue updatee: +- enqueues received from CPG: add to back of queue as normal. +- dequeues received from CPG: apply if found, else save to check at end of update. +- messages from updater: add to the *front* of the queue. +- update complete: apply any saved dequeues. -Updatee: -- q initially in locked state, can't dequeue locally. -- start processing replication events for q immediately (enqueue, dequeue, acquire etc.) -- receive update_front(q,m): q.push_front(m) -- receive update_done(q): q can be unlocked for local dequeing. +Exchange updater: +- updater: send snapshot of exchange as it was at the sync point. -Benefits: -- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated. -- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update. -- During update consumers actually help by removing messages before they need to be updated. -- Needs no separate "work to do" queue, only the broker queues themselves. +Exchange updatee: +- queue exchange operations after the sync point. +- when snapshot is received: apply saved operations. -# TODO how can we recover from updater crashing before update complete? -# Clear queues that are not updated & send request for udpates on those queues? +Note: +- Updater is active throughout, no stalling. +- Consuming clients actually reduce the size of the update. +- Updatee stalls clients until the update completes. + (Note: May be possible to avoid updatee stall as well, needs thought) -# TODO updatee may receive a dequeue for a message it has not yet seen, needs -# to hold on to that so it can drop the message when it is seen. -# Similar problem exists for wiring? +** Internal cluster interface -** Cluster API - -The new cluster API is similar to the MessageStore interface. -(Initially I thought it would be an extension of the MessageStore interface, -but as the design develops it seems better to make it a separate interface.) +The new cluster interface is similar to the MessageStore interface, but +provides more detail (message positions) and some additional call +points (e.g. acquire) The cluster interface captures these events: - wiring changes: queue/exchange declare/bind - message enqueued/acquired/released/rejected/dequeued. - -The cluster will require some extensions to the Queue: -- Queues can be "locked", locked queues are ignored by IO-driven output. -- Cluster must be able to apply queue events from the cluster to a queue. - These appear to fit into existing queue operations. +- transactional events. ** Maintainability @@ -273,106 +271,48 @@ A number of specific ways the code will be simplified: ** Performance -The only way to verify the relative performance of the new design is -to prototype & profile. The following points suggest the new design -may scale/perform better: - -Some work moved from virtual synchrony thread to connection threads: -- All connection/session logic moves to connection thread. -- Exchange routing logic moves to connection thread. -- On local broker dequeueing is done in connection thread -- Local broker dequeue is IO driven as for a standalone broker. - -For queues with all consumers on a single node dequeue is all -IO-driven in connection thread. Pay for time-sharing only if queue has -consumers on multiple brokers. - -Doing work for different queues in parallel scales on multi-core boxes when -there are multiple queues. - -One difference works against performance, thre is an extra -encode/decode. The old design multicasts raw client data and decodes -it in the virtual synchrony thread. The new design would decode -messages in the connection thread, re-encode them for multicast, and -decode (on non-local brokers) in the virtual synchrony thread. There -is extra work here, but only in the *connection* thread: on a -multi-core machine this happens in parallel for every connection, so -it probably is not a bottleneck. There may be scope to optimize -decode/re-encode by re-using some of the original encoded data, this -could also benefit the stand-alone broker. - -** Asynchronous queue replication - -The existing "asynchronous queue replication" feature maintains a -passive backup passive backup of queues on a remote broker over a TCP -connection. - -The new cluster replication protocol could be re-used to implement -asynchronous queue replication: its just a special case where the -active broker is always the queue owner and the enqueue/dequeue -messages are sent over a TCP connection rather than multicast. - -The new update update mechanism could also work with 'asynchronous -queue replication', allowing such replication (over a TCP connection -on a WAN say) to be initiated after the queue had already been created -and been in use (one of the key missing features). - -** Increasing Concurrency and load sharing - -The current cluster is bottlenecked by processing everything in the -CPG deliver thread. By removing the need for identical operation on -each broker, we open up the possiblility of greater concurrency. - -Handling multicast enqueue, acquire, accpet, release etc: concurrency -per queue. Operatons on different queues can be done in different -threads. - -The new design does not force each broker to do all the work in the -CPG thread so spreading load across cluster members should give some -scale-up. - -** Misc outstanding issues & notes - -Replicating wiring -- Need async completion of wiring commands? -- qpid.sequence_counter: need extra work to support in new design, do we care? - -Cluster+persistence: -- finish async completion: dequeue completion for store & cluster -- cluster restart from store: clean stores *not* identical, pick 1, all others update. -- need to generate cluster ids for messages recovered from store. - -Live updates: we don't need to stall brokers during an update! -- update on queue-by-queue basis. -- updatee locks queues during update, no dequeue. -- update in reverse: don't update messages dequeued during update. -- updatee adds update messages at front (as normal), replicated messages at back. -- updater starts from back, sends "update done" when it hits front of queue. - -Flow control: need to throttle multicasting -1. bound the number of outstanding multicasts. -2. ensure the entire cluster keeps up, no unbounded "lag" -The existing design uses read-credit to solve 1., and does not solve 2. -New design should stop reading on all connections while flow control -condition exists? - -Can federation also be unified, at least in configuration? - -Consider queues (and exchanges?) as having "reliability" attributes: -- persistent: is the message stored on disk. -- backed-up (to another broker): active/passive async replication. -- replicated (to a cluster): active/active multicast replication to cluster. -- federated: federation link to a queue/exchange on another broker. - -"Reliability" seems right for the first 3 but not for federation, is -there a better term? - -Clustering and scalability: new design may give us the flexibility to -address scalability as part of cluster design. Think about -relationship to federation and "fragmented queues" idea. - -* Design debates/descisions +The standalone broker processes _connections_ concurrently, so CPU +usage increases as you add more connections. + +The new cluster processes _queues_ concurrently, so CPU usage increases as you +add more queues. + +In both cases, CPU usage peaks when the number of "units of + concurrency" (connections or queues) goes above the number of cores. + +When all consumers on a queue are connected to the same broker the new +cluster uses the same messagea allocation threading/logic as a +standalone broker, with a little extra asynchronous book-keeping. + +If a queue has multiple consumers connected to multiple brokers, the +new cluster time-shares the queue which is less efficient than having +all consumers on a queue connected to the same broker. +** Flow control +New design does not queue up CPG delivered messages, they are +processed immediately in the CPG deliver thread. This means that CPG's +flow control is sufficient for qpid. + +** Live upgrades + +Live upgrades refers to the ability to upgrade a cluster while it is +running, with no downtime. Each brokers in the cluster is shut down, +and then re-started with a new version of the broker code. + +To achieve this +- Cluster protocl XML file has a new element <version number=N> attached + to each method. This is the version at which the method was added. +- New versions can only add methods, existing methods cannot be changed. +- The cluster handshake for new members includes the protocol version + at each member. +- The cluster's version is the lowest version among its members. +- A newer broker can join and older cluster. When it does, it must restrict + itself to speaking the older version protocol. +- When the cluster version increases (because the lowest version member has left) + the remaining members may move up to the new version. + + +* Design debates ** Active/active vs. active passive An active-active cluster can be used in an active-passive mode. In @@ -385,7 +325,7 @@ An active/passive implementation allows some simplifications over active/active: - can do immediate local enqueue and still guarantee order. Active/passive introduces a few extra requirements: -- Exactly one broker hast to take over if primary fails. +- Exactly one broker has to take over if primary fails. - Passive members must refuse client connections. - On failover, clients must re-try all known addresses till they find the active member. @@ -393,43 +333,17 @@ Active/active benefits: - A broker failure only affects the subset of clients connected to that broker. - Clients can switch to any other broker on failover - Backup brokers are immediately available on failover. -- Some load sharing: reading from client + multicast only done on direct node. - -Active/active drawbacks: -- Co-ordinating message acquisition may impact performance (not tested) -- Code may be more complex that active/passive. +- As long as a client can connect to any broker in the cluster, it can be served. Active/passive benefits: -- Don't need message allocation strategy, can feed consumers at top speed. -- Code may be simpler than active/active. +- Don't need to replicate message allocation, can feed consumers at top speed. Active/passive drawbacks: - All clients on one node so a failure affects every client in the system. - After a failure there is a "reconnect storm" as every client reconnects to the new active node. - After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary. - Clients must find the single active node, may involve multiple connect attempts. +- No service if a partition separates a client from the active broker, + even if the client can see other brokers. -** Total ordering. - -Initial thinking: allow message ordering to differ between brokers. -New thinking: use CPG total ordering, get identical ordering on all brokers. -- Allowing variation in order introduces too much chance of unexpected behavior. -- Usign total order allows other optimizations, see Message Identifiers below. - -** Message identifiers. - -Initial thinking: message ID = CPG node id + 64 bit sequence number. -This involves a lot of mapping between cluster IDs and broker messsages. - -New thinking: message ID = queue name + queue position. -- Removes most of the mapping and memory management for cluster code. -- Requires total ordering of messages (see above) - -** Message rejection - -Initial thinking: add special reject/rejected points to cluster interface so -rejected messages could be re-queued without multicast. -New thinking: treat re-queueing after reject as entirely new message. -- Simplifies cluster interface & implementation -- Not on the critical path. diff --git a/qpid/cpp/design_docs/new-cluster-plan.txt b/qpid/cpp/design_docs/new-cluster-plan.txt index 781876e55a..626e443be7 100644 --- a/qpid/cpp/design_docs/new-cluster-plan.txt +++ b/qpid/cpp/design_docs/new-cluster-plan.txt @@ -17,376 +17,156 @@ # specific language governing permissions and limitations # under the License. +* Status of impementation -Notes on new cluster implementation. See also: new-cluster-design.txt +Meaning of priorities: +[#A] Essential for basic functioning. +[#B] Required for first release. +[#C] Can be addressed in a later release. -* Implementation plan. +The existig prototype is bare bones to do performance benchmarks: +- Implements publish and consumer locking protocol. +- Defered delivery and asynchronous completion of message. +- Optimize the case all consumers are on the same node. +- No new member updates, no failover updates, no transactions, no persistence etc. -Co-existence with old cluster code and tests: -- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster. -- Double up tests with old version/new version as the new code develops. +Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/ -Minimal POC for message delivery & perf test. -- no wiring replication, no updates, no failover, no persistence, no async completion. -- just implement publish and acquire/dequeue locking protocol. -- optimize the special case where all consumers are on the same node. -- measure performance: compare active-passive and active-active modes of use. +** Similarities to existing cluster. -Full implementation of transient cluster -- Update (based on existing update), async completion etc. -- Passing all existing transient cluster tests. +/Active-active/: the new cluster can be a drop-in replacement for the +old, existing tests & customer deployment configurations are still +valid. -Persistent cluster -- Make sure async completion works correctly. -- InitialStatus protoocl etc. to support persistent start-up (existing code) -- cluster restart from store: stores not identical. Load one, update the rest. - - assign cluster ID's to messages recovered from store, don't replicate. +/Virtual synchrony/: Uses corosync to co-ordinate activity of members. -Improved update protocol -- per-queue, less stalling, bounded catch-up. +/XML controls/: Uses XML to define the primitives multicast to the +cluster. -* Task list +** Differences with existing cluster. -** TODO [#A] Minimal POC: publish/acquire/dequeue protocol. +/Report rather than predict consumption/: brokers explicitly tell each +other which messages have been acquired or dequeued. This removes the +major cause of bugs in the existing cluster. -NOTE: as implementation questions arise, take the easiest option and make -a note for later optimization/improvement. +/Queue consumer locking/: to avoid duplicates only one broker can acquire or +dequeue messages at a time - while has the consume-lock on the +queue. If multiple brokers are consuming from the same queue the lock +is passed around to time-share access to the queue. -*** Tests -- python test: 4 senders, numbered messages, 4 receivers, verify message set. -- acquire then release messages: verify can be dequeued on any member -- acquire then kill broker: verify can be dequeued other members. -- acquire then reject: verify goes on alt-exchange once only. +/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting +the concurrency of the host) to allow concurrent processing on +different queues. Queues are hashed onto the groups. -*** DONE broker::Cluster interface and call points. +* Completed tasks +** DONE [#A] Minimal POC: publish/acquire/dequeue protocol. + CLOSED: [2011-10-05 Wed 16:03] -Initial interface commited. +Defines broker::Cluster interface and call points. +Initial interface commite -*** Main classes +Main classes +Core: central object holding cluster classes together (replaces cluster::Cluster) +BrokerContext: implements broker::Cluster interface. +QueueContext: Attached to a broker::Queue, holds cluster status. +MessageHolder:holds local messages while they are being enqueued. -BrokerHandler: -- implements broker::Cluster intercept points. -- sends mcast events to inform cluster of local actions. -- thread safe, called in connection threads. +Implements multiple CPG groups for better concurrency. -LocalMessageMap: -- Holds local messages while they are being enqueued. -- thread safe: called by both BrokerHandler and MessageHandler - -MessageHandler: -- handles delivered mcast messages related to messages. -- initiates local actions in response to mcast events. -- thread unsafe, only called in deliver thread. -- maintains view of cluster state regarding messages. +** DONE [#A] Large message replication. + CLOSED: [2011-10-05 Wed 17:22] +Multicast using fixed-size (64k) buffers, allow fragmetation of messages across buffers (frame by frame) -QueueOwnerHandler: -- handles delivered mcast messages related to queue consumer ownership. -- thread safe, called in deliver, connection and timer threads. -- maintains view of cluster state regarding queue ownership. - -cluster::Core: class to hold new cluster together (replaces cluster::Cluster) -- thread safe: manage state used by both MessageHandler and BrokerHandler - -The following code sketch illustrates only the "happy path" error handling -is omitted. - -*** BrokerHandler -Types: -- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; } -- struct - -NOTE: -- Messages on queues are identified by a queue name + a position. -- Messages being routed are identified by a sequence number. - -Members: -- thread_local bool noReplicate // suppress replication. -- thread_local bool isRouting // suppress operations while routing -- Message localMessage[SequenceNumber] // local messages being routed. -- thread_local SequenceNumber routingSequence - -NOTE: localMessage is also modified by MessageHandler. - -broker::Cluster intercept functions: - -routing(msg) - if noReplicate: return - # Supress everything except enqueues while we are routing. - # We don't want to replicate acquires & dequeues caused by an enqueu, - # e.g. removal of messages from ring/LV queues. - isRouting = true - -enqueue(qmsg): - if noReplicate: return - if routingSequence == 0 # thread local - routingSequence = nextRoutingSequence() - mcast create(encode(qmsg.msg),routingSeq) - mcast enqueue(qmsg.q,routingSeq) - -routed(msg): - if noReplicate: return - isRouting = false - -acquire(qmsg): - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - if msg.id: mcast acquire(qmsg) - -release(QueuedMessage) - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - mcast release(qmsg) - -accept(QueuedMessage): - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - mcast accept(qmsg) - -reject(QueuedMessage): - isRejecting = true - mcast reject(qmsg) - -# FIXME no longer needed? -drop(QueuedMessage) - cleanup(qmsg) - -*** MessageHandler and mcast messages -Types: -- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; } -- struct QueueKey { MessageId id; QueueName q; } -- typedef map<QueueKey, QueueEntry> Queue -- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; } - -Members: -- QueueEntry enqueued[QueueKey] -- Node node[NodeId] - -Mcast messages in Message class: - -create(msg,seq) - if sender != self: node[sender].routing[seq] = decode(msg) - -enqueue(q,seq): - id = (sender,seq) - if sender == self: - enqueued[id,q] = (localMessage[seq], acquired=None) - else: - msg = sender.routing[seq] - enqueued[id,q] = (qmsg, acquired=None) - with noReplicate=true: qmsg = broker.getQueue(q).push(msg) - -routed(seq): - if sender == self: localMessage.erase(msg.id.seq) - else: sender.routing.erase(seq) - -acquire(id,q): - enqueued[id,q].acquired = sender - node[sender].acquired.push_back((id,q)) - if sender != self: - with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q]) - -release(id,q) - enqueued[id,q].acquired = None - node[sender].acquired.erase((id,q)) - if sender != self - with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q]) - -reject(id,q): - sender.routing[id] = enqueued[id,q] # prepare for re-queueing - -rejected(id,q) - sender.routing.erase[id] - -dequeue(id,q) - entry = enqueued[id,q] - enqueued.erase[id,q] - node[entry.acquired].acquired.erase(id,q) - if sender != self: - with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg) - -member m leaves cluster: - for key in node[m].acquired: - release(key.id, key.q) - node.erase(m) - -*** Queue consumer locking - -When a queue is locked it does not deliver messages to its consumers. - -New broker::Queue functions: -- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit. -- startConsumers(): reset consumersStopped flag - -Implementation sketch, locking omitted: - -void Queue::stopConsumers() { - consumersStopped = true; - while (consumersBusy) consumersBusyMonitor.wait(); -} - -void Queue::startConsumers() { - consumersStopped = false; - listeners.notify(); -} - -bool Queue::dispatch(consumer) { - if (consumersStopped) return false; - ++consumersBusy; - do_regular_dispatch_body() - if (--consumersBusy == 0) consumersBusyMonitor.notify(); -} - -*** QueueOwnerHandler - -Invariants: -- Each queue is owned by at most one node at any time. -- Each node is interested in a set of queues at any given time. -- A queue is un-owned if no node is interested. - -The queue owner releases the queue when -- it loses interest i.e. queue has no consumers with credit. -- a configured time delay expires and there are other interested nodes. - -The owner mcasts release(q). On delivery the new queue owner is the -next node in node-id order (treating nodes as a circular list) -starting from the old owner that is interested in the queue. - -Queue consumers initially are stopped, only started when we get -ownership from the cluster. - -Thread safety: called by deliver, connection and timer threads, needs locking. - -Thread safe object per queue holding queue ownership status. -Called by deliver, connection and timer threads. - -class QueueOwnership { - bool owned; - Timer timer; - BrokerQueue q; - - drop(): # locked - if owned: - owned = false - q.stopConsumers() - mcast release(q.name, false) - timer.stop() - - take(): # locked - if not owned: - owned = true - q.startConsumers() - timer.start(timeout) - - timer.fire(): drop() -} - -Data Members, only modified/examined in deliver thread: -- typedef set<NodeId> ConsumerSet -- map<QueueName, ConsumerSet> consumers -- map<QueueName, NodeId> owner +* Open questions -Thread safe data members, accessed in connection threads (via BrokerHandler): -- map<QueueName, QueueOwnership> ownership +** TODO [#A] Queue sequence numbers vs. independant message IDs. + SCHEDULED: <2011-10-07 Fri> -Multicast messages in QueueOwner class: +Current prototype uses queue sequence numbers to identify +message. This is tricky for updating new members as the sequence +numbers are only known on delivery. -consume(q): - if sender==self and consumers[q].empty(): ownership[q].take() - consumers[q].insert(sender) +Independent message IDs that can be generated and sent with the message simplify +this and potentially allow performance benefits by relaxing total ordering. +However they imply additional map lookups that might hurt performance. -release(q): - asssert(owner[q] == sender and owner[q] in consumers[q]) - owner[q] = circular search from sender in consumers[q] - if owner==self: ownership[q].take() +- [X] Prototype independent message IDs, check performance. +Throughput worse by 30% in contented case, 10% in uncontended. +Sticking with queue sequence numbers. -cancel(q): - assert(queue[q].owner != sender) # sender must release() before cancel() - consumers[q].erase(sender) +* Outstanding Tasks +** TODO [#A] Defer and async completion of wiring commands. -member-leaves: - for q in queue: if owner[q] = left: left.release(q) +Testing requirement: Many tests assume wiring changes are visible +across the cluster once the commad completes. -Need 2 more intercept points in broker::Cluster: +Name clashes: need to avoid race if same name queue/exchange declared +on 2 brokers simultaneously -consume(q,consumer,consumerCount) - Queue::consume() - if consumerCount == 1: mcast consume(q) +** TODO [#A] Passing all existing cluster tests. -cancel(q,consumer,consumerCount) - Queue::cancel() - if consumerCount == 0: - ownership[q].drop() - mcast cancel(q) +The new cluster should be a drop-in replacement for the old, so it +should be able to pass all the existing tests. -#TODO: lifecycle, updating cluster data structures when queues are destroyed - -*** Increasing concurrency -The major performance limitation of the old cluster is that it does -everything in the single CPG deliver thread context. - -We can get additional concurrency by creating a thread context _per queue_ -for queue operations: enqueue, acquire, accept etc. - -We associate a PollableQueue of queue operations with each AMQP queue. -The CPG deliver thread would -- build messages and associate with cluster IDs. -- push queue ops to the appropriate PollableQueue to be dispatched the queues thread. - -Serializing operations on the same queue avoids contention, but takes advantage -of the independence of operations on separate queues. +** TODO [#A] Update to new members joining. -*** Re-use of existing cluster code -- re-use Event -- re-use Multicaster -- re-use same PollableQueueSetup (may experiment later) -- new Core class to replace Cluster. -- keep design modular, keep threading rules clear. +Need to resolve [[Queue sequence numbers vs. independant message IDs]] first. +- implicit sequence numbers are more tricky to replicate to new member. -** TODO [#B] Large message replication. -Multicast should encode messages in fixed size buffers (64k)? -Can't assume we can send message in one chunk. -For 0-10 can use channel numbers & send whole frames packed into larger buffer. -** TODO [#B] Transaction support. -Extend broker::Cluster interface to capture transaction context and completion. -Sequence number to generate per-node tx IDs. -Replicate transaction completion. -** TODO [#B] Batch CPG multicast messages -The new cluster design involves a lot of small multicast messages, -they need to be batched into larger CPG messages for efficiency. -** TODO [#B] Genuine async completion -Replace current synchronous waiting implementation with genuine async completion. +Update individual objects (queues and exchanges) independently. +- create queues first, then update all queues and exchanges in parallel. +- multiple updater threads, per queue/exchange. +- updater sends messages to special exchange(s) (not using extended AMQP controls) + +Queue updater: +- marks the queue position at the sync point +- sends messages starting from the sync point working towards the head of the queue. +- send "done" message. +Note: updater remains active throughout, consuming clients actually reduce the +size of the update. -Test: enhance test_store.cpp to defer enqueueComplete till special message received. +Queue updatee: +- enqueues received from CPG: add to back of queue as normal. +- dequeues received from CPG: apply if found, else save to check at end of update. +- messages from updater: add to the *front* of the queue. +- update complete: apply any saved dequeues. -Async callback uses *requestIOProcessing* to queue action on IO thread. +Exchange updater: +- updater: send snapshot of exchange as it was at the sync point. -** TODO [#B] Async completion of accept when dequeue completes. -Interface is already there on broker::Message, just need to ensure -that store and cluster implementations call it appropriately. +Exchange updatee: +- queue exchange operations after the sync point. +- when snapshot is received: apply saved operations. -** TODO [#B] Replicate wiring. -From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command. +Updater remains active throughout. +Updatee stalls clients until the update completes. -** TODO [#B] New members joining - first pass +Updating queue/exchange/binding objects is via the same encode/decode +that is used by the store. Updatee to use recovery interfaces to +recover? -Re-use update code from old cluster but don't replicate sessions & -connections. +** TODO [#A] Failover updates to client. +Implement the amq.failover exchange to notify clients of membership. -Need to extend it to send cluster IDs with messages. +** TODO [#B] Initial status protocol. +Handshake to give status of each broker member to new members joining. +Status includes +- persistent store state (clean, dirty) +- cluster protocol version. -Need to replicate the queue ownership data as part of the update. +** TODO [#B] Replace boost::hash with our own hash function. +The hash function is effectively part of the interface so +we need to be sure it doesn't change underneath us. -** TODO [#B] Persistence support. -InitialStatus protoocl etc. to support persistent start-up (existing code) +** TODO [#B] Persistent cluster support. +Initial status protoocl to support persistent start-up (see existing code) Only one broker recovers from store, update to others. Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover. -** TODO [#B] Handle other ways that messages can leave a queue. - -Other ways (other than via a consumer) that messages are take off a queue. - -NOTE: Not controlled by queue lock, how to make them consistent? - +** TODO [#B] Management support +Replicate management methods that modify queues - e.g. move, purge. Target broker may not have all messages on other brokers for purge/destroy. - Queue::move() - need to wait for lock? Replicate? - Queue::get() - ??? @@ -395,66 +175,48 @@ Target broker may not have all messages on other brokers for purge/destroy. Need to add callpoints & mcast messages to replicate these? -** TODO [#B] Flow control for internal queues. - -Need to bound the size of internal queues: delivery and multicast. -- stop polling for read on client connections when we reach a bound. -- restart polling when we get back under it. - -That will stop local multicasting, we still have to deal with remote -multicasting (note existing cluster does not do this.) Something like: -- when over bounds multicast a flow-control event. -- on delivery of flow-control all members stop polling to read client connections -- when back under bounds send flow-control-end, all members resume -- if flow-controling member dies others resume - -** TODO [#B] Integration with transactions. -Do we want to replicate during transaction & replicate commit/rollback -or replicate only on commit? -No integration with DTX transactions. -** TODO [#B] Make new cluster work with replication exchange. -Possibly re-use some common logic. Replication exchange is like clustering -except over TCP. -** TODO [#B] Better concurrency, scalabiility on multi-cores. -Introduce PollableQueue of operations per broker queue. Queue up mcast -operations (enqueue, acquire, accept etc.) to be handled concurrently -on different queue. Performance testing to verify improved scalability. -** TODO [#C] Async completion for declare, bind, destroy queues and exchanges. -Cluster needs to complete these asynchronously to guarantee resources -exist across the cluster when the command completes. +** TODO [#B] TX transaction support. +Extend broker::Cluster interface to capture transaction context and completion. +Running brokers exchange TX information. +New broker update includes TX information. -** TODO [#C] Allow non-replicated exchanges, queues. + // FIXME aconway 2010-10-18: As things stand the cluster is not + // compatible with transactions + // - enqueues occur after routing is complete + // - no call to Cluster::enqueue, should be in Queue::process? + // - no transaction context associated with messages in the Cluster interface. + // - no call to Cluster::accept in Queue::dequeueCommitted -Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects. -- save replicated status to store. -- support in management tools. -Replicated exchange: replicate binds to replicated queues. -Replicated queue: replicate all messages. +** TODO [#B] DTX transaction support. +Extend broker::Cluster interface to capture transaction context and completion. +Running brokers exchange DTX information. +New broker update includes DTX information. -** TODO [#C] New members joining - improved. +** TODO [#B] Async completion of accept. +When this is fixed in the standalone broker, it should be fixed for cluster. -Replicate wiring like old cluster, stall for wiring but not for -messages. Update messages on a per-queue basis from back to front. +** TODO [#B] Network partitions and quorum. +Re-use existing implementation. -Updater: -- stall & push wiring: declare exchanges, queues, bindings. -- start update iterator thread on each queue. -- unstall and process normally while iterator threads run. +** TODO [#B] Review error handling, put in a consitent model. +- [ ] Review all asserts, for possible throw. +- [ ] Decide on fatal vs. non-fatal errors. -Update iterator thread: -- starts at back of updater queue, message m. -- send update_front(q,m) to updatee and advance towards front -- at front: send update_done(q) +** TODO [#B] Implement inconsistent error handling policy. +What to do if a message is enqueued sucessfully on the local broker, +but fails on one or more backups - e.g. due to store limits? +- we have more flexibility, we don't *have* to crash +- but we've loste some of our redundancy guarantee, how should we inform client? -Updatee: -- stall, receive wiring, lock all queues, mark queues "updating", unstall -- update_front(q,m): push m to *front* of q -- update_done(q): mark queue "ready" +** TODO [#C] Allow non-replicated exchanges, queues. -Updatee cannot take the queue consume lock for a queue that is updating. -Updatee *can* push messages onto a queue that is updating. +Set qpid.replicate=false in declare arguments, set flag on Exchange, Queue objects. +- save replicated status to store. +- support in management tools. +Replicated queue: replicate all messages. +Replicated exchange: replicate bindings to replicated queues only. -TODO: Is there any way to eliminate the stall for wiring? +Configurable default? Defaults to true. ** TODO [#C] Refactoring of common concerns. @@ -469,9 +231,46 @@ Look for ways to capitalize on the similarity & simplify the code. In particular QueuedEvents (async replication) strongly resembles cluster replication, but over TCP rather than multicast. -** TODO [#C] Concurrency for enqueue events. -All enqueue events are being processed in the CPG deliver thread context which -serializes all the work. We only need ordering on a per queue basis, can we -enqueue in parallel on different queues and will that improve performance? + ** TODO [#C] Handling immediate messages in a cluster Include remote consumers in descision to deliver an immediate message? +** TODO [#C] Remove old cluster hacks and workarounds +The old cluster has workarounds in the broker code that can be removed. +- [ ] drop code to replicate management model. +- [ ] drop timer workarounds for TTL, management, heartbeats. +- [ ] drop "cluster-safe assertions" in broker code. +- [ ] drop connections, sessions, management from cluster update. +- [ ] drop security workarounds: cluster code now operates after message decoding. +- [ ] drop connection tracking in cluster code. +- [ ] simpler inconsistent-error handling code, no need to stall. + +** TODO [#C] Support for live upgrades. + +Allow brokers in a running cluster to be replaced one-by-one with a new version. +(see new-cluster-design for design notes.) + +The old cluster protocol was unstable because any changes in broker +state caused changes to the cluster protocol.The new design should be +much more stable. + +Points to implement in anticipation of live upgrade: +- Prefix each CPG message with a version number and length. + Version number determines how to decode the message. +- Brokers ignore messages that have a higher version number than they understand. +- Protocol version XML element in cluster.xml, on each control. +- Initial status protocol to include protocol version number. + +New member udpates: use the store encode/decode for updates, use the +same backward compatibility strategy as the store. This allows for +adding new elements to the end of structures but not changing or +removing new elements. + +** TODO [#C] Support for AMQP 1.0. + +* Testing +** TODO [#A] Pass all existing cluster tests. +Requires [[Defer and async completion of wiring commands.]] +** TODO [#A] New cluster tests. +Stress tests & performance benchmarks focused on changes in new cluster: +- concurrency by queues rather than connections. +- different handling shared queues when consuemrs are on different brokers. diff --git a/qpid/cpp/docs/man/qpidd.1 b/qpid/cpp/docs/man/qpidd.1 index 1e2d3aabee..8ba6f9ebf7 100644 --- a/qpid/cpp/docs/man/qpidd.1 +++ b/qpid/cpp/docs/man/qpidd.1 @@ -132,6 +132,10 @@ at which an event will be raised Group identifier to assign to messages delivered to a message group queue that do not contain an identifier. +.TP +\fB\-\-enable\-timestamp\fR yes|no (0) +Add current time to each received +message. .SS "Logging options:" .TP \fB\-t\fR [ \fB\-\-trace\fR ] diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index da1f539108..bb46f1258b 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -665,6 +665,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) set (qpidd_platform_SOURCES windows/QpiddBroker.cpp + windows/SCM.cpp ) set (qpidmessaging_platform_SOURCES diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 6230a8f6f6..e87cd3b4e0 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -54,6 +54,8 @@ windows_dist = \ qpid/sys/windows/uuid.cpp \ qpid/sys/windows/uuid.h \ windows/QpiddBroker.cpp \ + windows/SCM.h \ + windows/SCM.cpp \ qpid/broker/windows/BrokerDefaults.cpp \ qpid/broker/windows/SaslAuthenticator.cpp \ qpid/broker/windows/SslProtocolFactory.cpp \ @@ -497,6 +499,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/Waitable.h \ qpid/sys/alloca.h \ qpid/sys/uuid.h \ + qpid/sys/unordered_map.h \ qpid/amqp_0_10/Codecs.cpp if HAVE_SASL diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index 1270b57252..1cebcfc3ac 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -196,3 +196,8 @@ int QpiddBroker::execute (QpiddOptions *options) { } return 0; } + +int main(int argc, char* argv[]) +{ + return run_broker(argc, argv); +} diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk index 4da8470f2f..3b6583bfaf 100644 --- a/qpid/cpp/src/qmf.mk +++ b/qpid/cpp/src/qmf.mk @@ -93,6 +93,7 @@ libqmf2_la_SOURCES = \ qmf/AgentEventImpl.h \ qmf/AgentImpl.h \ qmf/AgentSession.cpp \ + qmf/AgentSessionImpl.h \ qmf/AgentSubscription.cpp \ qmf/AgentSubscription.h \ qmf/ConsoleEvent.cpp \ @@ -106,19 +107,21 @@ libqmf2_la_SOURCES = \ qmf/Data.cpp \ qmf/DataImpl.h \ qmf/EventNotifierImpl.cpp \ - qmf/PosixEventNotifier.cpp \ - qmf/PosixEventNotifierImpl.cpp \ + qmf/EventNotifierImpl.h \ qmf/exceptions.cpp \ qmf/Expression.cpp \ qmf/Expression.h \ qmf/Hash.cpp \ qmf/Hash.h \ + qmf/PosixEventNotifier.cpp \ + qmf/PosixEventNotifierImpl.cpp \ + qmf/PosixEventNotifierImpl.h \ qmf/PrivateImplRef.h \ qmf/Query.cpp \ qmf/QueryImpl.h \ - qmf/Schema.cpp \ qmf/SchemaCache.cpp \ qmf/SchemaCache.h \ + qmf/Schema.cpp \ qmf/SchemaId.cpp \ qmf/SchemaIdImpl.h \ qmf/SchemaImpl.h \ diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp index f75663e131..9d363d3012 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp @@ -35,17 +35,17 @@ using qpid::framing::Uuid; SchemaHash::SchemaHash() { for (int idx = 0; idx < 16; idx++) - hash[idx] = 0x5A; + hash.b[idx] = 0x5A; } void SchemaHash::encode(Buffer& buffer) const { - buffer.putBin128(hash); + buffer.putBin128(hash.b); } void SchemaHash::decode(Buffer& buffer) { - buffer.getBin128(hash); + buffer.getBin128(hash.b); } void SchemaHash::update(uint8_t data) @@ -55,12 +55,8 @@ void SchemaHash::update(uint8_t data) void SchemaHash::update(const char* data, uint32_t len) { - union h { - uint8_t b[16]; - uint64_t q[2]; - }* h = reinterpret_cast<union h*>(&hash[0]); - uint64_t* first = &h->q[0]; - uint64_t* second = &h->q[1]; + uint64_t* first = &hash.q[0]; + uint64_t* second = &hash.q[1]; for (uint32_t idx = 0; idx < len; idx++) { *first = *first ^ (uint64_t) data[idx]; *second = *second << 1; diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h index 8b079a5ec6..683fb6f8f0 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.h +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h @@ -35,7 +35,10 @@ namespace engine { // they've been registered. class SchemaHash { - uint8_t hash[16]; + union h { + uint8_t b[16]; + uint64_t q[2]; + } hash; public: SchemaHash(); void encode(qpid::framing::Buffer& buffer) const; @@ -47,7 +50,7 @@ namespace engine { void update(Direction d) { update((uint8_t) d); } void update(Access a) { update((uint8_t) a); } void update(bool b) { update((uint8_t) (b ? 1 : 0)); } - const uint8_t* get() const { return hash; } + const uint8_t* get() const { return hash.b; } bool operator==(const SchemaHash& other) const; bool operator<(const SchemaHash& other) const; bool operator>(const SchemaHash& other) const; diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.cpp b/qpid/cpp/src/qpid/RefCountedBuffer.cpp index 40d620f7ad..a82e1a02ab 100644 --- a/qpid/cpp/src/qpid/RefCountedBuffer.cpp +++ b/qpid/cpp/src/qpid/RefCountedBuffer.cpp @@ -20,19 +20,22 @@ */ #include "qpid/RefCountedBuffer.h" +#include <stdlib.h> #include <new> namespace qpid { void RefCountedBuffer::released() const { this->~RefCountedBuffer(); - ::delete[] reinterpret_cast<const char*>(this); + ::free (reinterpret_cast<void *>(const_cast<RefCountedBuffer *>(this))); } BufferRef RefCountedBuffer::create(size_t n) { - char* store=::new char[n+sizeof(RefCountedBuffer)]; + void* store=::malloc (n + sizeof(RefCountedBuffer)); + if (NULL == store) + throw std::bad_alloc(); new(store) RefCountedBuffer; - char* start = store+sizeof(RefCountedBuffer); + char* start = reinterpret_cast<char *>(store) + sizeof(RefCountedBuffer); return BufferRef( boost::intrusive_ptr<RefCounted>(reinterpret_cast<RefCountedBuffer*>(store)), start, start+n); diff --git a/qpid/cpp/src/qpid/Sasl.h b/qpid/cpp/src/qpid/Sasl.h index 9a9d61b037..4d579fa051 100644 --- a/qpid/cpp/src/qpid/Sasl.h +++ b/qpid/cpp/src/qpid/Sasl.h @@ -47,8 +47,8 @@ class Sasl * client supports. * @param externalSecuritySettings security related details from the underlying transport */ - virtual std::string start(const std::string& mechanisms, - const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0; + virtual bool start(const std::string& mechanisms, std::string& response, + const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0; virtual std::string step(const std::string& challenge) = 0; virtual std::string getMechanism() = 0; virtual std::string getUserId() = 0; diff --git a/qpid/cpp/src/qpid/SaslFactory.cpp b/qpid/cpp/src/qpid/SaslFactory.cpp index f117404028..a8d1f94c1e 100644 --- a/qpid/cpp/src/qpid/SaslFactory.cpp +++ b/qpid/cpp/src/qpid/SaslFactory.cpp @@ -112,7 +112,7 @@ class CyrusSasl : public Sasl public: CyrusSasl(const std::string & username, const std::string & password, const std::string & serviceName, const std::string & hostName, int minSsf, int maxSsf, bool allowInteraction); ~CyrusSasl(); - std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings); + bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings); std::string step(const std::string& challenge); std::string getMechanism(); std::string getUserId(); @@ -210,7 +210,7 @@ namespace { const std::string SSL("ssl"); } -std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettings* externalSettings) +bool CyrusSasl::start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings) { QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")"); int result = sasl_client_new(settings.service.c_str(), @@ -283,7 +283,12 @@ std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettin mechanism = std::string(chosenMechanism); QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << "): selected " << mechanism << " response: '" << std::string(out, outlen) << "'"); - return std::string(out, outlen); + if (out) { + response = std::string(out, outlen); + return true; + } else { + return false; + } } std::string CyrusSasl::step(const std::string& challenge) diff --git a/qpid/cpp/src/qpid/acl/AclPlugin.cpp b/qpid/cpp/src/qpid/acl/AclPlugin.cpp index e4d721ea44..d611797c49 100644 --- a/qpid/cpp/src/qpid/acl/AclPlugin.cpp +++ b/qpid/cpp/src/qpid/acl/AclPlugin.cpp @@ -69,7 +69,7 @@ struct AclPlugin : public Plugin { } acl = new Acl(values, b); - b.setAcl(acl.get()); + b.setAcl(acl.get()); b.addFinalizer(boost::bind(&AclPlugin::shutdown, this)); } diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index bd94582d10..ec3cf9d340 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -43,6 +43,8 @@ #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -125,7 +127,8 @@ Broker::Options::Options(const std::string& name) : queueFlowStopRatio(80), queueFlowResumeRatio(70), queueThresholdEventRatio(80), - defaultMsgGroup("qpid.no-group") + defaultMsgGroup("qpid.no-group"), + timestampRcvMsgs(false) // set the 0.10 timestamp delivery property { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -162,7 +165,8 @@ Broker::Options::Options(const std::string& name) : ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") - ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier."); + ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") + ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message."); } const std::string empty; @@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) : else QPID_LOG(info, "Management not enabled"); + // this feature affects performance, so let's be sure that gets logged! + if (conf.timestampRcvMsgs) { + QPID_LOG(notice, "Receive message timestamping is ENABLED."); + } + /** * SASL setup, can fail and terminate startup */ @@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args); status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext()); - status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args); + status = getTimestampConfig(a.o_receive, getManagementExecutionContext()); + break; + } + case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args); + status = setTimestampConfig(a.i_receive, getManagementExecutionContext()); + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchange-type"); const std::string QUEUE_NAME("queue"); const std::string EXCHANGE_NAME("exchange"); +const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); + const std::string _TRUE("true"); const std::string _FALSE("false"); } @@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue( const std::string& name, return Manageable::STATUS_OK;; } +Manageable::status_t Broker::getTimestampConfig(bool& receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId)); + } + receive = config.timestampRcvMsgs; + return Manageable::STATUS_OK; +} + +Manageable::status_t Broker::setTimestampConfig(const bool receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId)); + } + config.timestampRcvMsgs = receive; + QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED.")); + return Manageable::STATUS_OK; +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 8b347db3c0..b3b751be98 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -122,6 +122,7 @@ public: uint queueFlowResumeRatio; // producer flow control: off uint16_t queueThresholdEventRatio; std::string defaultMsgGroup; + bool timestampRcvMsgs; private: std::string getHome(); @@ -164,6 +165,10 @@ public: const std::string& userId, const std::string& connectionId, qpid::types::Variant::Map& results); + Manageable::status_t getTimestampConfig(bool& receive, + const ConnectionState* context); + Manageable::status_t setTimestampConfig(const bool receive, + const ConnectionState* context); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; std::auto_ptr<sys::Timer> clusterTimer; @@ -315,6 +320,7 @@ public: const boost::intrusive_ptr<Message>& msg)> deferDelivery; bool isAuthenticating ( ) { return config.auth; } + bool isTimestamping() { return config.timestampRcvMsgs; } typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 015002a70c..7cd91ae539 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/SecureConnection.h" #include "qpid/Url.h" #include "qpid/framing/AllInvoker.h" +#include "qpid/framing/ConnectionStartOkBody.h" #include "qpid/framing/enum.h" #include "qpid/log/Statement.h" #include "qpid/sys/SecurityLayer.h" @@ -63,13 +64,24 @@ void ConnectionHandler::heartbeat() handler->proxy.heartbeat(); } +bool ConnectionHandler::handle(const framing::AMQMethodBody& method) +{ + //Need special handling for start-ok, in order to distinguish + //between null and empty response + if (method.isA<ConnectionStartOkBody>()) { + handler->startOk(dynamic_cast<const ConnectionStartOkBody&>(method)); + return true; + } else { + return invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), method); + } +} + void ConnectionHandler::handle(framing::AMQFrame& frame) { AMQMethodBody* method=frame.getBody()->getMethod(); Connection::ErrorListener* errorListener = handler->connection.getErrorListener(); try{ - if (method && invoke( - static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), *method)) { + if (method && handle(*method)) { // This is a connection control frame, nothing more to do. } else if (isOpen()) { handler->connection.getChannel(frame.getChannel()).in(frame); @@ -96,13 +108,10 @@ ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) : proxy(c.getOutput()), - connection(c), serverMode(!isClient), acl(0), secured(0), + connection(c), serverMode(!isClient), secured(0), isOpen(false) { if (serverMode) { - - acl = connection.getBroker().getAcl(); - FieldTable properties; Array mechanisms(0x95); @@ -125,13 +134,20 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) ConnectionHandler::Handler::~Handler() {} -void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProperties, - const string& mechanism, - const string& response, +void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/, + const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/) { + //Need special handling for start-ok, in order to distinguish + //between null and empty response -> should never use this method + assert(false); +} + +void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) +{ try { - authenticator->start(mechanism, response); + authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0); } catch (std::exception& /*e*/) { management::ManagementAgent* agent = connection.getAgent(); if (agent) { @@ -143,11 +159,14 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper } throw; } + const framing::FieldTable& clientProperties = body.getClientProperties(); connection.setFederationLink(clientProperties.get(QPID_FED_LINK)); if (clientProperties.isSet(QPID_FED_TAG)) { connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG)); } if (connection.isFederationLink()) { + AclModule* acl = connection.getBroker().getAcl(); + FieldTable properties; if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){ proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); return; @@ -308,11 +327,21 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, string response; if (sasl.get()) { const qpid::sys::SecuritySettings& ss = connection.getExternalSecuritySettings(); - response = sasl->start ( requestedMechanism.empty() - ? supportedMechanismsList - : requestedMechanism, - & ss ); - proxy.startOk ( ft, sasl->getMechanism(), response, en_US ); + if (sasl->start ( requestedMechanism.empty() + ? supportedMechanismsList + : requestedMechanism, + response, + & ss )) { + proxy.startOk ( ft, sasl->getMechanism(), response, en_US ); + } else { + //response was null + ConnectionStartOkBody body; + body.setClientProperties(ft); + body.setMechanism(sasl->getMechanism()); + //Don't set response, as none was given + body.setLocale(en_US); + proxy.send(body); + } } else { response = ((char)0) + username + ((char)0) + password; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index b32167669e..05c5f00c57 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -26,8 +26,10 @@ #include "qpid/broker/SaslAuthenticator.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AMQP_AllProxy.h" +#include "qpid/framing/ConnectionStartOkBody.h" #include "qpid/framing/enum.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ProtocolInitiation.h" @@ -57,12 +59,12 @@ class ConnectionHandler : public framing::FrameHandler Connection& connection; bool serverMode; std::auto_ptr<SaslAuthenticator> authenticator; - AclModule* acl; SecureConnection* secured; bool isOpen; Handler(Connection& connection, bool isClient, bool isShadow=false); ~Handler(); + void startOk(const qpid::framing::ConnectionStartOkBody& body); void startOk(const qpid::framing::FieldTable& clientProperties, const std::string& mechanism, const std::string& response, const std::string& locale); @@ -96,7 +98,7 @@ class ConnectionHandler : public framing::FrameHandler }; std::auto_ptr<Handler> handler; - + bool handle(const qpid::framing::AMQMethodBody& method); public: ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false ); void close(framing::connection::CloseCode code, const std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 5ea7143366..d13109dad1 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -377,7 +377,15 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp() +{ + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); + time_t now = ::time(0); + props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch +} + +void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) { sys::Mutex::ScopedLock l(lock); DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 2a23a25d06..dda45d73e6 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -81,7 +81,8 @@ public: QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); - QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + /** determine msg expiration time using the TTL value if present */ + QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } @@ -93,6 +94,8 @@ public: QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key); void setExchange(const std::string&); void clearApplicationHeadersFlag(); + /** set the timestamp delivery property to the current time-of-day */ + QPID_BROKER_EXTERN void setTimestamp(); framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3d878d02a8..4627b1409a 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -952,6 +952,31 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri } } +bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return false; + } else if (v->convertsTo<int>()) { + return v->get<int>() != 0; + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + if (s == "True") return true; + if (s == "true") return true; + if (s == "False") return false; + if (s == "false") return false; + try { + return boost::lexical_cast<bool>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << s); + return false; + } + } else { + QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *v); + return false; + } +} + void Queue::configure(const FieldTable& _settings) { settings = _settings; @@ -983,7 +1008,7 @@ void Queue::configureImpl(const FieldTable& _settings) } //set this regardless of owner to allow use of no-local with exclusive consumers also - noLocal = _settings.get(qpidNoLocal); + noLocal = getBoolSetting(_settings, qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey); @@ -991,11 +1016,11 @@ void Queue::configureImpl(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); - } else if (_settings.get(qpidLastValueQueueNoBrowse)) { + } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); - } else if (_settings.get(qpidLastValueQueue)) { + } else if (getBoolSetting(_settings, qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); @@ -1015,7 +1040,7 @@ void Queue::configureImpl(const FieldTable& _settings) } } - persistLastNode= _settings.get(qpidPersistLastNode); + persistLastNode = getBoolSetting(_settings, qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); traceId = _settings.getAsString(qpidTraceIdentity); diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 12a13ccfe6..d7adbd68ab 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -30,6 +30,7 @@ #include <boost/format.hpp> #if HAVE_SASL +#include <sys/stat.h> #include <sasl/sasl.h> #include "qpid/sys/cyrus/CyrusSecurityLayer.h" using qpid::sys::cyrus::CyrusSecurityLayer; @@ -57,7 +58,7 @@ public: NullAuthenticator(Connection& connection, bool encrypt); ~NullAuthenticator(); void getMechanisms(framing::Array& mechanisms); - void start(const std::string& mechanism, const std::string& response); + void start(const std::string& mechanism, const std::string* response); void step(const std::string&) {} std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); }; @@ -81,7 +82,7 @@ public: ~CyrusAuthenticator(); void init(); void getMechanisms(framing::Array& mechanisms); - void start(const std::string& mechanism, const std::string& response); + void start(const std::string& mechanism, const std::string* response); void step(const std::string& response); void getError(std::string& error); void getUid(std::string& uid) { getUsername(uid); } @@ -98,11 +99,33 @@ void SaslAuthenticator::init(const std::string& saslName, std::string const & sa // Check if we have a version of SASL that supports sasl_set_path() #if (SASL_VERSION_FULL >= ((2<<16)|(1<<8)|22)) // If we are not given a sasl path, do nothing and allow the default to be used. - if ( ! saslConfigPath.empty() ) { - int code = sasl_set_path(SASL_PATH_TYPE_CONFIG, - const_cast<char *>(saslConfigPath.c_str())); + if ( saslConfigPath.empty() ) { + QPID_LOG ( info, "SASL: no config path set - using default." ); + } + else { + struct stat st; + + // Make sure the directory exists and we can read up to it. + if ( ::stat ( saslConfigPath.c_str(), & st) ) { + // Note: not using strerror() here because I think its messages are a little too hazy. + if ( errno == ENOENT ) + throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: no such directory: " << saslConfigPath ) ); + if ( errno == EACCES ) + throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot read parent of: " << saslConfigPath ) ); + // catch-all stat failure + throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: cannot stat: " << saslConfigPath ) ); + } + + // Make sure the directory is readable. + if ( ::access ( saslConfigPath.c_str(), R_OK ) ) { + throw Exception ( QPID_MSG ( "SASL: sasl_set_path failed: directory not readable:" << saslConfigPath ) ); + } + + // This shouldn't fail now, but check anyway. + int code = sasl_set_path(SASL_PATH_TYPE_CONFIG, const_cast<char *>(saslConfigPath.c_str())); if(SASL_OK != code) throw Exception(QPID_MSG("SASL: sasl_set_path failed [" << code << "] " )); + QPID_LOG(info, "SASL: config path set to " << saslConfigPath ); } #endif @@ -164,7 +187,7 @@ void NullAuthenticator::getMechanisms(Array& mechanisms) mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));//useful for testing } -void NullAuthenticator::start(const string& mechanism, const string& response) +void NullAuthenticator::start(const string& mechanism, const string* response) { if (encrypt) { #if HAVE_SASL @@ -180,16 +203,16 @@ void NullAuthenticator::start(const string& mechanism, const string& response) } } if (mechanism == "PLAIN") { // Old behavior - if (response.size() > 0) { + if (response && response->size() > 0) { string uid; - string::size_type i = response.find((char)0); - if (i == 0 && response.size() > 1) { + string::size_type i = response->find((char)0); + if (i == 0 && response->size() > 1) { //no authorization id; use authentication id - i = response.find((char)0, 1); - if (i != string::npos) uid = response.substr(1, i-1); + i = response->find((char)0, 1); + if (i != string::npos) uid = response->substr(1, i-1); } else if (i != string::npos) { //authorization id is first null delimited field - uid = response.substr(0, i); + uid = response->substr(0, i); }//else not a valid SASL PLAIN response, throw error? if (!uid.empty()) { //append realm if it has not already been added @@ -376,7 +399,7 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) } } -void CyrusAuthenticator::start(const string& mechanism, const string& response) +void CyrusAuthenticator::start(const string& mechanism, const string* response) { const char *challenge; unsigned int challenge_len; @@ -385,7 +408,7 @@ void CyrusAuthenticator::start(const string& mechanism, const string& response) QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism); int code = sasl_server_start(sasl_conn, mechanism.c_str(), - response.size() ? response.c_str() : 0, response.length(), + (response ? response->c_str() : 0), (response ? response->size() : 0), &challenge, &challenge_len); processAuthenticationStep(code, challenge, challenge_len); diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h index cfbe1a0cd1..4e5d43214c 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h @@ -41,7 +41,7 @@ class SaslAuthenticator public: virtual ~SaslAuthenticator() {} virtual void getMechanisms(framing::Array& mechanisms) = 0; - virtual void start(const std::string& mechanism, const std::string& response) = 0; + virtual void start(const std::string& mechanism, const std::string* response) = 0; virtual void step(const std::string& response) = 0; virtual void getUid(std::string&) {} virtual bool getUsername(std::string&) { return false; }; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 94d0cc87f7..fbcb21eab9 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -75,9 +75,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), closeComplete(false) -{ - acl = getSession().getBroker().getAcl(); -} +{} SemanticState::~SemanticState() { closed(); @@ -348,7 +346,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->record(record); } if (acquire && !ackExpected) { // auto acquire && auto accept - record.accept( 0 /*no ctxt*/ ); + queue->dequeue(0 /*ctxt*/, msg); + record.setEnded(); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -471,7 +470,7 @@ const std::string nullstring; } void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { - msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); + msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) @@ -487,6 +486,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id)); } + AclModule* acl = getSession().getBroker().getAcl(); if (acl && acl->doTransferAcl()) { if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() )) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 12ccc75f11..6d88dd56d9 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -165,7 +165,6 @@ class SemanticState : private boost::noncopyable { DtxBufferMap suspendedXids; framing::SequenceSet accumulatedAck; boost::shared_ptr<Exchange> cacheExchange; - AclModule* acl; const bool authMsg; const std::string userID; const std::string userName; diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index ddd6ae3f5b..1ab17e9893 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) header.setEof(false); msg->getFrames().append(header); } + if (broker.isTimestamping()) + msg->setTimestamp(); msg->setPublisher(&getConnection()); msg->getIngressCompletion().begin(); semanticState.handle(msg); diff --git a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp index d26b370632..2acc09cded 100644 --- a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp @@ -42,7 +42,7 @@ public: NullAuthenticator(Connection& connection); ~NullAuthenticator(); void getMechanisms(framing::Array& mechanisms); - void start(const std::string& mechanism, const std::string& response); + void start(const std::string& mechanism, const std::string* response); void step(const std::string&) {} std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); }; @@ -57,7 +57,7 @@ public: SspiAuthenticator(Connection& connection); ~SspiAuthenticator(); void getMechanisms(framing::Array& mechanisms); - void start(const std::string& mechanism, const std::string& response); + void start(const std::string& mechanism, const std::string* response); void step(const std::string& response); std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); }; @@ -96,12 +96,12 @@ void NullAuthenticator::getMechanisms(Array& mechanisms) mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN"))); } -void NullAuthenticator::start(const string& mechanism, const string& response) +void NullAuthenticator::start(const string& mechanism, const string* response) { QPID_LOG(warning, "SASL: No Authentication Performed"); if (mechanism == "PLAIN") { // Old behavior - if (response.size() > 0 && response[0] == (char) 0) { - string temp = response.substr(1); + if (response && response->size() > 0 && (*response).c_str()[0] == (char) 0) { + string temp = response->substr(1); string::size_type i = temp.find((char)0); string uid = temp.substr(0, i); string pwd = temp.substr(i + 1); @@ -139,7 +139,7 @@ void SspiAuthenticator::getMechanisms(Array& mechanisms) QPID_LOG(info, "SASL: Mechanism list: ANONYMOUS PLAIN"); } -void SspiAuthenticator::start(const string& mechanism, const string& response) +void SspiAuthenticator::start(const string& mechanism, const string* response) { QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism); if (mechanism == "ANONYMOUS") { @@ -152,14 +152,14 @@ void SspiAuthenticator::start(const string& mechanism, const string& response) // PLAIN's response is composed of 3 strings separated by 0 bytes: // authorization id, authentication id (user), clear-text password. - if (response.size() == 0) + if (!response || response->size() == 0) throw ConnectionForcedException("Authentication failed"); - string::size_type i = response.find((char)0); - string auth = response.substr(0, i); - string::size_type j = response.find((char)0, i+1); - string uid = response.substr(i+1, j-1); - string pwd = response.substr(j+1); + string::size_type i = response->find((char)0); + string auth = response->substr(0, i); + string::size_type j = response->find((char)0, i+1); + string uid = response->substr(i+1, j-1); + string pwd = response->substr(j+1); string dot("."); int error = 0; if (!LogonUser(const_cast<char*>(uid.c_str()), diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 801fe38051..ab0d8e0700 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -254,8 +254,18 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me } if (sasl.get()) { - string response = sasl->start(join(mechlist), getSecuritySettings ? getSecuritySettings() : 0); - proxy.startOk(properties, sasl->getMechanism(), response, locale); + string response; + if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) { + proxy.startOk(properties, sasl->getMechanism(), response, locale); + } else { + //response was null + ConnectionStartOkBody body; + body.setClientProperties(properties); + body.setMechanism(sasl->getMechanism()); + //Don't set response, as none was given + body.setLocale(locale); + proxy.send(body); + } } else { //TODO: verify that desired mechanism and locale are supported string response = ((char)0) + username + ((char)0) + password; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 5cf20c92eb..3badaf40ba 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject"); const std::string X_APP_ID("x-amqp-0-10.app-id"); const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); +const std::string X_TIMESTAMP("x-amqp-0-10.timestamp"); } void populateHeaders(qpid::messaging::Message& message, @@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message, if (messageProperties->hasContentEncoding()) { message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding(); } - // routing-key, others? + // routing-key, timestamp, others? if (deliveryProperties && deliveryProperties->hasRoutingKey()) { message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey(); } + if (deliveryProperties && deliveryProperties->hasTimestamp()) { + message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp(); + } } } diff --git a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp index d1ae762f1b..53d825771b 100644 --- a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp +++ b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp @@ -71,7 +71,7 @@ class WindowsSasl : public Sasl public: WindowsSasl( const std::string &, const std::string &, const std::string &, const std::string &, int, int ); ~WindowsSasl(); - std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings); + bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings); std::string step(const std::string& challenge); std::string getMechanism(); std::string getUserId(); @@ -121,8 +121,8 @@ WindowsSasl::~WindowsSasl() { } -std::string WindowsSasl::start(const std::string& mechanisms, - const SecuritySettings* /*externalSettings*/) +bool WindowsSasl::start(const std::string& mechanisms, std::string& response, + const SecuritySettings* /*externalSettings*/) { QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")"); @@ -142,15 +142,15 @@ std::string WindowsSasl::start(const std::string& mechanisms, if (!haveAnon && !havePlain) throw InternalErrorException(QPID_MSG("Sasl error: no common mechanism")); - std::string resp = ""; if (havePlain) { mechanism = PLAIN; - resp = ((char)0) + settings.username + ((char)0) + settings.password; + response = ((char)0) + settings.username + ((char)0) + settings.password; } else { mechanism = ANONYMOUS; + response = ""; } - return resp; + return true; } std::string WindowsSasl::step(const std::string& /*challenge*/) diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 7432fbbc33..e6e3de64f2 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -278,7 +278,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastBroker(false), updateRetracted(false), updateClosed(false), - error(*this) + error(*this), + acl(0) { broker.setInCluster(true); @@ -856,6 +857,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) else if (updatee == self && url) { assert(state == JOINER); state = UPDATEE; + acl = broker.getAcl(); + broker.setAcl(0); // Disable ACL during update QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); } @@ -956,6 +959,7 @@ void Cluster::checkUpdateIn(Lock& l) { // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); + broker.setAcl(acl); // Restore ACL discarding = false; // OK to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index da5781b7a9..ccec4948e6 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -56,6 +56,7 @@ namespace qpid { namespace broker { class Message; +class AclModule; } namespace framing { @@ -312,6 +313,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { sys::Timer clockTimer; sys::AbsTime clusterTime; sys::Duration clusterTimeOffset; + broker::AclModule* acl; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend struct ClusterDispatcher; diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp index 945c0a4d24..b3d1e2e1e4 100644 --- a/qpid/cpp/src/qpid/framing/Uuid.cpp +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -59,7 +59,9 @@ void Uuid::clear() { // Force int 0/!0 to false/true; avoids compile warnings. bool Uuid::isNull() const { - return !!uuid_is_null(data()); + // This const cast is for Solaris which has a + // uuid_is_null that takes a non const argument + return !!uuid_is_null(const_cast<uint8_t*>(data())); } void Uuid::encode(Buffer& buf) const { diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 50fdc82ee0..5799a1adca 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const string& data, dp->setRoutingKey(routingKey); if (ttl_msec) { dp->setTtl(ttl_msec); - msg->setTimestamp(broker->getExpiryPolicy()); + msg->computeExpiration(broker->getExpiryPolicy()); } msg->getFrames().append(content); msg->setIsManagementMessage(true); diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index 25f1c5fb9d..defec4879c 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -95,9 +95,11 @@ private: /** Create socket */ void createSocket(const SocketAddress&) const; +public: /** Construct socket with existing handle */ Socket(IOHandlePrivate*); +protected: mutable std::string localname; mutable std::string peername; mutable bool nonblocking; diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp index 471a0cef60..ab15785492 100644 --- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp @@ -25,6 +25,8 @@ #include "qpid/sys/ssl/check.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslHandler.h" +#include "qpid/sys/AsynchIOHandler.h" +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" #include "qpid/broker/Broker.h" @@ -37,15 +39,19 @@ namespace qpid { namespace sys { +using namespace qpid::sys::ssl; + struct SslServerOptions : ssl::SslOptions { uint16_t port; bool clientAuth; bool nodict; + bool multiplex; SslServerOptions() : port(5671), clientAuth(false), - nodict(false) + nodict(false), + multiplex(false) { addOptions() ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections") @@ -56,15 +62,20 @@ struct SslServerOptions : ssl::SslOptions } }; -class SslProtocolFactory : public ProtocolFactory { +template <class T> +class SslProtocolFactoryTmpl : public ProtocolFactory { + private: + + typedef SslAcceptorTmpl<T> SslAcceptor; + const bool tcpNoDelay; - qpid::sys::ssl::SslSocket listener; + T listener; const uint16_t listeningPort; - std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor; + std::auto_ptr<SslAcceptor> acceptor; bool nodict; public: - SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay); + SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, @@ -74,10 +85,14 @@ class SslProtocolFactory : public ProtocolFactory { bool supports(const std::string& capability); private: - void established(Poller::shared_ptr, const qpid::sys::ssl::SslSocket&, ConnectionCodec::Factory*, + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); }; +typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory; +typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory; + + // Static instance to initialise plugin static struct SslPlugin : public Plugin { SslServerOptions options; @@ -86,10 +101,26 @@ static struct SslPlugin : public Plugin { ~SslPlugin() { ssl::shutdownNSS(); } - void earlyInitialize(Target&) { + void earlyInitialize(Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + if (broker && !options.certDbPath.empty()) { + const broker::Broker::Options& opts = broker->getOptions(); + + if (opts.port == options.port && // AMQP & AMQPS ports are the same + opts.port != 0) { + // The presence of this option is used to signal to the TCP + // plugin not to start listening on the shared port. The actual + // value cannot be configured through the command line or config + // file (other than by setting the ports to the same value) + // because we are only adding it after option parsing. + options.multiplex = true; + options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port"); + } + } } void initialize(Target& target) { + QPID_LOG(trace, "Initialising SSL plugin"); broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { @@ -100,10 +131,18 @@ static struct SslPlugin : public Plugin { ssl::initNSS(options, true); const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options, - opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort()); + + ProtocolFactory::shared_ptr protocol(options.multiplex ? + static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options, + opts.connectionBacklog, + opts.tcpNoDelay)) : + static_cast<ProtocolFactory*>(new SslProtocolFactory(options, + opts.connectionBacklog, + opts.tcpNoDelay))); + QPID_LOG(notice, "Listening for " << + (options.multiplex ? "SSL or TCP" : "SSL") << + " connections on TCP port " << + protocol->getPort()); broker->registerProtocolFactory("ssl", protocol); } catch (const std::exception& e) { QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what()); @@ -113,13 +152,15 @@ static struct SslPlugin : public Plugin { } } sslPlugin; -SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) : +template <class T> +SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) : tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)), nodict(options.nodict) {} -void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s, - ConnectionCodec::Factory* f, bool isClient) { +void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, + ConnectionCodec::Factory* f, bool isClient, + bool tcpNoDelay, bool nodict) { qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict); if (tcpNoDelay) { @@ -127,8 +168,10 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys: QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); } - if (isClient) + if (isClient) { async->setClient(); + } + qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s, boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2), boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1), @@ -141,19 +184,64 @@ void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys: aio->start(poller); } -uint16_t SslProtocolFactory::getPort() const { +template <> +void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { + const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); + + SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict); +} + +template <class T> +uint16_t SslProtocolFactoryTmpl<T>::getPort() const { return listeningPort; // Immutable no need for lock. } -void SslProtocolFactory::accept(Poller::shared_ptr poller, - ConnectionCodec::Factory* fact) { +template <class T> +void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller, + ConnectionCodec::Factory* fact) { acceptor.reset( - new qpid::sys::ssl::SslAcceptor(listener, - boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); + new SslAcceptor(listener, + boost::bind(&SslProtocolFactoryTmpl<T>::established, + this, poller, _1, fact, false))); acceptor->start(poller); } -void SslProtocolFactory::connect( +template <> +void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { + const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s); + + if (sslSock) { + SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict); + return; + } + + AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f); + + if (tcpNoDelay) { + s.setTcpNoDelay(); + QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress()); + } + + if (isClient) { + async->setClient(); + } + AsynchIO* aio = AsynchIO::create + (s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + + async->init(aio, 4); + aio->start(poller); +} + +template <class T> +void SslProtocolFactoryTmpl<T>::connect( Poller::shared_ptr poller, const std::string& host, const std::string& port, ConnectionCodec::Factory* fact, @@ -166,9 +254,9 @@ void SslProtocolFactory::connect( // is no longer needed. qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket(); - new qpid::sys::ssl::SslConnector (*socket, poller, host, port, - boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, true), - failed); + new SslConnector(*socket, poller, host, port, + boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true), + failed); } namespace @@ -176,6 +264,7 @@ namespace const std::string SSL = "ssl"; } +template <> bool SslProtocolFactory::supports(const std::string& capability) { std::string s = capability; @@ -183,4 +272,12 @@ bool SslProtocolFactory::supports(const std::string& capability) return s == SSL; } +template <> +bool SslMuxProtocolFactory::supports(const std::string& capability) +{ + std::string s = capability; + transform(s.begin(), s.end(), s.begin(), tolower); + return s == SSL || s == "tcp"; +} + }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index 85d8c1db87..8a99d8db71 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -43,7 +43,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory { uint16_t listeningPort; public: - AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay); + AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, @@ -57,6 +57,20 @@ class AsynchIOProtocolFactory : public ProtocolFactory { void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; +static bool sslMultiplexEnabled(void) +{ + Options o; + Plugin::addOptions(o); + + if (o.find_nothrow("ssl-multiplex", false)) { + // This option is added by the SSL plugin when the SSL port + // is configured to be the same as the main port. + QPID_LOG(notice, "SSL multiplexing enabled"); + return true; + } + return false; +} + // Static instance to initialise plugin static class TCPIOPlugin : public Plugin { void earlyInitialize(Target&) { @@ -67,20 +81,31 @@ static class TCPIOPlugin : public Plugin { // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); + + // Check for SSL on the same port + bool shouldListen = !sslMultiplexEnabled(); + ProtocolFactory::shared_ptr protocolt( new AsynchIOProtocolFactory( "", boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); + opts.tcpNoDelay, + shouldListen)); + if (shouldListen) { + QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort()); + } broker->registerProtocolFactory("tcp", protocolt); } } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) : +AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) : tcpNoDelay(nodelay) { + if (!shouldListen) { + return; + } + SocketAddress sa(host, port); // We must have at least one resolved address diff --git a/qpid/cpp/src/qpid/sys/alloca.h b/qpid/cpp/src/qpid/sys/alloca.h index 0f58920908..b3f59b7c3f 100644 --- a/qpid/cpp/src/qpid/sys/alloca.h +++ b/qpid/cpp/src/qpid/sys/alloca.h @@ -34,9 +34,9 @@ # endif # define alloca _alloca # endif -# if !defined _WINDOWS && !defined WIN32 -# include <alloca.h> -# endif +#endif +#if !defined _WINDOWS && !defined WIN32 +# include <alloca.h> #endif #endif /*!QPID_SYS_ALLOCA_H*/ diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp index 734ebb483a..4a59819183 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -68,29 +68,33 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms * Asynch Acceptor */ -SslAcceptor::SslAcceptor(const SslSocket& s, Callback callback) : +template <class T> +SslAcceptorTmpl<T>::SslAcceptorTmpl(const T& s, Callback callback) : acceptedCallback(callback), - handle(s, boost::bind(&SslAcceptor::readable, this, _1), 0, 0), + handle(s, boost::bind(&SslAcceptorTmpl<T>::readable, this, _1), 0, 0), socket(s) { s.setNonblocking(); ignoreSigpipe(); } -SslAcceptor::~SslAcceptor() +template <class T> +SslAcceptorTmpl<T>::~SslAcceptorTmpl() { handle.stopWatch(); } -void SslAcceptor::start(Poller::shared_ptr poller) { +template <class T> +void SslAcceptorTmpl<T>::start(Poller::shared_ptr poller) { handle.startWatch(poller); } /* * We keep on accepting as long as there is something to accept */ -void SslAcceptor::readable(DispatchHandle& h) { - SslSocket* s; +template <class T> +void SslAcceptorTmpl<T>::readable(DispatchHandle& h) { + Socket* s; do { errno = 0; // TODO: Currently we ignore the peers address, perhaps we should @@ -110,6 +114,10 @@ void SslAcceptor::readable(DispatchHandle& h) { h.rewatch(); } +// Explicitly instantiate the templates we need +template class SslAcceptorTmpl<SslSocket>; +template class SslAcceptorTmpl<SslMuxSocket>; + /* * Asynch Connector */ diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.h b/qpid/cpp/src/qpid/sys/ssl/SslIo.h index 8785852c24..c980d73831 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.h @@ -29,26 +29,30 @@ namespace qpid { namespace sys { + +class Socket; + namespace ssl { - + class SslSocket; /* * Asynchronous ssl acceptor: accepts connections then does a callback * with the accepted fd */ -class SslAcceptor { +template <class T> +class SslAcceptorTmpl { public: - typedef boost::function1<void, const SslSocket&> Callback; + typedef boost::function1<void, const Socket&> Callback; private: Callback acceptedCallback; qpid::sys::DispatchHandle handle; - const SslSocket& socket; + const T& socket; public: - SslAcceptor(const SslSocket& s, Callback callback); - ~SslAcceptor(); + SslAcceptorTmpl(const T& s, Callback callback); + ~SslAcceptorTmpl(); void start(qpid::sys::Poller::shared_ptr poller); private: diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp index f7483a220c..30234bb686 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp @@ -25,11 +25,13 @@ #include "qpid/Exception.h" #include "qpid/sys/posix/check.h" #include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/log/Statement.h" #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/errno.h> +#include <poll.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <netdb.h> @@ -50,36 +52,6 @@ namespace sys { namespace ssl { namespace { -std::string getName(int fd, bool local, bool includeService = false) -{ - ::sockaddr_storage name; // big enough for any socket address - ::socklen_t namelen = sizeof(name); - - int result = -1; - if (local) { - result = ::getsockname(fd, (::sockaddr*)&name, &namelen); - } else { - result = ::getpeername(fd, (::sockaddr*)&name, &namelen); - } - - QPID_POSIX_CHECK(result); - - char servName[NI_MAXSERV]; - char dispName[NI_MAXHOST]; - if (includeService) { - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw QPID_POSIX_ERROR(rc); - return std::string(dispName) + ":" + std::string(servName); - - } else { - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0) - throw QPID_POSIX_ERROR(rc); - return dispName; - } -} - std::string getService(int fd, bool local) { ::sockaddr_storage name; // big enough for any socket address @@ -132,7 +104,7 @@ std::string getDomainFromSubject(std::string subject) } -SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0) +SslSocket::SslSocket() : socket(0), prototype(0) { impl->fd = ::socket (PF_INET, SOCK_STREAM, 0); if (impl->fd < 0) throw QPID_POSIX_ERROR(errno); @@ -144,7 +116,7 @@ SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0 * returned from accept. Because we use posix accept rather than * PR_Accept, we have to reset the handshake. */ -SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : IOHandle(ioph), socket(0), prototype(0) +SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), socket(0), prototype(0) { socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd)); NSS_CHECK(SSL_ResetHandshake(socket, true)); @@ -238,6 +210,7 @@ int SslSocket::listen(uint16_t port, int backlog, const std::string& certName, b SslSocket* SslSocket::accept() const { + QPID_LOG(trace, "Accepting SSL connection."); int afd = ::accept(impl->fd, 0, 0); if ( afd >= 0) { return new SslSocket(new IOHandlePrivate(afd), prototype); @@ -248,36 +221,109 @@ SslSocket* SslSocket::accept() const } } -int SslSocket::read(void *buf, size_t count) const -{ - return PR_Read(socket, buf, count); -} +#define SSL_STREAM_MAX_WAIT_ms 20 +#define SSL_STREAM_MAX_RETRIES 2 -int SslSocket::write(const void *buf, size_t count) const -{ - return PR_Write(socket, buf, count); -} +static bool isSslStream(int afd) { + int retries = SSL_STREAM_MAX_RETRIES; + unsigned char buf[5] = {}; -std::string SslSocket::getSockname() const -{ - return getName(impl->fd, true); + do { + struct pollfd fd = {afd, POLLIN, 0}; + + /* + * Note that this is blocking the accept thread, so connections that + * send no data can limit the rate at which we can accept new + * connections. + */ + if (::poll(&fd, 1, SSL_STREAM_MAX_WAIT_ms) > 0) { + errno = 0; + int result = recv(afd, buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT); + if (result == sizeof(buf)) { + break; + } + if (errno && errno != EAGAIN) { + int err = errno; + ::close(afd); + throw QPID_POSIX_ERROR(err); + } + } + } while (retries-- > 0); + + if (retries < 0) { + return false; + } + + /* + * SSLv2 Client Hello format + * http://www.mozilla.org/projects/security/pki/nss/ssl/draft02.html + * + * Bytes 0-1: RECORD-LENGTH + * Byte 2: MSG-CLIENT-HELLO (1) + * Byte 3: CLIENT-VERSION-MSB + * Byte 4: CLIENT-VERSION-LSB + * + * Allowed versions: + * 2.0 - SSLv2 + * 3.0 - SSLv3 + * 3.1 - TLS 1.0 + * 3.2 - TLS 1.1 + * 3.3 - TLS 1.2 + * + * The version sent in the Client-Hello is the latest version supported by + * the client. NSS may send version 3.x in an SSLv2 header for + * maximum compatibility. + */ + bool isSSL2Handshake = buf[2] == 1 && // MSG-CLIENT-HELLO + ((buf[3] == 3 && buf[4] <= 3) || // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3) + (buf[3] == 2 && buf[4] == 0)); // SSL 2 + + /* + * SSLv3/TLS Client Hello format + * RFC 2246 + * + * Byte 0: ContentType (handshake - 22) + * Bytes 1-2: ProtocolVersion {major, minor} + * + * Allowed versions: + * 3.0 - SSLv3 + * 3.1 - TLS 1.0 + * 3.2 - TLS 1.1 + * 3.3 - TLS 1.2 + */ + bool isSSL3Handshake = buf[0] == 22 && // handshake + (buf[1] == 3 && buf[2] <= 3); // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3) + + return isSSL2Handshake || isSSL3Handshake; } -std::string SslSocket::getPeername() const +Socket* SslMuxSocket::accept() const { - return getName(impl->fd, false); + int afd = ::accept(impl->fd, 0, 0); + if (afd >= 0) { + QPID_LOG(trace, "Accepting connection with optional SSL wrapper."); + if (isSslStream(afd)) { + QPID_LOG(trace, "Accepted SSL connection."); + return new SslSocket(new IOHandlePrivate(afd), prototype); + } else { + QPID_LOG(trace, "Accepted Plaintext connection."); + return new Socket(new IOHandlePrivate(afd)); + } + } else if (errno == EAGAIN) { + return 0; + } else { + throw QPID_POSIX_ERROR(errno); + } } -std::string SslSocket::getPeerAddress() const +int SslSocket::read(void *buf, size_t count) const { - if (!connectname.empty()) - return connectname; - return getName(impl->fd, false, true); + return PR_Read(socket, buf, count); } -std::string SslSocket::getLocalAddress() const +int SslSocket::write(const void *buf, size_t count) const { - return getName(impl->fd, true, true); + return PR_Write(socket, buf, count); } uint16_t SslSocket::getLocalPort() const @@ -290,17 +336,6 @@ uint16_t SslSocket::getRemotePort() const return atoi(getService(impl->fd, true).c_str()); } -int SslSocket::getError() const -{ - int result; - socklen_t rSize = sizeof (result); - - if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) - throw QPID_POSIX_ERROR(errno); - - return result; -} - void SslSocket::setTcpNoDelay(bool nodelay) const { if (nodelay) { diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h index 993859495b..eabadcbe23 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h @@ -23,6 +23,7 @@ */ #include "qpid/sys/IOHandle.h" +#include "qpid/sys/Socket.h" #include <nspr.h> #include <string> @@ -36,7 +37,7 @@ class Duration; namespace ssl { -class SslSocket : public qpid::sys::IOHandle +class SslSocket : public qpid::sys::Socket { public: /** Create a socket wrapper for descriptor. */ @@ -75,45 +76,13 @@ public: int read(void *buf, size_t count) const; int write(const void *buf, size_t count) const; - /** Returns the "socket name" ie the address bound to - * the near end of the socket - */ - std::string getSockname() const; - - /** Returns the "peer name" ie the address bound to - * the remote end of the socket - */ - std::string getPeername() const; - - /** - * Returns an address (host and port) for the remote end of the - * socket - */ - std::string getPeerAddress() const; - /** - * Returns an address (host and port) for the local end of the - * socket - */ - std::string getLocalAddress() const; - - /** - * Returns the full address of the connection: local and remote host and port. - */ - std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); } - uint16_t getLocalPort() const; uint16_t getRemotePort() const; - /** - * Returns the error code stored in the socket. This may be used - * to determine the result of a non-blocking connect. - */ - int getError() const; - int getKeyLen() const; std::string getClientAuthId() const; -private: +protected: mutable std::string connectname; mutable PRFileDesc* socket; std::string certname; @@ -126,6 +95,13 @@ private: mutable PRFileDesc* prototype; SslSocket(IOHandlePrivate* ioph, PRFileDesc* model); + friend class SslMuxSocket; +}; + +class SslMuxSocket : public SslSocket +{ +public: + Socket* accept() const; }; }}} diff --git a/qpid/cpp/src/qpid/sys/unordered_map.h b/qpid/cpp/src/qpid/sys/unordered_map.h new file mode 100644 index 0000000000..5f7f9567c5 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/unordered_map.h @@ -0,0 +1,35 @@ +#ifndef _sys_unordered_map_h +#define _sys_unordered_map_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// unordered_map include path is platform specific + +#ifdef _MSC_VER +# include <unordered_map> +#else +# include <tr1/unordered_map> +#endif /* _MSC_VER */ +namespace qpid { +namespace sys { + using std::tr1::unordered_map; +}} + + +#endif /* _sys_unordered_map_h */ diff --git a/qpid/cpp/src/qpid/types/Uuid.cpp b/qpid/cpp/src/qpid/types/Uuid.cpp index 9face4e5d2..9862fa8946 100644 --- a/qpid/cpp/src/qpid/types/Uuid.cpp +++ b/qpid/cpp/src/qpid/types/Uuid.cpp @@ -20,6 +20,7 @@ */ #include "qpid/types/Uuid.h" #include "qpid/sys/uuid.h" +#include "qpid/sys/IntegerTypes.h" #include <sstream> #include <iostream> #include <string.h> @@ -71,7 +72,8 @@ void Uuid::clear() // Force int 0/!0 to false/true; avoids compile warnings. bool Uuid::isNull() const { - return !!uuid_is_null(bytes); + // This const cast is for Solaris which has non const arguments + return !!uuid_is_null(const_cast<uint8_t*>(bytes)); } Uuid::operator bool() const { return !isNull(); } @@ -86,7 +88,8 @@ const unsigned char* Uuid::data() const bool operator==(const Uuid& a, const Uuid& b) { - return uuid_compare(a.bytes, b.bytes) == 0; + // This const cast is for Solaris which has non const arguments + return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) == 0; } bool operator!=(const Uuid& a, const Uuid& b) @@ -96,22 +99,26 @@ bool operator!=(const Uuid& a, const Uuid& b) bool operator<(const Uuid& a, const Uuid& b) { - return uuid_compare(a.bytes, b.bytes) < 0; + // This const cast is for Solaris which has non const arguments + return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) < 0; } bool operator>(const Uuid& a, const Uuid& b) { - return uuid_compare(a.bytes, b.bytes) > 0; + // This const cast is for Solaris which has non const arguments + return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) > 0; } bool operator<=(const Uuid& a, const Uuid& b) { - return uuid_compare(a.bytes, b.bytes) <= 0; + // This const cast is for Solaris which has non const arguments + return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) <= 0; } bool operator>=(const Uuid& a, const Uuid& b) { - return uuid_compare(a.bytes, b.bytes) >= 0; + // This const cast is for Solaris which has non const arguments + return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) >= 0; } ostream& operator<<(ostream& out, Uuid uuid) diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index a7c1dbe8a6..a0e329ca9d 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -31,7 +31,8 @@ using namespace std; auto_ptr<QpiddOptions> options; -int main(int argc, char* argv[]) +// Broker real entry; various system-invoked entrypoints call here. +int run_broker(int argc, char *argv[], bool hidden) { try { @@ -43,6 +44,8 @@ int main(int argc, char* argv[]) // module-supplied options. try { bootOptions.parse (argc, argv, bootOptions.common.config, true); + if (hidden) + bootOptions.log.sinkOptions->detached(); qpid::log::Logger::instance().configure(bootOptions.log); } catch (const std::exception& e) { // Couldn't configure logging so write the message direct to stderr. diff --git a/qpid/cpp/src/qpidd.h b/qpid/cpp/src/qpidd.h index c702270e80..a3150a2737 100644 --- a/qpid/cpp/src/qpidd.h +++ b/qpid/cpp/src/qpidd.h @@ -67,4 +67,7 @@ public: int execute (QpiddOptions *options); }; +// Broker real entry; various system-invoked entrypoints call here. +int run_broker(int argc, char *argv[], bool hidden = false); + #endif /*!QPID_H*/ diff --git a/qpid/cpp/src/tests/BrokerOptions.cpp b/qpid/cpp/src/tests/BrokerOptions.cpp new file mode 100644 index 0000000000..b36d96916a --- /dev/null +++ b/qpid/cpp/src/tests/BrokerOptions.cpp @@ -0,0 +1,79 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** Unit tests for various broker configuration options **/ + +#include "unit_test.h" +#include "test_tools.h" +#include "MessagingFixture.h" + +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(BrokerOptionsTestSuite) + +using namespace qpid::broker; +using namespace qpid::messaging; +using namespace qpid::types; +using namespace qpid; + +QPID_AUTO_TEST_CASE(testDisabledTimestamp) +{ + // by default, there should be no timestamp added by the broker + MessagingFixture fix; + + Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}"); + messaging::Message msg("hi"); + sender.send(msg); + + Receiver receiver = fix.session.createReceiver("test-q"); + messaging::Message in; + BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE)); + Variant::Map props = in.getProperties(); + BOOST_CHECK(props.find("x-amqp-0-10.timestamp") == props.end()); +} + +QPID_AUTO_TEST_CASE(testEnabledTimestamp) +{ + // when enabled, the 0.10 timestamp is added by the broker + Broker::Options opts; + opts.timestampRcvMsgs = true; + MessagingFixture fix(opts, true); + + Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}"); + messaging::Message msg("one"); + sender.send(msg); + + Receiver receiver = fix.session.createReceiver("test-q"); + messaging::Message in; + BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE)); + Variant::Map props = in.getProperties(); + BOOST_CHECK(props.find("x-amqp-0-10.timestamp") != props.end()); + BOOST_CHECK(props["x-amqp-0-10.timestamp"]); +} + +QPID_AUTO_TEST_SUITE_END() + +}} diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 78ac6db5f1..3c9ca1b70f 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -75,7 +75,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ MessagingThreadTests.cpp \ MessagingFixture.h \ ClientSessionTest.cpp \ - BrokerFixture.h SocketProxy.h \ + BrokerFixture.h \ exception_test.cpp \ RefCounted.cpp \ SessionState.cpp logging.cpp \ @@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ Variant.cpp \ Address.cpp \ ClientMessage.cpp \ - Qmf2.cpp + Qmf2.cpp \ + BrokerOptions.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -349,7 +350,8 @@ EXTRA_DIST += \ stop_broker.ps1 \ topictest.ps1 \ run_queue_flow_limit_tests \ - run_msg_group_tests + run_msg_group_tests \ + ipv6_test check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) @@ -362,7 +364,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers) LONG_TESTS+=start_broker \ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \ - run_msg_groups_tests_soak \ + run_msg_group_tests_soak \ stop_broker \ run_long_federation_sys_tests \ run_failover_soak reliable_replication_test \ @@ -377,7 +379,7 @@ EXTRA_DIST+= \ reliable_replication_test \ federated_cluster_test_with_node_failure \ sasl_test_setup.sh \ - run_msg_groups_tests_soak + run_msg_group_tests_soak check-long: $(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND= diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 7bf061ff54..aaa2721021 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -258,26 +258,26 @@ QPID_AUTO_TEST_CASE(testBound){ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ client::QueueOptions args; - args.setPersistLastNode(); + args.setPersistLastNode(); - Queue::shared_ptr queue(new Queue("my-queue", true)); + Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); - //enqueue 2 messages + //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); - //change mode - queue->setLastNodeFailure(); + //change mode + queue->setLastNodeFailure(); - //enqueue 1 message + //enqueue 1 message queue->deliver(msg3); - //check all have persistent ids. + //check all have persistent ids. BOOST_CHECK(msg1->isPersistent()); BOOST_CHECK(msg2->isPersistent()); BOOST_CHECK(msg3->isPersistent()); @@ -287,13 +287,13 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ QPID_AUTO_TEST_CASE(testSeek){ - Queue::shared_ptr queue(new Queue("my-queue", true)); + Queue::shared_ptr queue(new Queue("my-queue", true)); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); - //enqueue 2 messages + //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); @@ -313,13 +313,13 @@ QPID_AUTO_TEST_CASE(testSeek){ QPID_AUTO_TEST_CASE(testSearch){ - Queue::shared_ptr queue(new Queue("my-queue", true)); + Queue::shared_ptr queue(new Queue("my-queue", true)); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); intrusive_ptr<Message> msg3 = create_message("e", "C"); - //enqueue 2 messages + //enqueue 2 messages queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); @@ -424,10 +424,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ client::QueueOptions args; // set queue mode - args.setOrdering(client::LVQ); + args.setOrdering(client::LVQ); - Queue::shared_ptr queue(new Queue("my-queue", true )); - queue->configure(args); + Queue::shared_ptr queue(new Queue("my-queue", true )); + queue->configure(args); intrusive_ptr<Message> msg1 = create_message("e", "A"); intrusive_ptr<Message> msg2 = create_message("e", "B"); @@ -438,16 +438,16 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ //set deliever match for LVQ a,b,c,a string key; - args.getLVQKey(key); + args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"b"); - msg3->insertCustomProperty(key,"c"); - msg4->insertCustomProperty(key,"a"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"b"); + msg3->insertCustomProperty(key,"c"); + msg4->insertCustomProperty(key,"a"); - //enqueue 4 message + //enqueue 4 message queue->deliver(msg1); queue->deliver(msg2); queue->deliver(msg3); @@ -467,9 +467,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ intrusive_ptr<Message> msg5 = create_message("e", "A"); intrusive_ptr<Message> msg6 = create_message("e", "B"); intrusive_ptr<Message> msg7 = create_message("e", "C"); - msg5->insertCustomProperty(key,"a"); - msg6->insertCustomProperty(key,"b"); - msg7->insertCustomProperty(key,"c"); + msg5->insertCustomProperty(key,"a"); + msg6->insertCustomProperty(key,"b"); + msg7->insertCustomProperty(key,"c"); queue->deliver(msg5); queue->deliver(msg6); queue->deliver(msg7); @@ -652,7 +652,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ msg1->insertCustomProperty(key,"a"); msg2->insertCustomProperty(key,"a"); - // 3 + // 3 queue1->deliver(msg1); // 4 queue1->setLastNodeFailure(); @@ -672,7 +672,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt { for (uint i = 0; i < count; i++) { intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); - m->setTimestamp(new broker::ExpiryPolicy); + m->computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } } diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py index 5e9a150d8f..65d5242e51 100755 --- a/qpid/cpp/src/tests/acl.py +++ b/qpid/cpp/src/tests/acl.py @@ -1030,6 +1030,64 @@ class ACLTests(TestBase010): if (403 == e.args[0].error_code): self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1"); + #===================================== + # ACL broker configuration tests + #===================================== + + def test_broker_timestamp_config(self): + """ + Test ACL control of the broker timestamp configuration + """ + aclf = self.get_acl_file() + # enable lots of stuff to allow QMF to work + aclf.write('acl allow all create exchange\n') + aclf.write('acl allow all access exchange\n') + aclf.write('acl allow all bind exchange\n') + aclf.write('acl allow all publish exchange\n') + aclf.write('acl allow all create queue\n') + aclf.write('acl allow all access queue\n') + aclf.write('acl allow all delete queue\n') + aclf.write('acl allow all consume queue\n') + aclf.write('acl allow all access method\n') + # this should let bob access the timestamp configuration + aclf.write('acl allow bob@QPID access broker\n') + aclf.write('acl allow admin@QPID all all\n') + aclf.write('acl deny all all') + aclf.close() + + result = self.reload_acl() + if (result.text.find("format error",0,len(result.text)) != -1): + self.fail(result) + + ts = None + bob = BrokerAdmin(self.config.broker, "bob", "bob") + ts = bob.get_timestamp_cfg() #should work + bob.set_timestamp_cfg(ts); #should work + + obo = BrokerAdmin(self.config.broker, "obo", "obo") + try: + ts = obo.get_timestamp_cfg() #should fail + failed = False + except Exception, e: + failed = True + self.assertEqual(7,e.args[0]["error_code"]) + assert e.args[0]["error_text"].find("unauthorized-access") == 0 + assert(failed) + + try: + obo.set_timestamp_cfg(ts) #should fail + failed = False + except Exception, e: + failed = True + self.assertEqual(7,e.args[0]["error_code"]) + assert e.args[0]["error_text"].find("unauthorized-access") == 0 + assert(failed) + + admin = BrokerAdmin(self.config.broker, "admin", "admin") + ts = admin.get_timestamp_cfg() #should pass + admin.set_timestamp_cfg(ts) #should pass + + class BrokerAdmin: def __init__(self, broker, username=None, password=None): self.connection = qpid.messaging.Connection(broker) @@ -1075,3 +1133,9 @@ class BrokerAdmin: def delete_queue(self, name): self.invoke("delete", {"type": "queue", "name":name}) + + def get_timestamp_cfg(self): + return self.invoke("getTimestampConfig", {}) + + def set_timestamp_cfg(self, receive): + return self.invoke("getTimestampConfig", {"receive":receive}) diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index d217f9fbde..0e80e06d34 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -115,63 +115,22 @@ class ShortTests(BrokerTest): acl=os.path.join(os.getcwd(), "policy.acl") aclf=file(acl,"w") aclf.write(""" -acl deny zag@QPID create queue -acl allow all all +acl allow zig@QPID all all +acl deny all all """) aclf.close() - cluster = self.cluster(2, args=["--auth", "yes", + cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, "--load-module", os.getenv("ACL_LIB"), "--acl-file", acl]) # Valid user/password, ensure queue is created. c = cluster[0].connect(username="zig", password="zig") - c.session().sender("ziggy;{create:always}") - c.close() - c = cluster[1].connect(username="zig", password="zig") - c.session().receiver("ziggy;{assert:always}") + c.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}") c.close() - for b in cluster: b.ready() # Make sure all brokers still running. - - # Valid user, bad password - try: - cluster[0].connect(username="zig", password="foo").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Bad user ID - try: - cluster[0].connect(username="foo", password="bar").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Action disallowed by ACL - c = cluster[0].connect(username="zag", password="zag") - try: - s = c.session() - s.sender("zaggy;{create:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.UnauthorizedAccess: pass - # make sure the queue was not created at the other node. - c = cluster[0].connect(username="zag", password="zag") - try: - s = c.session() - s.sender("zaggy;{assert:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.NotFound: pass - - def test_sasl_join(self): - """Verify SASL authentication between brokers when joining a cluster.""" - sasl_config=os.path.join(self.rootdir, "sasl_config") + cluster.start() # Start second node. - # Valid user/password, ensure queue is created. - c = cluster[0].connect(username="zig", password="zig") - c.session().sender("ziggy;{create:always}") - c.close() + # Check queue is created on second node. c = cluster[1].connect(username="zig", password="zig") c.session().receiver("ziggy;{assert:always}") c.close() @@ -200,49 +159,7 @@ acl allow all all self.fail("Expected exception") except messaging.exceptions.UnauthorizedAccess: pass # make sure the queue was not created at the other node. - c = cluster[0].connect(username="zag", password="zag") - try: - s = c.session() - s.sender("zaggy;{assert:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.NotFound: pass - - def test_sasl_join(self): - """Verify SASL authentication between brokers when joining a cluster.""" - # Valid user/password, ensure queue is created. - c = cluster[0].connect(username="zig", password="zig") - c.session().sender("ziggy;{create:always}") - c.close() c = cluster[1].connect(username="zig", password="zig") - c.session().receiver("ziggy;{assert:always}") - c.close() - for b in cluster: b.ready() # Make sure all brokers still running. - - # Valid user, bad password - try: - cluster[0].connect(username="zig", password="foo").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Bad user ID - try: - cluster[0].connect(username="foo", password="bar").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Action disallowed by ACL - c = cluster[0].connect(username="zag", password="zag") - try: - s = c.session() - s.sender("zaggy;{create:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.UnauthorizedAccess: pass - # make sure the queue was not created at the other node. - c = cluster[0].connect(username="zag", password="zag") try: s = c.session() s.sender("zaggy;{assert:always}") diff --git a/qpid/cpp/src/tests/qpid-perftest.cpp b/qpid/cpp/src/tests/qpid-perftest.cpp index 3aff742c62..664f0cf877 100644 --- a/qpid/cpp/src/tests/qpid-perftest.cpp +++ b/qpid/cpp/src/tests/qpid-perftest.cpp @@ -173,7 +173,7 @@ struct Opts : public TestOptions { if (count % subs) { count += subs - (count % subs); cout << "WARNING: Adjusted --count to " << count - << " the nearest multiple of --nsubs" << endl; + << " the next multiple of --nsubs" << endl; } totalPubs = pubs*qt; totalSubs = subs*qt; @@ -413,7 +413,7 @@ struct Controller : public Client { AbsTime start=now(); send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers if (j) { - send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration + send(opts.totalSubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration } Stats pubRates; @@ -546,9 +546,9 @@ struct PublishThread : public Client { if (opts.confirm) session.sync(); AbsTime end=now(); double time=secs(start,end); - if (time <= 0.0) { - throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count."); - } + if (time <= 0.0) { + throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count."); + } // Send result to controller. Message report(lexical_cast<string>(opts.count/time), fqn("pub_done")); @@ -644,7 +644,9 @@ struct SubscribeThread : public Client { // // For now verify order only for a single publisher. size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0; - size_t n = *reinterpret_cast<const size_t*>(msg.getData().data() + offset); + size_t n; + memcpy (&n, reinterpret_cast<const char*>(msg.getData().data() + offset), + sizeof(n)); if (opts.pubs == 1) { if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n); else verify(n>=expect, ">=", expect, n); diff --git a/qpid/cpp/src/tests/sasl.mk b/qpid/cpp/src/tests/sasl.mk index 20eaa7c7a5..69b24c3f8a 100644 --- a/qpid/cpp/src/tests/sasl.mk +++ b/qpid/cpp/src/tests/sasl.mk @@ -30,7 +30,7 @@ check_PROGRAMS+=sasl_version sasl_version_SOURCES=sasl_version.cpp sasl_version_LDADD=$(lib_client) -TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster +TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster sasl_no_dir LONG_TESTS += run_cluster_authentication_soak EXTRA_DIST += run_cluster_authentication_test \ sasl_fed \ @@ -43,7 +43,8 @@ EXTRA_DIST += run_cluster_authentication_test \ sasl_fed_ex_dynamic_cluster \ sasl_fed_ex_link_cluster \ sasl_fed_ex_queue_cluster \ - sasl_fed_ex_route_cluster + sasl_fed_ex_route_cluster \ + sasl_no_dir endif # HAVE_SASL diff --git a/qpid/cpp/src/tests/sasl_no_dir b/qpid/cpp/src/tests/sasl_no_dir new file mode 100755 index 0000000000..15a36014bb --- /dev/null +++ b/qpid/cpp/src/tests/sasl_no_dir @@ -0,0 +1,218 @@ +#! /bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +source ./test_env.sh + +script_name=`basename $0` + +# This minimum value corresponds to sasl version 2.1.22 +minimum_sasl_version=131350 + +sasl_version=`$QPID_TEST_EXEC_DIR/sasl_version` + +# This test is necessary because this sasl version is the first one that permits +# redirection of the sasl config file path. +if [ "$sasl_version" -lt "$minimum_sasl_version" ]; then + echo "sasl_fed: must have sasl version 2.1.22 or greater. ( Integer value: $minimum_sasl_version ) Version is: $sasl_version" + exit 0 +fi + + +sasl_config_dir=$builddir/sasl_config + + +# Debugging print. -------------------------- +debug= +function print { + if [ "$debug" ]; then + echo "${script_name}: $1" + fi +} + + +my_random_number=$RANDOM +tmp_root=/tmp/sasl_fed_$my_random_number +mkdir -p $tmp_root + + +LOG_FILE=$tmp_root/qpidd.log + +# If you want to see this test fail, just comment out this 'mv' command. +print "Moving sasl configuration dir." +mv ${sasl_config_dir} ${sasl_config_dir}- + + +#-------------------------------------------------- +print " Starting broker" +#-------------------------------------------------- +$QPIDD_EXEC \ + -p 0 \ + --no-data-dir \ + --auth=yes \ + --mgmt-enable=yes \ + --log-enable info+ \ + --log-source yes \ + --log-to-file ${LOG_FILE} \ + --sasl-config=$sasl_config_dir \ + -d 2> /dev/null 1> $tmp_root/broker_port + + + +# If it works right, the output will look something like this: ( two lines long ) +# Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112) +# 2011-10-13 14:07:00 critical qpidd.cpp:83: Unexpected error: Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112) + +result=`cat ${LOG_FILE} | grep "sasl_set_path failed: no such directory" | wc -l ` + +#-------------------------------------------------- +print "Restore the Sasl config dir to its original place." +#-------------------------------------------------- +mv ${sasl_config_dir}- ${sasl_config_dir} + +if [ "2" -eq ${result} ]; then + print "result: success" + rm -rf $tmp_root + exit 0 +fi + + +# If this test fails, the broker is still alive. +# Kill it. +broker_port=`cat $tmp_root/broker_port` +#-------------------------------------------------- +print "Asking broker to quit." +#-------------------------------------------------- +$QPIDD_EXEC --port $broker_port --quit + +rm -rf $tmp_root + +print "result: fail" +exit 1 + +#! /bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +source ./test_env.sh + +script_name=`basename $0` + +# This minimum value corresponds to sasl version 2.1.22 +minimum_sasl_version=131350 + +sasl_version=`$QPID_TEST_EXEC_DIR/sasl_version` + +# This test is necessary because this sasl version is the first one that permits +# redirection of the sasl config file path. +if [ "$sasl_version" -lt "$minimum_sasl_version" ]; then + echo "sasl_fed: must have sasl version 2.1.22 or greater. ( Integer value: $minimum_sasl_version ) Version is: $sasl_version" + exit 0 +fi + + +sasl_config_dir=$builddir/sasl_config + + +# Debugging print. -------------------------- +debug= +function print { + if [ "$debug" ]; then + echo "${script_name}: $1" + fi +} + + +my_random_number=$RANDOM +tmp_root=/tmp/sasl_fed_$my_random_number +mkdir -p $tmp_root + + +LOG_FILE=$tmp_root/qpidd.log + +# If you want to see this test fail, just comment out this 'mv' command. +print "Moving sasl configuration dir." +mv ${sasl_config_dir} ${sasl_config_dir}- + + +#-------------------------------------------------- +print " Starting broker" +#-------------------------------------------------- +$QPIDD_EXEC \ + -p 0 \ + --no-data-dir \ + --auth=yes \ + --mgmt-enable=yes \ + --log-enable info+ \ + --log-source yes \ + --log-to-file ${LOG_FILE} \ + --sasl-config=$sasl_config_dir \ + -d 2> /dev/null 1> $tmp_root/broker_port + + + +# If it works right, the output will look something like this: ( two lines long ) +# Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112) +# 2011-10-13 14:07:00 critical qpidd.cpp:83: Unexpected error: Daemon startup failed: SASL: sasl_set_path failed: no such directory: /home/mick/trunk/qpid/cpp/src/tests/sasl_config (qpid/broker/SaslAuthenticator.cpp:112) + +result=`cat ${LOG_FILE} | grep "sasl_set_path failed: no such directory" | wc -l ` + +#-------------------------------------------------- +print "Restore the Sasl config dir to its original place." +#-------------------------------------------------- +mv ${sasl_config_dir}- ${sasl_config_dir} + +if [ "2" -eq ${result} ]; then + print "result: success" + rm -rf $tmp_root + exit 0 +fi + + +# If this test fails, the broker is still alive. +# Kill it. +broker_port=`cat $tmp_root/broker_port` +#-------------------------------------------------- +print "Asking broker to quit." +#-------------------------------------------------- +$QPIDD_EXEC --port $broker_port --quit + +rm -rf $tmp_root + +print "result: fail" +exit 1 + diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test index cbf75eb237..6c056f4288 100755 --- a/qpid/cpp/src/tests/ssl_test +++ b/qpid/cpp/src/tests/ssl_test @@ -47,9 +47,13 @@ delete_certs() { fi } -COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption" +COMMON_OPTS="--daemon --no-data-dir --no-module-dir --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME" start_broker() { # $1 = extra opts - ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS $1; + ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --auth no $1; +} + +start_authenticating_broker() { + ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --ssl-sasl-no-dict --ssl-require-client-authentication --auth yes; } stop_brokers() { @@ -64,6 +68,13 @@ cleanup() { delete_certs } +pick_port() { + # We need a fixed port to set --cluster-url. Use qpidd to pick a free port. + PICK=`../qpidd --no-module-dir -dp0` + ../qpidd --no-module-dir -qp $PICK + echo $PICK +} + CERTUTIL=$(type -p certutil) if [[ !(-x $CERTUTIL) ]] ; then echo "No certutil, skipping ssl test"; @@ -93,7 +104,7 @@ test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; } #### Client Authentication tests -PORT2=`start_broker --ssl-require-client-authentication` || error "Could not start broker" +PORT2=`start_authenticating_broker` || error "Could not start broker" echo "Running SSL client authentication test on port $PORT2" URL=amqp:ssl:$TEST_HOSTNAME:$PORT2 @@ -109,19 +120,22 @@ test "$MSG3" = "" || { echo "receive succeeded without valid ssl cert '$MSG3' != stop_brokers +#Test multiplexed connection where SSL and plain TCP are served by the same port +PORT=`pick_port`; ../qpidd --port $PORT --ssl-port $PORT $COMMON_OPTS --transport ssl --auth no +echo "Running multiplexed SSL/TCP test on $PORT" + +./qpid-perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary || { echo "SSL on multiplexed connection failed!"; exit 1; } +./qpid-perftest --count ${COUNT} --port ${PORT} -P tcp -b $TEST_HOSTNAME --summary || { echo "Plain TCP on multiplexed connection failed!"; exit 1; } + +stop_brokers + test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported. ## Test failover in a cluster using SSL only . $srcdir/ais_check # Will exit if clustering not enabled. -pick_port() { - # We need a fixed port to set --cluster-url. Use qpidd to pick a free port. - PICK=`../qpidd --no-module-dir -dp0` - ../qpidd --no-module-dir -qp $PICK - echo $PICK -} ssl_cluster_broker() { # $1 = port - ../qpidd $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null + ../qpidd $COMMON_OPTS --require-encryption --auth no --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null # Wait for broker to be ready qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; } echo "Running SSL cluster broker on port $1" diff --git a/qpid/cpp/src/windows/QpiddBroker.cpp b/qpid/cpp/src/windows/QpiddBroker.cpp index 50bb45979c..42ba97bdb1 100644 --- a/qpid/cpp/src/windows/QpiddBroker.cpp +++ b/qpid/cpp/src/windows/QpiddBroker.cpp @@ -19,17 +19,9 @@ * */ -#ifdef HAVE_CONFIG_H -# include "config.h" -#else -// These need to be made something sensible, like reading a value from -// the registry. But for now, get things going with a local definition. -namespace { -const char *QPIDD_CONF_FILE = "qpid_broker.conf"; -const char *QPIDD_MODULE_DIR = "."; -} -#endif +#include "config.h" #include "qpidd.h" +#include "SCM.h" #include "qpid/Exception.h" #include "qpid/Options.h" #include "qpid/Plugin.h" @@ -205,8 +197,56 @@ struct BrokerInfo { DWORD pid; }; +// Service-related items. Only involved when running the broker as a Windows +// service. + +const std::string svcName = "qpidd"; +SERVICE_STATUS svcStatus; +SERVICE_STATUS_HANDLE svcStatusHandle = 0; + +// This function is only called when the broker is run as a Windows +// service. It receives control requests from Windows. +VOID WINAPI SvcCtrlHandler(DWORD control) +{ + switch(control) { + case SERVICE_CONTROL_STOP: + svcStatus.dwCurrentState = SERVICE_STOP_PENDING; + svcStatus.dwControlsAccepted = 0; + svcStatus.dwCheckPoint = 1; + svcStatus.dwWaitHint = 5000; // 5 secs. + ::SetServiceStatus(svcStatusHandle, &svcStatus); + CtrlHandler(CTRL_C_EVENT); + break; + + case SERVICE_CONTROL_INTERROGATE: + break; + + default: + break; + } +} + +VOID WINAPI ServiceMain(DWORD argc, LPTSTR *argv) +{ + ::memset(&svcStatus, 0, sizeof(svcStatus)); + svcStatusHandle = ::RegisterServiceCtrlHandler(svcName.c_str(), + SvcCtrlHandler); + svcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; + svcStatus.dwCheckPoint = 1; + svcStatus.dwWaitHint = 10000; // 10 secs. + svcStatus.dwCurrentState = SERVICE_START_PENDING; + ::SetServiceStatus(svcStatusHandle, &svcStatus); + // QpiddBroker class resets state to running. + svcStatus.dwWin32ExitCode = run_broker(argc, argv, true); + svcStatus.dwCurrentState = SERVICE_STOPPED; + svcStatus.dwCheckPoint = 0; + svcStatus.dwWaitHint = 0; + ::SetServiceStatus(svcStatusHandle, &svcStatus); } +} // namespace + + struct ProcessControlOptions : public qpid::Options { bool quit; bool check; @@ -225,9 +265,49 @@ struct ProcessControlOptions : public qpid::Options { } }; +struct ServiceOptions : public qpid::Options { + bool install; + bool start; + bool stop; + bool uninstall; + bool daemon; + std::string startType; + std::string startArgs; + std::string account; + std::string password; + std::string depends; + + ServiceOptions() + : qpid::Options("Service options"), + install(false), + start(false), + stop(false), + uninstall(false), + daemon(false), + startType("demand"), + startArgs(""), + account("NT AUTHORITY\\LocalService"), + password(""), + depends("") + { + addOptions() + ("install", qpid::optValue(install), "Install as service") + ("start-type", qpid::optValue(startType, "auto|demand|disabled"), "Service start type\nApplied at install time only.") + ("arguments", qpid::optValue(startArgs, "COMMAND LINE ARGS"), "Arguments to pass when service auto-starts") + ("account", qpid::optValue(account, "(LocalService)"), "Account to run as, default is LocalService\nApplied at install time only.") + ("password", qpid::optValue(password, "PASSWORD"), "Account password, if needed\nApplied at install time only.") + ("depends", qpid::optValue(depends, "(comma delimited list)"), "Names of services that must start before this service\nApplied at install time only.") + ("start", qpid::optValue(start), "Start the service.") + ("stop", qpid::optValue(stop), "Stop the service.") + ("uninstall", qpid::optValue(uninstall), "Uninstall the service."); + } +}; + struct QpiddWindowsOptions : public QpiddOptionsPrivate { ProcessControlOptions control; + ServiceOptions service; QpiddWindowsOptions(QpiddOptions *parent) : QpiddOptionsPrivate(parent) { + parent->add(service); parent->add(control); } }; @@ -253,12 +333,63 @@ void QpiddOptions::usage() const { } int QpiddBroker::execute (QpiddOptions *options) { + + // If running as a service, bump the status checkpoint to let SCM know + // we're still making progress. + if (svcStatusHandle != 0) { + svcStatus.dwCheckPoint++; + ::SetServiceStatus(svcStatusHandle, &svcStatus); + } + // Options that affect a running daemon. QpiddWindowsOptions *myOptions = - reinterpret_cast<QpiddWindowsOptions *>(options->platform.get()); + reinterpret_cast<QpiddWindowsOptions *>(options->platform.get()); if (myOptions == 0) throw qpid::Exception("Internal error obtaining platform options"); + if (myOptions->service.install) { + // Handle start type + DWORD startType; + if (myOptions->service.startType.compare("demand") == 0) + startType = SERVICE_DEMAND_START; + else if (myOptions->service.startType.compare("auto") == 0) + startType = SERVICE_AUTO_START; + else if (myOptions->service.startType.compare("disabled") == 0) + startType = SERVICE_DISABLED; + else if (!myOptions->service.startType.empty()) + throw qpid::Exception("Invalid service start type: " + + myOptions->service.startType); + + // Install service and exit + qpid::windows::SCM manager; + manager.install(svcName, + "Apache Qpid Message Broker", + myOptions->service.startArgs, + startType, + myOptions->service.account, + myOptions->service.password, + myOptions->service.depends); + return 0; + } + + if (myOptions->service.start) { + qpid::windows::SCM manager; + manager.start(svcName); + return 0; + } + + if (myOptions->service.stop) { + qpid::windows::SCM manager; + manager.stop(svcName); + return 0; + } + + if (myOptions->service.uninstall) { + qpid::windows::SCM manager; + manager.uninstall(svcName); + return 0; + } + if (myOptions->control.check || myOptions->control.quit) { // Relies on port number being set via --port or QPID_PORT env variable. NamedSharedMemory<BrokerInfo> info(brokerInfoName(options->broker.port)); @@ -301,10 +432,41 @@ int QpiddBroker::execute (QpiddOptions *options) { ::SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE); brokerPtr->accept(); std::cout << options->broker.port << std::endl; + + // If running as a service, tell SCM we're up. There's still a chance + // that store recovery will drag out the time before the broker actually + // responds to requests, but integrating that mechanism with the SCM + // updating is probably more work than it's worth. + if (svcStatusHandle != 0) { + svcStatus.dwCheckPoint = 0; + svcStatus.dwWaitHint = 0; + svcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP; + svcStatus.dwCurrentState = SERVICE_RUNNING; + ::SetServiceStatus(svcStatusHandle, &svcStatus); + } + brokerPtr->run(); waitShut.signal(); // In case we shut down some other way waitThr.join(); + return 0; +} + - // CloseHandle(h); +int main(int argc, char* argv[]) +{ + // If started as a service, notify the SCM we're up. Else just run. + // If as a service, StartServiceControlDispatcher doesn't return until + // the service is stopped. + SERVICE_TABLE_ENTRY dispatchTable[] = + { + { "", (LPSERVICE_MAIN_FUNCTION)ServiceMain }, + { NULL, NULL } + }; + if (!StartServiceCtrlDispatcher(dispatchTable)) { + DWORD err = ::GetLastError(); + if (err == ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) // Run as console + return run_broker(argc, argv); + throw QPID_WINDOWS_ERROR(err); + } return 0; } diff --git a/qpid/cpp/src/windows/SCM.cpp b/qpid/cpp/src/windows/SCM.cpp new file mode 100644 index 0000000000..232bb04c17 --- /dev/null +++ b/qpid/cpp/src/windows/SCM.cpp @@ -0,0 +1,332 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/sys/windows/check.h"
+#include "SCM.h"
+
+#pragma comment(lib, "advapi32.lib")
+
+namespace {
+
+// Container that will close a SC_HANDLE upon destruction.
+class AutoServiceHandle {
+public:
+ AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {}
+ ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); }
+ void release() { h = NULL; }
+ void reset(SC_HANDLE newHandle)
+ {
+ if (h != NULL)
+ ::CloseServiceHandle(h);
+ h = newHandle;
+ }
+ operator SC_HANDLE() const { return h; }
+
+private:
+ SC_HANDLE h;
+};
+
+}
+
+namespace qpid {
+namespace windows {
+
+SCM::SCM() : scmHandle(NULL)
+{
+}
+
+SCM::~SCM()
+{
+ if (NULL != scmHandle)
+ ::CloseServiceHandle(scmHandle);
+}
+
+/**
+ * Install this executable as a service
+ */
+void SCM::install(const string& serviceName,
+ const string& serviceDesc,
+ const string& args,
+ DWORD startType,
+ const string& account,
+ const string& password,
+ const string& depends)
+{
+ // Handle dependent service name list; Windows wants a set of nul-separated
+ // names ending with a double nul.
+ string depends2 = depends;
+ if (!depends2.empty()) {
+ // CDL to null delimiter w/ trailing double null
+ size_t p = 0;
+ while ((p = depends2.find_first_of( ',', p)) != string::npos)
+ depends2.replace(p, 1, 1, '\0');
+ depends2.push_back('\0');
+ depends2.push_back('\0');
+ }
+
+#if 0
+ // I'm nervous about adding a user/password check here. Is this a
+ // potential attack vector, letting users check passwords without
+ // control? -Steve Huston, Feb 24, 2011
+
+ // Validate account, password
+ HANDLE hToken = NULL;
+ bool logStatus = false;
+ if (!account.empty() && !password.empty() &&
+ !(logStatus = ::LogonUserA(account.c_str(),
+ "",
+ password.c_str(),
+ LOGON32_LOGON_NETWORK,
+ LOGON32_PROVIDER_DEFAULT,
+ &hToken ) != 0))
+ std::cout << "warning: supplied account & password failed with LogonUser." << std::endl;
+ if (logStatus)
+ ::CloseHandle(hToken);
+#endif
+
+ // Get fully qualified .exe name
+ char myPath[MAX_PATH];
+ DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH);
+ QPID_WINDOWS_CHECK_NOT(myPathLength, 0);
+ string imagePath(myPath, myPathLength);
+ if (!args.empty())
+ imagePath += " " + args;
+
+ // Ensure there's a handle to the SCM database.
+ openSvcManager();
+
+ // Create the service
+ SC_HANDLE svcHandle;
+ svcHandle = ::CreateService(scmHandle, // SCM database
+ serviceName.c_str(), // name of service
+ serviceDesc.c_str(), // name to display
+ SERVICE_ALL_ACCESS, // desired access
+ SERVICE_WIN32_OWN_PROCESS, // service type
+ startType, // start type
+ SERVICE_ERROR_NORMAL, // error cntrl type
+ imagePath.c_str(), // path to service's binary w/ optional arguments
+ NULL, // no load ordering group
+ NULL, // no tag identifier
+ depends2.empty() ? NULL : depends2.c_str(),
+ account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem
+ password.empty() ? NULL : password.c_str()); // password, or NULL for none
+ QPID_WINDOWS_CHECK_NULL(svcHandle);
+ ::CloseServiceHandle(svcHandle);
+ QPID_LOG(info, "Service installed successfully");
+}
+
+/**
+ *
+ */
+void SCM::uninstall(const string& serviceName)
+{
+ // Ensure there's a handle to the SCM database.
+ openSvcManager();
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ DELETE));
+ QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc);
+ QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0);
+ QPID_LOG(info, "Service deleted successfully.");
+}
+
+/**
+ * Attempt to start the service.
+ */
+void SCM::start(const string& serviceName)
+{
+ // Ensure we have a handle to the SCM database.
+ openSvcManager();
+
+ // Get a handle to the service.
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ SERVICE_ALL_ACCESS));
+ QPID_WINDOWS_CHECK_NULL(svc);
+
+ // Check the status in case the service is not stopped.
+ DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOP_PENDING)
+ throw qpid::Exception("Timed out waiting for running service to stop.");
+
+ // Attempt to start the service.
+ QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0);
+
+ QPID_LOG(info, "Service start pending...");
+
+ // Check the status until the service is no longer start pending.
+ state = waitForStateChangeFrom(svc, SERVICE_START_PENDING);
+ // Determine whether the service is running.
+ if (state == SERVICE_RUNNING) {
+ QPID_LOG(info, "Service started successfully");
+ }
+ else {
+ throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state));
+ }
+}
+
+/**
+ *
+ */
+void SCM::stop(const string& serviceName)
+{
+ // Ensure a handle to the SCM database.
+ openSvcManager();
+
+ // Get a handle to the service.
+ AutoServiceHandle svc(::OpenService(scmHandle,
+ serviceName.c_str(),
+ SERVICE_STOP | SERVICE_QUERY_STATUS |
+ SERVICE_ENUMERATE_DEPENDENTS));
+ QPID_WINDOWS_CHECK_NULL(svc);
+
+ // Make sure the service is not already stopped; if it's stop-pending,
+ // wait for it to finalize.
+ DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOPPED) {
+ QPID_LOG(info, "Service is already stopped");
+ return;
+ }
+
+ // If the service is running, dependencies must be stopped first.
+ std::auto_ptr<ENUM_SERVICE_STATUS> deps;
+ DWORD numDeps = getDependentServices(svc, deps);
+ for (DWORD i = 0; i < numDeps; i++)
+ stop(deps.get()[i].lpServiceName);
+
+ // Dependents stopped; send a stop code to the service.
+ SERVICE_STATUS_PROCESS ssp;
+ if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp))
+ throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " <<
+ qpid::sys::strError(::GetLastError())));
+
+ // Wait for the service to stop.
+ state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+ if (state == SERVICE_STOPPED)
+ QPID_LOG(info, QPID_MSG("Service " << serviceName <<
+ " stopped successfully."));
+}
+
+/**
+ *
+ */
+void SCM::openSvcManager()
+{
+ if (NULL != scmHandle)
+ return;
+
+ scmHandle = ::OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // Rights
+ QPID_WINDOWS_CHECK_NULL(scmHandle);
+}
+
+DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState)
+{
+ SERVICE_STATUS_PROCESS ssStatus;
+ DWORD bytesNeeded;
+ DWORD waitTime;
+ if (!::QueryServiceStatusEx(svc, // handle to service
+ SC_STATUS_PROCESS_INFO, // information level
+ (LPBYTE)&ssStatus, // address of structure
+ sizeof(ssStatus), // size of structure
+ &bytesNeeded)) // size needed if buffer is too small
+ throw QPID_WINDOWS_ERROR(::GetLastError());
+
+ // Save the tick count and initial checkpoint.
+ DWORD startTickCount = ::GetTickCount();
+ DWORD oldCheckPoint = ssStatus.dwCheckPoint;
+
+ // Wait for the service to change out of the noted state.
+ while (ssStatus.dwCurrentState == originalState) {
+ // Do not wait longer than the wait hint. A good interval is
+ // one-tenth of the wait hint but not less than 1 second
+ // and not more than 10 seconds.
+ waitTime = ssStatus.dwWaitHint / 10;
+ if (waitTime < 1000)
+ waitTime = 1000;
+ else if (waitTime > 10000)
+ waitTime = 10000;
+
+ ::Sleep(waitTime);
+
+ // Check the status until the service is no longer stop pending.
+ if (!::QueryServiceStatusEx(svc,
+ SC_STATUS_PROCESS_INFO,
+ (LPBYTE) &ssStatus,
+ sizeof(ssStatus),
+ &bytesNeeded))
+ throw QPID_WINDOWS_ERROR(::GetLastError());
+
+ if (ssStatus.dwCheckPoint > oldCheckPoint) {
+ // Continue to wait and check.
+ startTickCount = ::GetTickCount();
+ oldCheckPoint = ssStatus.dwCheckPoint;
+ } else {
+ if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint)
+ break;
+ }
+ }
+ return ssStatus.dwCurrentState;
+}
+
+/**
+ * Get the services that depend on @arg svc. All dependent service info
+ * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps.
+ *
+ * @retval The number of dependent services.
+ */
+DWORD SCM::getDependentServices(SC_HANDLE svc,
+ std::auto_ptr<ENUM_SERVICE_STATUS>& deps)
+{
+ DWORD bytesNeeded;
+ DWORD numEntries;
+
+ // Pass a zero-length buffer to get the required buffer size.
+ if (::EnumDependentServices(svc,
+ SERVICE_ACTIVE,
+ 0,
+ 0,
+ &bytesNeeded,
+ &numEntries)) {
+ // If the Enum call succeeds, then there are no dependent
+ // services, so do nothing.
+ return 0;
+ }
+
+ if (::GetLastError() != ERROR_MORE_DATA)
+ throw QPID_WINDOWS_ERROR((::GetLastError()));
+
+ // Allocate a buffer for the dependencies.
+ deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded]));
+ // Enumerate the dependencies.
+ if (!::EnumDependentServices(svc,
+ SERVICE_ACTIVE,
+ deps.get(),
+ bytesNeeded,
+ &bytesNeeded,
+ &numEntries))
+ throw QPID_WINDOWS_ERROR((::GetLastError()));
+ return numEntries;
+}
+
+} } // namespace qpid::windows
diff --git a/qpid/cpp/src/windows/SCM.h b/qpid/cpp/src/windows/SCM.h new file mode 100644 index 0000000000..bdc73bc210 --- /dev/null +++ b/qpid/cpp/src/windows/SCM.h @@ -0,0 +1,109 @@ +#ifndef WINDOWS_SCM_H
+#define WINDOWS_SCM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <string>
+using std::string;
+
+#ifdef UNICODE
+#undef UNICODE
+#endif
+
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+
+#include <windows.h>
+
+namespace qpid {
+namespace windows {
+
+/**
+ * @class SCM
+ *
+ * Access the Windows Service Control Manager.
+ */
+class SCM
+{
+public:
+ SCM();
+ ~SCM();
+
+ /**
+ * Install this executable as a service
+ *
+ * @param serviceName The name of the service
+ * @param serviceDesc Description of the service's purpose
+ * @param args The argument list to pass into the service
+ * @param startType The start type: SERVICE_DEMAND_START,
+ * SERVICE_AUTO_START, SERVICE_DISABLED
+ * @param account If not empty, the account name to install this
+ * service under
+ * @param password If not empty, the account password to install this
+ * service with
+ * @param depends If not empty, a comma delimited list of services
+ * that must start before this one
+ */
+ void install(const string& serviceName,
+ const string& serviceDesc,
+ const string& args,
+ DWORD startType = SERVICE_DEMAND_START,
+ const string& account = "NT AUTHORITY\\LocalSystem",
+ const string& password = "",
+ const string& depends = "");
+
+ /**
+ * Uninstall this executable as a service
+ *
+ * @param serviceName the name of the service
+ */
+ void uninstall(const string& serviceName);
+
+ /**
+ * Start the specified service
+ *
+ * @param serviceName the name of the service
+ */
+ void start(const string& serviceName);
+
+ /**
+ * Stop the specified service
+ *
+ * @param serviceName the name of the service
+ */
+ void stop(const string &serviceName);
+
+private:
+ SC_HANDLE scmHandle;
+
+ void openSvcManager();
+ DWORD waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState);
+ DWORD getDependentServices(SC_HANDLE svc,
+ std::auto_ptr<ENUM_SERVICE_STATUS>& deps);
+
+};
+
+}} // namespace qpid::windows
+
+#endif /* #ifndef WINDOWS_SCM_H */
|