summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp32
1 files changed, 16 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index a4ceb77c12..7529e3bb39 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -18,7 +18,7 @@
#include "qpid/QpidError.h"
#include "MessageHandlerImpl.h"
-#include "BrokerChannel.h"
+#include "Session.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
#include "Broker.h"
@@ -45,7 +45,7 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
void
MessageHandlerImpl::cancel(const string& destination )
{
- channel.cancel(destination);
+ session.cancel(destination);
}
void
@@ -96,12 +96,12 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = getQueue(queueName);
- if(!destination.empty() && channel.exists(destination))
+ Queue::shared_ptr queue = session.getQueue(queueName);
+ if(!destination.empty() && session.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
@@ -114,9 +114,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue = getQueue(queueName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
- if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+ if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -145,14 +145,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
bool /*global*/ )
{
//TODO: handle global
- channel.setPrefetchSize(prefetchSize);
- channel.setPrefetchCount(prefetchCount);
+ session.setPrefetchSize(prefetchSize);
+ session.setPrefetchCount(prefetchCount);
}
void
MessageHandlerImpl::recover(bool requeue)
{
- channel.recover(requeue);
+ session.recover(requeue);
}
void
@@ -166,10 +166,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i
if (unit == 0) {
//message
- channel.addMessageCredit(destination, value);
+ session.addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
- channel.addByteCredit(destination, value);
+ session.addByteCredit(destination, value);
} else {
//unknown
throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -181,10 +181,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
{
if (mode == 0) {
//credit
- channel.setCreditMode(destination);
+ session.setCreditMode(destination);
} else if (mode == 1) {
//window
- channel.setWindowMode(destination);
+ session.setWindowMode(destination);
} else{
throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
}
@@ -192,12 +192,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
void MessageHandlerImpl::flush(const std::string& destination)
{
- channel.flush(destination);
+ session.flush(destination);
}
void MessageHandlerImpl::stop(const std::string& destination)
{
- channel.stop(destination);
+ session.stop(destination);
}
void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)