diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 25 |
1 files changed, 6 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index ce1fa1e028..a4ceb77c12 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -22,7 +22,7 @@ #include "qpid/framing/FramingContent.h" #include "Connection.h" #include "Broker.h" -#include "BrokerMessageMessage.h" +#include "MessageDelivery.h" #include "qpid/framing/MessageAppendBody.h" #include "qpid/framing/MessageTransferBody.h" #include "BrokerAdapter.h" @@ -55,7 +55,7 @@ MessageHandlerImpl::open(const string& /*reference*/) } void -MessageHandlerImpl::append(const framing::AMQMethodBody& ) +MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/) { throw ConnectionException(540, "References no longer supported"); } @@ -92,7 +92,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, const string& destination, bool noLocal, u_int8_t confirmMode, - u_int8_t /*acquireMode*/,//TODO: implement acquire modes + u_int8_t acquireMode,//TODO: implement acquire modes bool exclusive, const framing::FieldTable& filter ) { @@ -101,7 +101,8 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter); + channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + tag, queue, noLocal, confirmMode == 1, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -115,7 +116,7 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, { Queue::shared_ptr queue = getQueue(queueName); - if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){ + if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -160,20 +161,6 @@ MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*co //TODO: implement } -void -MessageHandlerImpl::transfer(const framing::AMQMethodBody& context) -{ - const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context); - if (transfer->getBody().isInline()) { - MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer)); - channel.handleInlineTransfer(message); - } else { - throw ConnectionException(540, "References no longer supported"); - } -} - - - void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) { |