summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-17 12:17:55 +0000
committerGordon Sim <gsim@apache.org>2008-03-17 12:17:55 +0000
commit6574ab48665039ae9b8b1d2c5dd26ea94a3d23fa (patch)
tree01091458f1db56d09953cd129305586057df1384 /cpp
parent1c1efeddef24ef18d75af65e4249b541b1382ea8 (diff)
downloadqpid-python-6574ab48665039ae9b8b1d2c5dd26ea94a3d23fa.tar.gz
Scope exclusive queues to sessions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@637854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/ConnectionToken.h3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--cpp/src/qpid/broker/Queue.h10
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h6
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp27
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h17
7 files changed, 49 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/ConnectionToken.h b/cpp/src/qpid/broker/ConnectionToken.h
index 7e7f813d0e..38b7d7d098 100644
--- a/cpp/src/qpid/broker/ConnectionToken.h
+++ b/cpp/src/qpid/broker/ConnectionToken.h
@@ -21,13 +21,14 @@
#ifndef _ConnectionToken_
#define _ConnectionToken_
+#include "OwnershipToken.h"
namespace qpid {
namespace broker {
/**
* An empty interface allowing opaque implementations of some
* form of token to identify a connection.
*/
- class ConnectionToken{
+ class ConnectionToken : public OwnershipToken {
public:
virtual ~ConnectionToken(){}
};
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index abe4f3f9a5..c4094a117b 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -46,7 +46,7 @@ using std::mem_fun;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
- const ConnectionToken* const _owner,
+ const OwnershipToken* const _owner,
Manageable* parent) :
name(_name),
@@ -582,7 +582,7 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
}
-bool Queue::isExclusiveOwner(const ConnectionToken* const o) const
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(ownershipLock);
return o == owner;
@@ -594,7 +594,7 @@ void Queue::releaseExclusiveOwnership()
owner = 0;
}
-bool Queue::setExclusiveOwner(const ConnectionToken* const o)
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index aaae175be8..bd6f1fe2c7 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -28,7 +28,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include "qpid/framing/amqp_types.h"
-#include "ConnectionToken.h"
+#include "OwnershipToken.h"
#include "Consumer.h"
#include "Message.h"
#include "qpid/framing/FieldTable.h"
@@ -63,7 +63,7 @@ namespace qpid {
const string name;
const bool autodelete;
MessageStore* store;
- const ConnectionToken* owner;
+ const OwnershipToken* owner;
uint32_t consumerCount;
bool exclusive;
Listeners listeners;
@@ -100,7 +100,7 @@ namespace qpid {
Queue(const string& name, bool autodelete = false,
MessageStore* const store = 0,
- const ConnectionToken* const owner = 0,
+ const OwnershipToken* const owner = 0,
Manageable* parent = 0);
~Queue();
@@ -143,9 +143,9 @@ namespace qpid {
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
- bool isExclusiveOwner(const ConnectionToken* const o) const;
+ bool isExclusiveOwner(const OwnershipToken* const o) const;
void releaseExclusiveOwnership();
- bool setExclusiveOwner(const ConnectionToken* const o);
+ bool setExclusiveOwner(const OwnershipToken* const o);
bool hasExclusiveConsumer() const;
bool hasExclusiveOwner() const;
inline bool isDurable() const { return store != 0; }
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 14d5f362e3..61bdb0ffde 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -33,7 +33,7 @@ QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
- bool autoDelete, const ConnectionToken* owner)
+ bool autoDelete, const OwnershipToken* owner)
{
RWlock::ScopedWlock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index ccccef31b5..60da0619f4 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -48,7 +48,7 @@ class QueueRegistry{
* was created by this declare call false if it already existed.
*/
std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false,
- const ConnectionToken* const owner = 0);
+ const OwnershipToken* const owner = 0);
/**
* Destroy the named queue.
@@ -62,7 +62,6 @@ class QueueRegistry{
* subsequent calls to find or declare with the same name.
*
*/
- void destroyLH (const string& name);
void destroy (const string& name);
template <class Test> bool destroyIf(const string& name, Test test)
{
@@ -107,6 +106,9 @@ private:
int counter;
MessageStore* store;
management::Manageable* parent;
+
+ //destroy impl that assumes lock is already held:
+ void destroyLH (const string& name);
};
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 64bb7cdae3..5d33e68fab 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -19,6 +19,7 @@
#include "Connection.h"
#include "DeliveryToken.h"
#include "MessageDelivery.h"
+#include "Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
@@ -180,6 +181,22 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str
}
}
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker())
+{}
+
+
+SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
+{
+ while (!exclusiveQueues.empty()) {
+ Queue::shared_ptr q(exclusiveQueues.front());
+ q->releaseExclusiveOwnership();
+ if (q->canAutoDelete()) {
+ Queue::tryAutoDelete(broker, q);
+ }
+ exclusiveQueues.erase(exclusiveQueues.begin());
+ }
+}
+
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
Queue::shared_ptr queue = getQueue(name);
@@ -212,7 +229,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
getBroker().getQueues().declare(
name, durable,
autoDelete,
- exclusive ? &getConnection() : 0);
+ exclusive ? this : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -230,15 +247,15 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
//handle automatic cleanup:
if (exclusive) {
- getConnection().exclusiveQueues.push_back(queue);
+ exclusiveQueues.push_back(queue);
}
} else {
- if (exclusive && queue->setExclusiveOwner(&getConnection())) {
- getConnection().exclusiveQueues.push_back(queue);
+ if (exclusive && queue->setExclusiveOwner(this)) {
+ exclusiveQueues.push_back(queue);
}
}
}
- if (exclusive && !queue->isExclusiveOwner(&getConnection()))
+ if (exclusive && !queue->isExclusiveOwner(this))
throw ResourceLockedException(
QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index e4c3a8676f..dad89cd123 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -25,8 +25,11 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/SequenceSet.h"
+#include "OwnershipToken.h"
+#include <vector>
#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
@@ -34,6 +37,7 @@ namespace broker {
class Channel;
class Connection;
class Broker;
+class Queue;
/**
* Per-channel protocol adapter.
@@ -44,7 +48,7 @@ class Broker;
* peer.
*
*/
-class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
+ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
{
public:
SessionAdapter(SemanticState& session);
@@ -116,12 +120,15 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
shared_ptr<Exchange> alternate);
};
- class QueueHandlerImpl :
- public Queue010Handler,
- public HandlerHelper
+ class QueueHandlerImpl : public Queue010Handler,
+ public HandlerHelper, public OwnershipToken
{
+ Broker& broker;
+ std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+
public:
- QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
+ QueueHandlerImpl(SemanticState& session);
+ ~QueueHandlerImpl();
void declare(const std::string& queue,
const std::string& alternateExchange,