summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerQueue.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp102
1 files changed, 83 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 553f6016d2..a094c7a804 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -112,31 +112,50 @@ void Queue::requeue(const QueuedMessage& msg){
}
-
-void Queue::requestDispatch(){
- serializer.execute(dispatchCallback);
+bool Queue::acquire(const QueuedMessage& msg) {
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position == msg.position) {
+ messages.erase(i);
+ return true;
+ }
+ }
+ return false;
}
+void Queue::requestDispatch(Consumer* c, bool sync){
+ if (!c || c->preAcquires()) {
+ if (sync) {
+ serializer.dispatch();
+ } else {
+ serializer.execute(dispatchCallback);
+ }
+ } else {
+ //note: this is always done on the callers thread, regardless
+ // of sync; browsers of large queues should use flow control!
+ serviceBrowser(c);
+ }
+}
bool Queue::dispatch(QueuedMessage& msg){
RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
- if(consumers.empty()){
+ if(acquirers.empty()){
return false;
}else if(exclusive){
return exclusive->deliver(msg);
}else{
//deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
+ next = next % acquirers.size();
+ Consumer* c = acquirers[next];
int start = next;
while(c){
next++;
if(c->deliver(msg)) return true;
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
+ next = next % acquirers.size();
+ c = next == start ? 0 : acquirers[next];
}
return false;
}
@@ -153,34 +172,79 @@ void Queue::dispatch(){
}
if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
pop();
- } else {
+ } else {
break;
}
- }
+ }
+ RWlock::ScopedRlock locker(consumerLock);
+ for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) {
+ serviceBrowser(*i);
+ }
+}
+
+void Queue::serviceBrowser(Consumer* browser)
+{
+ //This is a poorly performing implementation:
+ //
+ // * bad concurrency where browsers exist
+ // * inefficient for largish queues
+ //
+ //The queue needs to be based on a current data structure that
+ //does not invalidate iterators when modified. Subscribers could
+ //then use an iterator to continue from where they left off
+
+ Mutex::ScopedLock locker(messageLock);
+ if (!messages.empty() && messages.back().position > browser->position) {
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position > browser->position) {
+ if (browser->deliver(*i)) {
+ browser->position = i->position;
+ } else {
+ break;
+ }
+ }
+ }
+ }
}
void Queue::consume(Consumer* c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
- if(exclusive)
+ if(exclusive) {
throw ChannelException(
403, format("Queue '%s' has an exclusive consumer."
" No more consumers allowed.") % getName());
+ }
if(requestExclusive) {
- if(!consumers.empty())
+ if(acquirers.empty() && browsers.empty()) {
+ exclusive = c;
+ } else {
throw ChannelException(
- 403, format("Queue '%s' already has conumers."
- "Exclusive access denied.") %getName());
- exclusive = c;
+ 403, format("Queue '%s' already has consumers."
+ "Exclusive access denied.") % getName());
+ }
+ }
+ if (c->preAcquires()) {
+ acquirers.push_back(c);
+ } else {
+ browsers.push_back(c);
}
- consumers.push_back(c);
}
void Queue::cancel(Consumer* c){
RWlock::ScopedWlock locker(consumerLock);
+ if (c->preAcquires()) {
+ cancel(c, acquirers);
+ } else {
+ cancel(c, browsers);
+ }
+ if(exclusive == c) exclusive = 0;
+}
+
+void Queue::cancel(Consumer* c, Consumers& consumers)
+{
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
consumers.erase(i);
- if(exclusive == c) exclusive = 0;
}
QueuedMessage Queue::dequeue(){
@@ -233,12 +297,12 @@ uint32_t Queue::getMessageCount() const{
uint32_t Queue::getConsumerCount() const{
RWlock::ScopedRlock locker(consumerLock);
- return consumers.size();
+ return acquirers.size() + browsers.size();
}
bool Queue::canAutoDelete() const{
RWlock::ScopedRlock locker(consumerLock);
- return autodelete && consumers.size() == 0;
+ return autodelete && acquirers.empty() && browsers.empty();
}
// return true if store exists,