summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-31 18:33:40 +0000
committerGordon Sim <gsim@apache.org>2006-10-31 18:33:40 +0000
commit0487ea40bc6568765cdec75a36273eeb26fae854 (patch)
tree02a18d9d2dfe1852013633320a858970fc784fd4 /cpp/src/qpid
parent1150be6d66a943d899e25af4cb876e7f68c657d9 (diff)
downloadqpid-python-0487ea40bc6568765cdec75a36273eeb26fae854.tar.gz
Hid locking within exchange registry, switched to shared_ptr for exchanges, added some extra error handling and tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469599 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Channel.cpp4
-rw-r--r--cpp/src/qpid/broker/Channel.h4
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h2
-rw-r--r--cpp/src/qpid/broker/Exchange.h28
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp47
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h17
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h2
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp10
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp52
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.h2
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h2
12 files changed, 92 insertions, 80 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index c40811e921..eae0a743db 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -155,7 +155,7 @@ void Channel::ConsumerImpl::requestDispatch(){
if(blocked) queue->dispatch();
}
-void Channel::handlePublish(Message* _message, Exchange* _exchange){
+void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
Message::shared_ptr message(_message);
exchange = _exchange;
messageBuilder.initialise(message);
@@ -179,7 +179,7 @@ void Channel::complete(Message::shared_ptr& msg){
DeliverableMessage deliverable(msg);
exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
}
- exchange = 0;
+ exchange.reset();
}else{
std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
}
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
index ef6700ff80..f5aa0e45ed 100644
--- a/cpp/src/qpid/broker/Channel.h
+++ b/cpp/src/qpid/broker/Channel.h
@@ -82,7 +82,7 @@ namespace qpid {
AccumulatedAck accumulatedAck;
TransactionalStore* store;
MessageBuilder messageBuilder;//builder for in-progress message
- Exchange* exchange;//exchange to which any in-progress message was published to
+ Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
@@ -107,7 +107,7 @@ namespace qpid {
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
- void handlePublish(Message* msg, Exchange* exchange);
+ void handlePublish(Message* msg, Exchange::shared_ptr exchange);
void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
};
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index fbbad8109e..5c5f78d90a 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -36,6 +36,8 @@ namespace broker {
static const std::string typeName;
DirectExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index dfa7559683..20b70ed475 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -18,23 +18,27 @@
#ifndef _Exchange_
#define _Exchange_
+#include <boost/shared_ptr.hpp>
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
namespace qpid {
-namespace broker {
- class Exchange{
- const std::string name;
- public:
- explicit Exchange(const std::string& _name) : name(_name) {}
- virtual ~Exchange(){}
- std::string getName() { return name; }
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- };
-}
+ namespace broker {
+ class Exchange{
+ const std::string name;
+ public:
+ typedef boost::shared_ptr<Exchange> shared_ptr;
+
+ explicit Exchange(const std::string& _name) : name(_name){}
+ virtual ~Exchange(){}
+ std::string getName() { return name; }
+ virtual std::string getType() = 0;
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ };
+ }
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index f69c258ac1..b2d2afa5f4 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -16,33 +16,46 @@
*
*/
#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/HeadersExchange.h"
+#include "qpid/broker/TopicExchange.h"
using namespace qpid::broker;
using namespace qpid::concurrent;
+using std::pair;
-ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
+ Locker locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i == exchanges.end()) {
+ Exchange::shared_ptr exchange;
-ExchangeRegistry::~ExchangeRegistry(){
- for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i)
- {
- delete i->second;
+ if(type == TopicExchange::typeName){
+ exchange = Exchange::shared_ptr(new TopicExchange(name));
+ }else if(type == DirectExchange::typeName){
+ exchange = Exchange::shared_ptr(new DirectExchange(name));
+ }else if(type == FanOutExchange::typeName){
+ exchange = Exchange::shared_ptr(new FanOutExchange(name));
+ }else if (type == HeadersExchange::typeName) {
+ exchange = Exchange::shared_ptr(new HeadersExchange(name));
+ }else{
+ throw UnknownExchangeTypeException();
+ }
+ exchanges[name] = exchange;
+ return std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ } else {
+ return std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
- delete lock;
-}
-
-void ExchangeRegistry::declare(Exchange* exchange){
- exchanges[exchange->getName()] = exchange;
}
void ExchangeRegistry::destroy(const string& name){
- if(exchanges[name]){
- delete exchanges[name];
- exchanges.erase(name);
- }
+ Locker locker(lock);
+ exchanges.erase(name);
}
-Exchange* ExchangeRegistry::get(const string& name){
+Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+ Locker locker(lock);
return exchanges[name];
}
@@ -51,7 +64,7 @@ namespace
const std::string empty;
}
-Exchange* ExchangeRegistry::getDefault()
+Exchange::shared_ptr ExchangeRegistry::getDefault()
{
return get(empty);
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index 335f1ad0ab..fca5462e72 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -20,22 +20,21 @@
#include <map>
#include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/MonitorImpl.h"
namespace qpid {
namespace broker {
+ struct UnknownExchangeTypeException{};
+
class ExchangeRegistry{
- typedef std::map<string, Exchange*> ExchangeMap;
+ typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::concurrent::Monitor* lock;
+ qpid::concurrent::MonitorImpl lock;
public:
- ExchangeRegistry();
- void declare(Exchange* exchange);
+ std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);
void destroy(const string& name);
- Exchange* get(const string& name);
- Exchange* getDefault();
- inline qpid::concurrent::Monitor* getLock(){ return lock; }
- ~ExchangeRegistry();
+ Exchange::shared_ptr get(const string& name);
+ Exchange::shared_ptr getDefault();
};
}
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 209d964bf6..334f1ccdcc 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -37,6 +37,8 @@ class FanOutExchange : public virtual Exchange {
static const std::string typeName;
FanOutExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index f4261916d9..2e2403361e 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -40,6 +40,8 @@ class HeadersExchange : public virtual Exchange {
static const std::string typeName;
HeadersExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
index 60ee9cc4ad..b98f70ef68 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -33,11 +33,11 @@ const std::string amq_match("amq.match");
}
SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
- exchanges.declare(new DirectExchange(empty)); // Default exchange.
- exchanges.declare(new DirectExchange(amq_direct));
- exchanges.declare(new TopicExchange(amq_topic));
- exchanges.declare(new FanOutExchange(amq_fanout));
- exchanges.declare(new HeadersExchange(amq_match));
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ exchanges.declare(amq_direct, DirectExchange::typeName);
+ exchanges.declare(amq_topic, TopicExchange::typeName);
+ exchanges.declare(amq_fanout, FanOutExchange::typeName);
+ exchanges.declare(amq_match, HeadersExchange::typeName);
cleaner.start();
}
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index a472cd27b0..7c94a65d73 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -73,11 +73,8 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha
}
-Exchange* SessionHandlerImpl::findExchange(const string& name){
- exchanges->getLock()->acquire();
- Exchange* exchange(exchanges->get(name));
- exchanges->getLock()->release();
- return exchange;
+Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
+ return exchanges->get(name);
}
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -217,40 +214,31 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
FieldTable& /*arguments*/){
- if(!passive && (
- type != TopicExchange::typeName &&
- type != DirectExchange::typeName &&
- type != FanOutExchange::typeName &&
- type != HeadersExchange::typeName
- )
- )
- {
- throw ChannelException(540, "Exchange type not implemented: " + type);
- }
-
- parent->exchanges->getLock()->acquire();
- if(!parent->exchanges->get(exchange)){
- if(type == TopicExchange::typeName){
- parent->exchanges->declare(new TopicExchange(exchange));
- }else if(type == DirectExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if(type == FanOutExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if (type == HeadersExchange::typeName) {
- parent->exchanges->declare(new HeadersExchange(exchange));
+ if(passive){
+ if(!parent->exchanges->get(exchange)){
+ throw ChannelException(404, "Exchange not found: " + exchange);
+ }
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type);
+ if(!response.second && response.first->getType() != type){
+ throw ConnectionException(507, "Exchange already declared to be of type "
+ + response.first->getType() + ", requested " + type);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw ConnectionException(503, "Exchange type not implemented: " + type);
}
}
- parent->exchanges->getLock()->release();
+
if(!nowait){
parent->client.getExchange().declareOk(channel);
}
}
-void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+ string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- parent->exchanges->getLock()->acquire();
parent->exchanges->destroy(exchange);
- parent->exchanges->getLock()->release();
if(!nowait) parent->client.getExchange().deleteOk(channel);
}
@@ -290,7 +278,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
FieldTable& arguments){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange* exchange = parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
if(exchange){
if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
exchange->bind(queue, routingKey, &arguments);
@@ -371,7 +359,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
string& exchangeName, string& routingKey,
bool mandatory, bool immediate){
- Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
if(exchange){
Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
parent->getChannel(channel)->handlePublish(msg, exchange);
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h
index 6b9b5cca6b..62e7ecd4c9 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.h
@@ -97,7 +97,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
- Exchange* findExchange(const string& name);
+ Exchange::shared_ptr findExchange(const string& name);
public:
SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues,
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 9f08153a2e..19ea732fbc 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -77,6 +77,8 @@ class TopicExchange : public virtual Exchange{
static const std::string typeName;
TopicExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);