summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp52
1 files changed, 20 insertions, 32 deletions
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index a472cd27b0..7c94a65d73 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -73,11 +73,8 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha
}
-Exchange* SessionHandlerImpl::findExchange(const string& name){
- exchanges->getLock()->acquire();
- Exchange* exchange(exchanges->get(name));
- exchanges->getLock()->release();
- return exchange;
+Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
+ return exchanges->get(name);
}
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -217,40 +214,31 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
FieldTable& /*arguments*/){
- if(!passive && (
- type != TopicExchange::typeName &&
- type != DirectExchange::typeName &&
- type != FanOutExchange::typeName &&
- type != HeadersExchange::typeName
- )
- )
- {
- throw ChannelException(540, "Exchange type not implemented: " + type);
- }
-
- parent->exchanges->getLock()->acquire();
- if(!parent->exchanges->get(exchange)){
- if(type == TopicExchange::typeName){
- parent->exchanges->declare(new TopicExchange(exchange));
- }else if(type == DirectExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if(type == FanOutExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if (type == HeadersExchange::typeName) {
- parent->exchanges->declare(new HeadersExchange(exchange));
+ if(passive){
+ if(!parent->exchanges->get(exchange)){
+ throw ChannelException(404, "Exchange not found: " + exchange);
+ }
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type);
+ if(!response.second && response.first->getType() != type){
+ throw ConnectionException(507, "Exchange already declared to be of type "
+ + response.first->getType() + ", requested " + type);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw ConnectionException(503, "Exchange type not implemented: " + type);
}
}
- parent->exchanges->getLock()->release();
+
if(!nowait){
parent->client.getExchange().declareOk(channel);
}
}
-void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+ string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- parent->exchanges->getLock()->acquire();
parent->exchanges->destroy(exchange);
- parent->exchanges->getLock()->release();
if(!nowait) parent->client.getExchange().deleteOk(channel);
}
@@ -290,7 +278,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
FieldTable& arguments){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange* exchange = parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
if(exchange){
if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
exchange->bind(queue, routingKey, &arguments);
@@ -371,7 +359,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
string& exchangeName, string& routingKey,
bool mandatory, bool immediate){
- Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
if(exchange){
Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
parent->getChannel(channel)->handlePublish(msg, exchange);