diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 9b6bdf5a2b..a598717c5d 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -280,22 +280,31 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { } void Channel::complete(Message::shared_ptr msg) { - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - assert(exchange.get()); if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); - exchange->route(*deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); + route(msg, *deliverable); txBuffer->enlist(op); } else { DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); + route(msg, deliverable); } } +void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { + Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); + assert(exchange.get()); + exchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + if (!strategy.delivered) { + //TODO:if reject-unroutable, then reject + //else route to alternate exchange + if (exchange->getAlternate()) { + exchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + } + } + +} + // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) |