diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 103 |
1 files changed, 53 insertions, 50 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 9bf148bcf0..376108193a 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -55,8 +55,7 @@ ProtocolVersion BrokerAdapter::getVersion() const { void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ channel.open(); - // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk(std::string()/* ID */); + client.openOk(); } void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ @@ -80,41 +79,63 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(){} void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& args){ + const string& alternateExchange, + bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = broker.getExchanges().get(alternateExchange); + } if(passive){ - if(!broker.getExchanges().get(exchange)) { - throw ChannelException(404, "Exchange not found: " + exchange); - } + Exchange::shared_ptr actual(broker.getExchanges().get(exchange)); + checkType(actual, type); + checkAlternate(actual, alternate); }else{ try{ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args); if (response.second) { - if (durable) broker.getStore().create(*response.first); - } else if (response.first->getType() != type) { - throw ConnectionException( - 530, - "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); + if (durable) { + broker.getStore().create(*response.first); + } + if (alternate) { + response.first->setAlternate(alternate); + alternate->incAlternateUsers(); + } + } else { + checkType(response.first, type); + checkAlternate(response.first, alternate); } }catch(UnknownExchangeTypeException& e){ throw ConnectionException( 503, "Exchange type not implemented: " + type); } } - if(!nowait){ - client.declareOk(); +} + +void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) +{ + if (!type.empty() && exchange->getType() != type) { + throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type); + } +} + +void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) +{ + if (alternate && alternate != exchange->getAlternate()) { + throw ConnectionException(530, "Exchange declared with alternate-exchange " + + exchange->getAlternate()->getName() + ", requested " + + alternate->getName()); } + } -void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, - const string& name, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); if (exchange->isDurable()) broker.getStore().destroy(*exchange); + if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); broker.getExchanges().destroy(name); - if(!nowait) client.deleteOk(); } void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) @@ -159,12 +180,17 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, } } -void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, +void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = broker.getExchanges().get(alternateExchange); + } Queue::shared_ptr queue; if (passive && !name.empty()) { queue = getQueue(name); + //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = broker.getQueues().declare( @@ -175,6 +201,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& assert(queue); if (queue_created.second) { // This is a new queue channel.setDefaultQueue(queue); + if (alternate) { + queue->setAlternateExchange(alternate); + alternate->incAlternateUsers(); + } + //apply settings & create persistent record if required queue_created.first->create(arguments); @@ -201,7 +232,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, + const string& exchangeName, const string& routingKey, const FieldTable& arguments){ Queue::shared_ptr queue = getQueue(queueName); @@ -214,7 +245,6 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu broker.getStore().bind(*exchange, *queue, routingKey, arguments); } } - if(!nowait) client.bindOk(); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -238,7 +268,6 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, broker.getStore().unbind(*exchange, *queue, routingKey, arguments); } - client.unbindOk(); } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ @@ -280,7 +309,6 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.qosOk(); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -314,12 +342,12 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool now void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate) + bool rejectUnroutable, bool immediate) { Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate); channel.handlePublish(msg); }else{ throw ChannelException( @@ -351,19 +379,16 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) void BrokerAdapter::TxHandlerImpl::select() { channel.startTx(); - client.selectOk(); } void BrokerAdapter::TxHandlerImpl::commit() { channel.commit(); - client.commitOk(); } void BrokerAdapter::TxHandlerImpl::rollback() { channel.rollback(); - client.rollbackOk(); channel.recover(false); } @@ -372,28 +397,6 @@ void BrokerAdapter::ChannelHandlerImpl::ok() //no specific action required, generic response handling should be sufficient } - -// -// Message class method handlers -// -void BrokerAdapter::ChannelHandlerImpl::ping() -{ - client.ok(); - client.pong(); -} - - -void -BrokerAdapter::ChannelHandlerImpl::pong() -{ - client.ok(); -} - -void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - void BrokerAdapter::setResponseTo(RequestId r) { basicHandler.client.setResponseTo(r); |