diff options
author | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
commit | 8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch) | |
tree | 1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/src/qpid/broker/Queue.cpp | |
parent | 9a808fb13aba243d41bbdab75158dae5939a80a4 (diff) | |
download | qpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz |
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp new file mode 100644 index 0000000000..d672074555 --- /dev/null +++ b/cpp/src/qpid/broker/Queue.cpp @@ -0,0 +1,155 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "./qpid/broker/Queue.h" +#include "./qpid/concurrent/MonitorImpl.h" +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::concurrent; + +Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : + name(_name), + autodelete(_autodelete), + durable(_durable), + owner(_owner), + queueing(false), + dispatching(false), + next(0), + lastUsed(0), + exclusive(0) +{ + if(autodelete) lastUsed = apr_time_as_msec(apr_time_now()); +} + +Queue::~Queue(){ + for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){ + b->cancel(); + bindings.pop(); + } +} + +void Queue::bound(Binding* b){ + bindings.push(b); +} + +void Queue::deliver(Message::shared_ptr& msg){ + Locker locker(lock); + if(queueing || !dispatch(msg)){ + queueing = true; + messages.push(msg); + } +} + +bool Queue::dispatch(Message::shared_ptr& msg){ + if(consumers.empty()){ + return false; + }else if(exclusive){ + if(!exclusive->deliver(msg)){ + std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; + } + return true; + }else{ + //deliver to next consumer + next = next % consumers.size(); + Consumer* c = consumers[next]; + int start = next; + while(c){ + next++; + if(c->deliver(msg)) return true; + + next = next % consumers.size(); + c = next == start ? 0 : consumers[next]; + } + return false; + } +} + +bool Queue::startDispatching(){ + Locker locker(lock); + if(queueing && !dispatching){ + dispatching = true; + return true; + }else{ + return false; + } +} + +void Queue::dispatch(){ + bool proceed = startDispatching(); + while(proceed){ + Locker locker(lock); + if(!messages.empty() && dispatch(messages.front())){ + messages.pop(); + }else{ + dispatching = false; + proceed = false; + queueing = !messages.empty(); + } + } +} + +void Queue::consume(Consumer* c, bool requestExclusive){ + Locker locker(lock); + if(exclusive) throw ExclusiveAccessException(); + if(requestExclusive){ + if(!consumers.empty()) throw ExclusiveAccessException(); + exclusive = c; + } + + if(autodelete && consumers.empty()) lastUsed = 0; + consumers.push_back(c); +} + +void Queue::cancel(Consumer* c){ + Locker locker(lock); + consumers.erase(find(consumers.begin(), consumers.end(), c)); + if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now()); + if(exclusive == c) exclusive = 0; +} + +Message::shared_ptr Queue::dequeue(){ + Locker locker(lock); + Message::shared_ptr msg; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + return msg; +} + +u_int32_t Queue::purge(){ + Locker locker(lock); + int count = messages.size(); + while(!messages.empty()) messages.pop(); + return count; +} + +u_int32_t Queue::getMessageCount() const{ + Locker locker(lock); + return messages.size(); +} + +u_int32_t Queue::getConsumerCount() const{ + Locker locker(lock); + return consumers.size(); +} + +bool Queue::canAutoDelete() const{ + Locker locker(lock); + return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete); +} |