summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp23
1 files changed, 16 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 9b6bdf5a2b..a598717c5d 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -280,22 +280,31 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
}
void Channel::complete(Message::shared_ptr msg) {
- Exchange::shared_ptr exchange =
- connection.broker.getExchanges().get(msg->getExchange());
- assert(exchange.get());
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
- exchange->route(*deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
+ route(msg, *deliverable);
txBuffer->enlist(op);
} else {
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
+ route(msg, deliverable);
}
}
+void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
+ Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange());
+ assert(exchange.get());
+ exchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ if (!strategy.delivered) {
+ //TODO:if reject-unroutable, then reject
+ //else route to alternate exchange
+ if (exchange->getAlternate()) {
+ exchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ }
+ }
+
+}
+
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
if (multiple)