diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 47 |
1 files changed, 45 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index da57439e21..c728a800ab 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -27,6 +27,8 @@ #include "qpid/framing/MessageTransferBody.h" #include "BrokerAdapter.h" +#include <boost/format.hpp> + namespace qpid { namespace broker { @@ -96,7 +98,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); + channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -130,7 +132,7 @@ MessageHandlerImpl::empty() void MessageHandlerImpl::ok() { - channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest()); + throw ConnectionException(540, "Message.Ok no longer supported"); } void @@ -171,4 +173,45 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) } + +void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) +{ + + if (unit == 0) { + //message + channel.addMessageCredit(destination, value); + } else if (unit == 1) { + //bytes + channel.addByteCredit(destination, value); + } else { + //unknown + throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); + } + +} + +void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) +{ + if (mode == 0) { + //credit + channel.setCreditMode(destination); + } else if (mode == 1) { + //window + channel.setWindowMode(destination); + } else{ + throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); + } +} + +void MessageHandlerImpl::flush(const std::string& destination) +{ + channel.flush(destination); +} + +void MessageHandlerImpl::stop(const std::string& destination) +{ + channel.stop(destination); +} + + }} // namespace qpid::broker |