diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 32 |
1 files changed, 16 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index a4ceb77c12..7529e3bb39 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -18,7 +18,7 @@ #include "qpid/QpidError.h" #include "MessageHandlerImpl.h" -#include "BrokerChannel.h" +#include "Session.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" #include "Broker.h" @@ -45,7 +45,7 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) void MessageHandlerImpl::cancel(const string& destination ) { - channel.cancel(destination); + session.cancel(destination); } void @@ -96,12 +96,12 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, bool exclusive, const framing::FieldTable& filter ) { - Queue::shared_ptr queue = getQueue(queueName); - if(!destination.empty() && channel.exists(destination)) + Queue::shared_ptr queue = session.getQueue(queueName); + if(!destination.empty() && session.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); @@ -114,9 +114,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& destination, bool noAck ) { - Queue::shared_ptr queue = getQueue(queueName); + Queue::shared_ptr queue = session.getQueue(queueName); - if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ + if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -145,14 +145,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, bool /*global*/ ) { //TODO: handle global - channel.setPrefetchSize(prefetchSize); - channel.setPrefetchCount(prefetchCount); + session.setPrefetchSize(prefetchSize); + session.setPrefetchCount(prefetchCount); } void MessageHandlerImpl::recover(bool requeue) { - channel.recover(requeue); + session.recover(requeue); } void @@ -166,10 +166,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i if (unit == 0) { //message - channel.addMessageCredit(destination, value); + session.addMessageCredit(destination, value); } else if (unit == 1) { //bytes - channel.addByteCredit(destination, value); + session.addByteCredit(destination, value); } else { //unknown throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); @@ -181,10 +181,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) { if (mode == 0) { //credit - channel.setCreditMode(destination); + session.setCreditMode(destination); } else if (mode == 1) { //window - channel.setWindowMode(destination); + session.setWindowMode(destination); } else{ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); } @@ -192,12 +192,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) void MessageHandlerImpl::flush(const std::string& destination) { - channel.flush(destination); + session.flush(destination); } void MessageHandlerImpl::stop(const std::string& destination) { - channel.stop(destination); + session.stop(destination); } void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/) |