summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp33
-rw-r--r--cpp/src/qpid/broker/Queue.h24
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp87
-rw-r--r--cpp/src/qpid/broker/SemanticState.h11
4 files changed, 102 insertions, 53 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 16e91fc1cf..9586f6b994 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -124,21 +124,20 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-void Queue::requestDispatch(Consumer* c, bool sync){
+void Queue::requestDispatch(Consumer* c){
if (!c || c->preAcquires()) {
- if (sync) {
- Mutex::ScopedLock locker(messageLock);
- dispatch();
- } else {
- serializer.execute(dispatchCallback);
- }
+ serializer.execute(dispatchCallback);
} else {
- //note: this is always done on the callers thread, regardless
- // of sync; browsers of large queues should use flow control!
serviceBrowser(c);
}
}
+void Queue::flush(DispatchCompletion& completion)
+{
+ DispatchFunctor f(*this, &completion);
+ serializer.execute(f);
+}
+
Consumer* Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
@@ -179,9 +178,18 @@ void Queue::dispatch(){
} else {
break;
}
- }
- RWlock::ScopedRlock locker(consumerLock);
- for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) {
+ }
+ serviceAllBrowsers();
+}
+
+void Queue::serviceAllBrowsers()
+{
+ Consumers copy;
+ {
+ RWlock::ScopedRlock locker(consumerLock);
+ copy = browsers;
+ }
+ for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
serviceBrowser(*i);
}
}
@@ -428,3 +436,4 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
{
return alternateExchange;
}
+
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index e02444642b..6e859e67bb 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -48,20 +48,32 @@ namespace qpid {
using std::string;
+ struct DispatchCompletion
+ {
+ virtual ~DispatchCompletion() {}
+ virtual void completed() = 0;
+ };
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
* registered consumers or be stored until dequeued or until one
* or more consumers registers.
*/
- class Queue : public PersistableQueue{
+ class Queue : public PersistableQueue {
typedef std::vector<Consumer*> Consumers;
typedef std::deque<QueuedMessage> Messages;
- struct DispatchFunctor {
+ struct DispatchFunctor
+ {
Queue& queue;
- DispatchFunctor(Queue& q) : queue(q) {}
- void operator()() { queue.dispatch(); }
+ DispatchCompletion* sync;
+ DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {}
+ void operator()()
+ {
+ queue.dispatch();
+ if (sync) sync->completed();
+ }
};
const string name;
@@ -93,6 +105,7 @@ namespace qpid {
*/
void dispatch();
void cancel(Consumer* c, Consumers& set);
+ void serviceAllBrowsers();
void serviceBrowser(Consumer* c);
Consumer* allocate();
bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
@@ -149,7 +162,8 @@ namespace qpid {
* at any time, so this call schedules the despatch based on
* the serilizer policy.
*/
- void requestDispatch(Consumer* c = 0, bool sync = false);
+ void requestDispatch(Consumer* c = 0);
+ void flush(DispatchCompletion& callback);
void consume(Consumer* c, bool exclusive = false);
void cancel(Consumer* c);
uint32_t purge();
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1d49e08eb0..09f5b8ce98 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -346,38 +346,40 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last)
void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- } else {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+ {
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator start = cumulative ? unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
+ if (cumulative || first != last) {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ } else {
+ //just acked single element (move end past it)
+ ++end;
}
- } else {
- for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(start, end);
- }
+
+ for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(start, end);
+ }
+ }//end of lock scope for delivery lock (TODO this is ugly, make it prettier)
//if the prefetch limit had previously been reached, or credit
//had expired in windowing mode there may be messages that can
@@ -525,12 +527,10 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
void SemanticState::ConsumerImpl::flush()
{
//need to prevent delivery after requestDispatch returns but
- //before credit is reduced to zero; TODO: come up with better
- //implementation of flush.
- Mutex::ScopedLock l(lock);
- queue->requestDispatch(this, true);
- byteCredit = 0;
- msgCredit = 0;
+ //before credit is reduced to zero
+ FlushCompletion completion(*this);
+ queue->flush(completion);
+ completion.wait();
}
void SemanticState::ConsumerImpl::stop()
@@ -599,4 +599,19 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
unacked.erase(range.start, range.end);
}
+
+void SemanticState::FlushCompletion::wait()
+{
+ Monitor::ScopedLock locker(lock);
+ while (!complete) lock.wait();
+}
+
+void SemanticState::FlushCompletion::completed()
+{
+ Monitor::ScopedLock locker(lock);
+ consumer.stop();
+ complete = true;
+ lock.notifyAll();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 8372f345ad..ff1c8192f7 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -89,6 +89,17 @@ class SemanticState : public framing::FrameHandler::Chains,
void acknowledged(const DeliveryRecord&);
};
+ struct FlushCompletion : DispatchCompletion
+ {
+ sys::Monitor lock;
+ ConsumerImpl& consumer;
+ bool complete;
+
+ FlushCompletion(ConsumerImpl& c) : consumer(c), complete(false) {}
+ void wait();
+ void completed();
+ };
+
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
SessionState& session;