diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-12 14:52:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-12 14:52:36 +0000 |
commit | f1b26fa4137f03029c7833e2276d46496afe6d3e (patch) | |
tree | 3028988d1783252165f6f52f37d191e2a336a853 /cpp/src | |
parent | 3d2ce1b5656bbba8b23b31848616b1010f46ede9 (diff) | |
download | qpid-python-f1b26fa4137f03029c7833e2276d46496afe6d3e.tar.gz |
Further fixes to locking between queue and semantic state to avoid deadlocking.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@584172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 87 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 11 |
4 files changed, 102 insertions, 53 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 16e91fc1cf..9586f6b994 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -124,21 +124,20 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -void Queue::requestDispatch(Consumer* c, bool sync){ +void Queue::requestDispatch(Consumer* c){ if (!c || c->preAcquires()) { - if (sync) { - Mutex::ScopedLock locker(messageLock); - dispatch(); - } else { - serializer.execute(dispatchCallback); - } + serializer.execute(dispatchCallback); } else { - //note: this is always done on the callers thread, regardless - // of sync; browsers of large queues should use flow control! serviceBrowser(c); } } +void Queue::flush(DispatchCompletion& completion) +{ + DispatchFunctor f(*this, &completion); + serializer.execute(f); +} + Consumer* Queue::allocate() { RWlock::ScopedWlock locker(consumerLock); @@ -179,9 +178,18 @@ void Queue::dispatch(){ } else { break; } - } - RWlock::ScopedRlock locker(consumerLock); - for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) { + } + serviceAllBrowsers(); +} + +void Queue::serviceAllBrowsers() +{ + Consumers copy; + { + RWlock::ScopedRlock locker(consumerLock); + copy = browsers; + } + for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) { serviceBrowser(*i); } } @@ -428,3 +436,4 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() { return alternateExchange; } + diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e02444642b..6e859e67bb 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -48,20 +48,32 @@ namespace qpid { using std::string; + struct DispatchCompletion + { + virtual ~DispatchCompletion() {} + virtual void completed() = 0; + }; + /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to * registered consumers or be stored until dequeued or until one * or more consumers registers. */ - class Queue : public PersistableQueue{ + class Queue : public PersistableQueue { typedef std::vector<Consumer*> Consumers; typedef std::deque<QueuedMessage> Messages; - struct DispatchFunctor { + struct DispatchFunctor + { Queue& queue; - DispatchFunctor(Queue& q) : queue(q) {} - void operator()() { queue.dispatch(); } + DispatchCompletion* sync; + DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {} + void operator()() + { + queue.dispatch(); + if (sync) sync->completed(); + } }; const string name; @@ -93,6 +105,7 @@ namespace qpid { */ void dispatch(); void cancel(Consumer* c, Consumers& set); + void serviceAllBrowsers(); void serviceBrowser(Consumer* c); Consumer* allocate(); bool seek(QueuedMessage& msg, const framing::SequenceNumber& position); @@ -149,7 +162,8 @@ namespace qpid { * at any time, so this call schedules the despatch based on * the serilizer policy. */ - void requestDispatch(Consumer* c = 0, bool sync = false); + void requestDispatch(Consumer* c = 0); + void flush(DispatchCompletion& callback); void consume(Consumer* c, bool exclusive = false); void cancel(Consumer* c); uint32_t purge(); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1d49e08eb0..09f5b8ce98 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -346,38 +346,40 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last) void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator start = cumulative ? unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); - ack_iterator end = start; - - if (cumulative || first != last) { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); - } else { - //just acked single element (move end past it) - ++end; - } - - for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); - - if (txBuffer.get()) { - //in transactional mode, don't dequeue or remove, just - //maintain set of acknowledged messages: - accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); + { + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator start = cumulative ? unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); + ack_iterator end = start; - if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from - //unacked and record it against that transaction - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + if (cumulative || first != last) { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } else { + //just acked single element (move end past it) + ++end; } - } else { - for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(start, end); - } + + for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(start, end); + } + }//end of lock scope for delivery lock (TODO this is ugly, make it prettier) //if the prefetch limit had previously been reached, or credit //had expired in windowing mode there may be messages that can @@ -525,12 +527,10 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) void SemanticState::ConsumerImpl::flush() { //need to prevent delivery after requestDispatch returns but - //before credit is reduced to zero; TODO: come up with better - //implementation of flush. - Mutex::ScopedLock l(lock); - queue->requestDispatch(this, true); - byteCredit = 0; - msgCredit = 0; + //before credit is reduced to zero + FlushCompletion completion(*this); + queue->flush(completion); + completion.wait(); } void SemanticState::ConsumerImpl::stop() @@ -599,4 +599,19 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) unacked.erase(range.start, range.end); } + +void SemanticState::FlushCompletion::wait() +{ + Monitor::ScopedLock locker(lock); + while (!complete) lock.wait(); +} + +void SemanticState::FlushCompletion::completed() +{ + Monitor::ScopedLock locker(lock); + consumer.stop(); + complete = true; + lock.notifyAll(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 8372f345ad..ff1c8192f7 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -89,6 +89,17 @@ class SemanticState : public framing::FrameHandler::Chains, void acknowledged(const DeliveryRecord&); }; + struct FlushCompletion : DispatchCompletion + { + sys::Monitor lock; + ConsumerImpl& consumer; + bool complete; + + FlushCompletion(ConsumerImpl& c) : consumer(c), complete(false) {} + void wait(); + void completed(); + }; + typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; SessionState& session; |