diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageDelivery.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index b259aa6b8f..6471245ed9 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -39,7 +39,7 @@ namespace broker{ struct BaseToken : DeliveryToken { virtual ~BaseToken() {} - virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0; + virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0; }; struct BasicGetToken : BaseToken @@ -50,12 +50,11 @@ struct BasicGetToken : BaseToken BasicGetToken(Queue::shared_ptr q) : queue(q) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) { - channel.send(BasicGetOkBody( - channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), + return AMQFrame(0, BasicGetOkBody( + ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey(), queue->getMessageCount())); - } }; @@ -67,10 +66,10 @@ struct BasicConsumeToken : BaseToken BasicConsumeToken(const string c) : consumer(c) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) { - channel.send(BasicDeliverBody( - channel.getVersion(), consumer, id.getValue(), + return AMQFrame(0, BasicDeliverBody( + ProtocolVersion(), consumer, id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey())); } @@ -85,16 +84,13 @@ struct MessageDeliveryToken : BaseToken MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : destination(d), confirmMode(c), acquireMode(a) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/) { - //TODO; need to figure out how the acquire mode gets - //communicated (this is just a temporary solution) - channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode)); - //may need to set the redelivered flag: if (msg->getRedelivered()){ msg->getProperties<DeliveryProperties>()->setRedelivered(true); } + return AMQFrame(0, MessageTransferBody(ProtocolVersion(), 0, destination, confirmMode, acquireMode)); } }; @@ -127,11 +123,15 @@ void MessageDelivery::deliver(Message::shared_ptr msg, //another may well have the wrong headers; however we will only //have one content class for 0-10 proper + FrameHandler& handler = channel.getHandlers().out; + //send method boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); - t->sendMethod(msg, channel, id); + AMQFrame method = t->sendMethod(msg, id); + method.setEof(false); + method.setChannel(channel.getId()); + handler.handle(method); - FrameHandler& handler = channel.getHandlers().out; msg->sendHeader(handler, channel.getId(), framesize); msg->sendContent(handler, channel.getId(), framesize); } |