summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp103
1 files changed, 53 insertions, 50 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 9bf148bcf0..376108193a 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -55,8 +55,7 @@ ProtocolVersion BrokerAdapter::getVersion() const {
void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
channel.open();
- // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(std::string()/* ID */);
+ client.openOk();
}
void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
@@ -80,41 +79,63 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(){}
void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& args){
+ const string& alternateExchange,
+ bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = broker.getExchanges().get(alternateExchange);
+ }
if(passive){
- if(!broker.getExchanges().get(exchange)) {
- throw ChannelException(404, "Exchange not found: " + exchange);
- }
+ Exchange::shared_ptr actual(broker.getExchanges().get(exchange));
+ checkType(actual, type);
+ checkAlternate(actual, alternate);
}else{
try{
std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
if (response.second) {
- if (durable) broker.getStore().create(*response.first);
- } else if (response.first->getType() != type) {
- throw ConnectionException(
- 530,
- "Exchange already declared to be of type "
- + response.first->getType() + ", requested " + type);
+ if (durable) {
+ broker.getStore().create(*response.first);
+ }
+ if (alternate) {
+ response.first->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ } else {
+ checkType(response.first, type);
+ checkAlternate(response.first, alternate);
}
}catch(UnknownExchangeTypeException& e){
throw ConnectionException(
503, "Exchange type not implemented: " + type);
}
}
- if(!nowait){
- client.declareOk();
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
+{
+ if (!type.empty() && exchange->getType() != type) {
+ throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type);
+ }
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
+{
+ if (alternate && alternate != exchange->getAlternate()) {
+ throw ConnectionException(530, "Exchange declared with alternate-exchange "
+ + exchange->getAlternate()->getName() + ", requested "
+ + alternate->getName());
}
+
}
-void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/,
- const string& name, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+ if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
broker.getExchanges().destroy(name);
- if(!nowait) client.deleteOk();
}
void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
@@ -159,12 +180,17 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
}
-void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name,
+void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = broker.getExchanges().get(alternateExchange);
+ }
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = getQueue(name);
+ //TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(
@@ -175,6 +201,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
assert(queue);
if (queue_created.second) { // This is a new queue
channel.setDefaultQueue(queue);
+ if (alternate) {
+ queue->setAlternateExchange(alternate);
+ alternate->incAlternateUsers();
+ }
+
//apply settings & create persistent record if required
queue_created.first->create(arguments);
@@ -201,7 +232,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
+ const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
Queue::shared_ptr queue = getQueue(queueName);
@@ -214,7 +245,6 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
broker.getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
- if(!nowait) client.bindOk();
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -238,7 +268,6 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
}
- client.unbindOk();
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
@@ -280,7 +309,6 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.qosOk();
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -314,12 +342,12 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool now
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate)
+ bool rejectUnroutable, bool immediate)
{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate);
channel.handlePublish(msg);
}else{
throw ChannelException(
@@ -351,19 +379,16 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
void BrokerAdapter::TxHandlerImpl::select()
{
channel.startTx();
- client.selectOk();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
channel.commit();
- client.commitOk();
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
channel.rollback();
- client.rollbackOk();
channel.recover(false);
}
@@ -372,28 +397,6 @@ void BrokerAdapter::ChannelHandlerImpl::ok()
//no specific action required, generic response handling should be sufficient
}
-
-//
-// Message class method handlers
-//
-void BrokerAdapter::ChannelHandlerImpl::ping()
-{
- client.ok();
- client.pong();
-}
-
-
-void
-BrokerAdapter::ChannelHandlerImpl::pong()
-{
- client.ok();
-}
-
-void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
void BrokerAdapter::setResponseTo(RequestId r)
{
basicHandler.client.setResponseTo(r);