summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-21 18:26:37 +0000
committerAlan Conway <aconway@apache.org>2007-09-21 18:26:37 +0000
commit2f6d6ad7efd788b71204af67dff51b6233881e2e (patch)
treea3d123bc112d12dfcef341a312f418624c98e342 /cpp/src/qpid/broker/MessageHandlerImpl.cpp
parent3b80f903b6174b4346d7d7b537d783f628fe28d6 (diff)
downloadqpid-python-2f6d6ad7efd788b71204af67dff51b6233881e2e.tar.gz
Split broker::Session into:
broker::SessionState: session info (uuid etc.) + handler chains. broker::SemanticState: session state for the SemanticHandler. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp40
1 files changed, 19 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index a31ac78aa4..3d197e185d 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -18,7 +18,6 @@
#include "qpid/QpidError.h"
#include "MessageHandlerImpl.h"
-#include "Session.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
#include "Broker.h"
@@ -36,8 +35,7 @@ namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(Session& session)
- : HandlerImpl(session) {}
+MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
//
// Message class method handlers
@@ -46,7 +44,7 @@ MessageHandlerImpl::MessageHandlerImpl(Session& session)
void
MessageHandlerImpl::cancel(const string& destination )
{
- getSession().cancel(destination);
+ state.cancel(destination);
}
void
@@ -97,14 +95,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
- if(!destination.empty() && getSession().exists(destination))
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!destination.empty() && state.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
//NB: am assuming pre-acquired = 0 as discussed on SIG list as is
//the previously expected behaviour
- getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
@@ -117,9 +115,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
- if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+ if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -148,14 +146,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
bool /*global*/ )
{
//TODO: handle global
- getSession().setPrefetchSize(prefetchSize);
- getSession().setPrefetchCount(prefetchCount);
+ state.setPrefetchSize(prefetchSize);
+ state.setPrefetchCount(prefetchCount);
}
void
MessageHandlerImpl::recover(bool requeue)
{
- getSession().recover(requeue);
+ state.recover(requeue);
}
void
@@ -166,7 +164,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().reject(i->getValue(), (++i)->getValue());
+ state.reject(i->getValue(), (++i)->getValue());
}
}
@@ -175,10 +173,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i
if (unit == 0) {
//message
- getSession().addMessageCredit(destination, value);
+ state.addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
- getSession().addByteCredit(destination, value);
+ state.addByteCredit(destination, value);
} else {
//unknown
throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -190,10 +188,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
{
if (mode == 0) {
//credit
- getSession().setCreditMode(destination);
+ state.setCreditMode(destination);
} else if (mode == 1) {
//window
- getSession().setWindowMode(destination);
+ state.setWindowMode(destination);
} else{
throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
}
@@ -201,12 +199,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
void MessageHandlerImpl::flush(const std::string& destination)
{
- getSession().flush(destination);
+ state.flush(destination);
}
void MessageHandlerImpl::stop(const std::string& destination)
{
- getSession().stop(destination);
+ state.stop(destination);
}
void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
@@ -218,7 +216,7 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().acquire(i->getValue(), (++i)->getValue(), results);
+ state.acquire(i->getValue(), (++i)->getValue(), results);
}
results = results.condense();
@@ -232,7 +230,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().release(i->getValue(), (++i)->getValue());
+ state.release(i->getValue(), (++i)->getValue());
}
}