summaryrefslogtreecommitdiff
path: root/cpp/broker/src/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/src/Queue.cpp')
-rw-r--r--cpp/broker/src/Queue.cpp155
1 files changed, 0 insertions, 155 deletions
diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp
deleted file mode 100644
index eaaa3ffa31..0000000000
--- a/cpp/broker/src/Queue.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Queue.h"
-#include "MonitorImpl.h"
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
- name(_name),
- autodelete(_autodelete),
- durable(_durable),
- owner(_owner),
- queueing(false),
- dispatching(false),
- next(0),
- lastUsed(0),
- exclusive(0)
-{
- if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
-}
-
-Queue::~Queue(){
- for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
- b->cancel();
- bindings.pop();
- }
-}
-
-void Queue::bound(Binding* b){
- bindings.push(b);
-}
-
-void Queue::deliver(Message::shared_ptr& msg){
- Locker locker(lock);
- if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push(msg);
- }
-}
-
-bool Queue::dispatch(Message::shared_ptr& msg){
- if(consumers.empty()){
- return false;
- }else if(exclusive){
- if(!exclusive->deliver(msg)){
- std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
- }
- return true;
- }else{
- //deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
- int start = next;
- while(c){
- next++;
- if(c->deliver(msg)) return true;
-
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
- }
- return false;
- }
-}
-
-bool Queue::startDispatching(){
- Locker locker(lock);
- if(queueing && !dispatching){
- dispatching = true;
- return true;
- }else{
- return false;
- }
-}
-
-void Queue::dispatch(){
- bool proceed = startDispatching();
- while(proceed){
- Locker locker(lock);
- if(!messages.empty() && dispatch(messages.front())){
- messages.pop();
- }else{
- dispatching = false;
- proceed = false;
- queueing = !messages.empty();
- }
- }
-}
-
-void Queue::consume(Consumer* c, bool requestExclusive){
- Locker locker(lock);
- if(exclusive) throw ExclusiveAccessException();
- if(requestExclusive){
- if(!consumers.empty()) throw ExclusiveAccessException();
- exclusive = c;
- }
-
- if(autodelete && consumers.empty()) lastUsed = 0;
- consumers.push_back(c);
-}
-
-void Queue::cancel(Consumer* c){
- Locker locker(lock);
- consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
- if(exclusive == c) exclusive = 0;
-}
-
-Message::shared_ptr Queue::dequeue(){
- Locker locker(lock);
- Message::shared_ptr msg;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
-u_int32_t Queue::purge(){
- Locker locker(lock);
- int count = messages.size();
- while(!messages.empty()) messages.pop();
- return count;
-}
-
-u_int32_t Queue::getMessageCount() const{
- Locker locker(lock);
- return messages.size();
-}
-
-u_int32_t Queue::getConsumerCount() const{
- Locker locker(lock);
- return consumers.size();
-}
-
-bool Queue::canAutoDelete() const{
- Locker locker(lock);
- return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
-}