summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp11
1 files changed, 7 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index bbfcf209ad..f586ea92fc 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -21,6 +21,8 @@
#include "BrokerChannel.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
+#include "ConsumeAdapter.h"
+#include "GetAdapter.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
#include "qpid/framing/MessageAppendBody.h"
@@ -127,7 +129,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(
+ channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
client.ok(context.getRequestId());
@@ -144,7 +146,8 @@ MessageHandlerImpl::get( const MethodContext& context,
{
Queue::shared_ptr queue = getQueue(queueName);
- if(channel.get(queue, destination, !noAck))
+ GetAdapter out(adapter, queue, destination, connection.getFrameMax());
+ if(channel.get(out, queue, !noAck))
client.ok(context.getRequestId());
else
client.empty(context.getRequestId());
@@ -162,7 +165,7 @@ MessageHandlerImpl::empty( const MethodContext& )
void
MessageHandlerImpl::ok(const MethodContext& /*context*/)
{
- channel.ack();
+ channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
}
void
@@ -190,7 +193,7 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/,
uint16_t /*code*/,
const string& /*text*/ )
{
- channel.ack();
+ //channel.ack();
// channel.requeue();
}