summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageDelivery.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageDelivery.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp71
1 files changed, 4 insertions, 67 deletions
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 36862edf37..f2a55e2790 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -24,10 +24,7 @@
#include "Message.h"
#include "Queue.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/BasicXDeliverBody.h"
-#include "qpid/framing/BasicXGetOkBody.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
using namespace boost;
@@ -43,41 +40,6 @@ struct BaseToken : DeliveryToken
virtual AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) = 0;
};
-struct BasicGetToken : BaseToken
-{
- typedef boost::shared_ptr<BasicGetToken> shared_ptr;
-
- Queue::shared_ptr queue;
-
- BasicGetToken(Queue::shared_ptr q) : queue(q) {}
-
- AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
- {
- return AMQFrame(in_place<BasicXGetOkBody>(
- ProtocolVersion(), id.getValue(),
- msg->getRedelivered(), msg->getExchangeName(),
- msg->getRoutingKey(), queue->getMessageCount()));
- }
-};
-
-struct BasicConsumeToken : BaseToken
-{
- typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
-
- const string consumer;
-
- BasicConsumeToken(const string c) : consumer(c) {}
-
- AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
- {
- return AMQFrame(in_place<BasicXDeliverBody>(
- ProtocolVersion(), consumer, id.getValue(),
- msg->getRedelivered(), msg->getExchangeName(),
- msg->getRoutingKey()));
- }
-
-};
-
struct MessageDeliveryToken : BaseToken
{
const std::string destination;
@@ -91,48 +53,23 @@ struct MessageDeliveryToken : BaseToken
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/)
{
//may need to set the redelivered flag:
- if (isPreview) {
- if (msg->getRedelivered()){
- msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true);
- }
- return AMQFrame(in_place<MessageXTransferBody>(
- ProtocolVersion(), 0, destination,
- confirmMode, acquireMode));
- } else {
- if (msg->getRedelivered()){
- msg->getProperties<DeliveryProperties>()->setRedelivered(true);
- }
- return AMQFrame(in_place<MessageTransferBody>(
- ProtocolVersion(), destination, confirmMode, acquireMode));
+ if (msg->getRedelivered()){
+ msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
+ return AMQFrame(in_place<MessageTransferBody>(
+ ProtocolVersion(), destination, confirmMode, acquireMode));
}
};
}
}
-DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue)
-{
- return DeliveryToken::shared_ptr(new BasicGetToken(queue));
-}
-
-DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer)
-{
- return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
-}
-
DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode, u_int8_t acquireMode)
{
return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false));
}
-DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination,
- u_int8_t confirmMode, u_int8_t acquireMode)
-{
- return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true));
-}
-
void MessageDelivery::deliver(QueuedMessage& msg,
framing::FrameHandler& handler,
DeliveryId id,