summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp202
1 files changed, 0 insertions, 202 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
deleted file mode 100644
index 00b0a844ab..0000000000
--- a/cpp/src/qpid/broker/Queue.cpp
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/MessageStore.h>
-#include <qpid/sys/Monitor.h>
-#include <qpid/sys/Time.h>
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-
-Queue::Queue(const string& _name, u_int32_t _autodelete,
- MessageStore* const _store,
- const ConnectionToken* const _owner) :
-
- name(_name),
- autodelete(_autodelete),
- store(_store),
- owner(_owner),
- queueing(false),
- dispatching(false),
- next(0),
- lastUsed(0),
- exclusive(0),
- persistenceId(0)
-{
- if(autodelete) lastUsed = now()/TIME_MSEC;
-}
-
-Queue::~Queue(){
- for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
- b->cancel();
- bindings.pop();
- }
-}
-
-void Queue::bound(Binding* b){
- bindings.push(b);
-}
-
-void Queue::deliver(Message::shared_ptr& msg){
- enqueue(0, msg, 0);
- process(msg);
-}
-
-void Queue::recover(Message::shared_ptr& msg){
- queueing = true;
- messages.push(msg);
-}
-
-void Queue::process(Message::shared_ptr& msg){
- Mutex::ScopedLock locker(lock);
- if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push(msg);
- }
-}
-
-bool Queue::dispatch(Message::shared_ptr& msg){
- if(consumers.empty()){
- return false;
- }else if(exclusive){
- if(!exclusive->deliver(msg)){
- std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
- }
- return true;
- }else{
- //deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
- int start = next;
- while(c){
- next++;
- if(c->deliver(msg)) return true;
-
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
- }
- return false;
- }
-}
-
-bool Queue::startDispatching(){
- Mutex::ScopedLock locker(lock);
- if(queueing && !dispatching){
- dispatching = true;
- return true;
- }else{
- return false;
- }
-}
-
-void Queue::dispatch(){
- bool proceed = startDispatching();
- while(proceed){
- Mutex::ScopedLock locker(lock);
- if(!messages.empty() && dispatch(messages.front())){
- messages.pop();
- }else{
- dispatching = false;
- proceed = false;
- queueing = !messages.empty();
- }
- }
-}
-
-void Queue::consume(Consumer* c, bool requestExclusive){
- Mutex::ScopedLock locker(lock);
- if(exclusive) throw ExclusiveAccessException();
- if(requestExclusive){
- if(!consumers.empty()) throw ExclusiveAccessException();
- exclusive = c;
- }
-
- if(autodelete && consumers.empty()) lastUsed = 0;
- consumers.push_back(c);
-}
-
-void Queue::cancel(Consumer* c){
- Mutex::ScopedLock locker(lock);
- consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC;
- if(exclusive == c) exclusive = 0;
-}
-
-Message::shared_ptr Queue::dequeue(){
- Mutex::ScopedLock locker(lock);
- Message::shared_ptr msg;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
-u_int32_t Queue::purge(){
- Mutex::ScopedLock locker(lock);
- int count = messages.size();
- while(!messages.empty()) messages.pop();
- return count;
-}
-
-u_int32_t Queue::getMessageCount() const{
- Mutex::ScopedLock locker(lock);
- return messages.size();
-}
-
-u_int32_t Queue::getConsumerCount() const{
- Mutex::ScopedLock locker(lock);
- return consumers.size();
-}
-
-bool Queue::canAutoDelete() const{
- Mutex::ScopedLock locker(lock);
- return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete);
-}
-
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
-{
- if (msg->isPersistent() && store) {
- store->enqueue(ctxt, msg, *this, xid);
- }
-}
-
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
-{
- if (msg->isPersistent() && store) {
- store->dequeue(ctxt, msg, *this, xid);
- }
-}
-
-void Queue::create()
-{
- if (store) {
- store->create(*this);
- }
-}
-
-void Queue::destroy()
-{
- if (store) {
- store->destroy(*this);
- }
-}