diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageDelivery.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 71 |
1 files changed, 4 insertions, 67 deletions
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 36862edf37..f2a55e2790 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -24,10 +24,7 @@ #include "Message.h" #include "Queue.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/BasicXDeliverBody.h" -#include "qpid/framing/BasicXGetOkBody.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageXTransferBody.h" using namespace boost; @@ -43,41 +40,6 @@ struct BaseToken : DeliveryToken virtual AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) = 0; }; -struct BasicGetToken : BaseToken -{ - typedef boost::shared_ptr<BasicGetToken> shared_ptr; - - Queue::shared_ptr queue; - - BasicGetToken(Queue::shared_ptr q) : queue(q) {} - - AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) - { - return AMQFrame(in_place<BasicXGetOkBody>( - ProtocolVersion(), id.getValue(), - msg->getRedelivered(), msg->getExchangeName(), - msg->getRoutingKey(), queue->getMessageCount())); - } -}; - -struct BasicConsumeToken : BaseToken -{ - typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; - - const string consumer; - - BasicConsumeToken(const string c) : consumer(c) {} - - AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) - { - return AMQFrame(in_place<BasicXDeliverBody>( - ProtocolVersion(), consumer, id.getValue(), - msg->getRedelivered(), msg->getExchangeName(), - msg->getRoutingKey())); - } - -}; - struct MessageDeliveryToken : BaseToken { const std::string destination; @@ -91,48 +53,23 @@ struct MessageDeliveryToken : BaseToken AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/) { //may need to set the redelivered flag: - if (isPreview) { - if (msg->getRedelivered()){ - msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true); - } - return AMQFrame(in_place<MessageXTransferBody>( - ProtocolVersion(), 0, destination, - confirmMode, acquireMode)); - } else { - if (msg->getRedelivered()){ - msg->getProperties<DeliveryProperties>()->setRedelivered(true); - } - return AMQFrame(in_place<MessageTransferBody>( - ProtocolVersion(), destination, confirmMode, acquireMode)); + if (msg->getRedelivered()){ + msg->getProperties<DeliveryProperties>()->setRedelivered(true); } + return AMQFrame(in_place<MessageTransferBody>( + ProtocolVersion(), destination, confirmMode, acquireMode)); } }; } } -DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue) -{ - return DeliveryToken::shared_ptr(new BasicGetToken(queue)); -} - -DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer) -{ - return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); -} - DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination, u_int8_t confirmMode, u_int8_t acquireMode) { return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false)); } -DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination, - u_int8_t confirmMode, u_int8_t acquireMode) -{ - return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true)); -} - void MessageDelivery::deliver(QueuedMessage& msg, framing::FrameHandler& handler, DeliveryId id, |