summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageDelivery.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageDelivery.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp30
1 files changed, 15 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index b259aa6b8f..6471245ed9 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -39,7 +39,7 @@ namespace broker{
struct BaseToken : DeliveryToken
{
virtual ~BaseToken() {}
- virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0;
+ virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0;
};
struct BasicGetToken : BaseToken
@@ -50,12 +50,11 @@ struct BasicGetToken : BaseToken
BasicGetToken(Queue::shared_ptr q) : queue(q) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
{
- channel.send(BasicGetOkBody(
- channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
+ return AMQFrame(0, BasicGetOkBody(
+ ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
msg->getRoutingKey(), queue->getMessageCount()));
-
}
};
@@ -67,10 +66,10 @@ struct BasicConsumeToken : BaseToken
BasicConsumeToken(const string c) : consumer(c) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
{
- channel.send(BasicDeliverBody(
- channel.getVersion(), consumer, id.getValue(),
+ return AMQFrame(0, BasicDeliverBody(
+ ProtocolVersion(), consumer, id.getValue(),
msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey()));
}
@@ -85,16 +84,13 @@ struct MessageDeliveryToken : BaseToken
MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
destination(d), confirmMode(c), acquireMode(a) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/)
{
- //TODO; need to figure out how the acquire mode gets
- //communicated (this is just a temporary solution)
- channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode));
-
//may need to set the redelivered flag:
if (msg->getRedelivered()){
msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
+ return AMQFrame(0, MessageTransferBody(ProtocolVersion(), 0, destination, confirmMode, acquireMode));
}
};
@@ -127,11 +123,15 @@ void MessageDelivery::deliver(Message::shared_ptr msg,
//another may well have the wrong headers; however we will only
//have one content class for 0-10 proper
+ FrameHandler& handler = channel.getHandlers().out;
+
//send method
boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
- t->sendMethod(msg, channel, id);
+ AMQFrame method = t->sendMethod(msg, id);
+ method.setEof(false);
+ method.setChannel(channel.getId());
+ handler.handle(method);
- FrameHandler& handler = channel.getHandlers().out;
msg->sendHeader(handler, channel.getId(), framesize);
msg->sendContent(handler, channel.getId(), framesize);
}