diff options
author | Alan Conway <aconway@apache.org> | 2007-09-21 18:26:37 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-09-21 18:26:37 +0000 |
commit | 2f6d6ad7efd788b71204af67dff51b6233881e2e (patch) | |
tree | a3d123bc112d12dfcef341a312f418624c98e342 /cpp/src/qpid/broker/MessageHandlerImpl.cpp | |
parent | 3b80f903b6174b4346d7d7b537d783f628fe28d6 (diff) | |
download | qpid-python-2f6d6ad7efd788b71204af67dff51b6233881e2e.tar.gz |
Split broker::Session into:
broker::SessionState: session info (uuid etc.) + handler chains.
broker::SemanticState: session state for the SemanticHandler.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 40 |
1 files changed, 19 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index a31ac78aa4..3d197e185d 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -18,7 +18,6 @@ #include "qpid/QpidError.h" #include "MessageHandlerImpl.h" -#include "Session.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" #include "Broker.h" @@ -36,8 +35,7 @@ namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(Session& session) - : HandlerImpl(session) {} +MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {} // // Message class method handlers @@ -46,7 +44,7 @@ MessageHandlerImpl::MessageHandlerImpl(Session& session) void MessageHandlerImpl::cancel(const string& destination ) { - getSession().cancel(destination); + state.cancel(destination); } void @@ -97,14 +95,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, bool exclusive, const framing::FieldTable& filter ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); - if(!destination.empty() && getSession().exists(destination)) + Queue::shared_ptr queue = state.getQueue(queueName); + if(!destination.empty() && state.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; //NB: am assuming pre-acquired = 0 as discussed on SIG list as is //the previously expected behaviour - getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); @@ -117,9 +115,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& destination, bool noAck ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); - if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ + if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -148,14 +146,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, bool /*global*/ ) { //TODO: handle global - getSession().setPrefetchSize(prefetchSize); - getSession().setPrefetchCount(prefetchCount); + state.setPrefetchSize(prefetchSize); + state.setPrefetchCount(prefetchCount); } void MessageHandlerImpl::recover(bool requeue) { - getSession().recover(requeue); + state.recover(requeue); } void @@ -166,7 +164,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/ } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().reject(i->getValue(), (++i)->getValue()); + state.reject(i->getValue(), (++i)->getValue()); } } @@ -175,10 +173,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i if (unit == 0) { //message - getSession().addMessageCredit(destination, value); + state.addMessageCredit(destination, value); } else if (unit == 1) { //bytes - getSession().addByteCredit(destination, value); + state.addByteCredit(destination, value); } else { //unknown throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); @@ -190,10 +188,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) { if (mode == 0) { //credit - getSession().setCreditMode(destination); + state.setCreditMode(destination); } else if (mode == 1) { //window - getSession().setWindowMode(destination); + state.setWindowMode(destination); } else{ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); } @@ -201,12 +199,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) void MessageHandlerImpl::flush(const std::string& destination) { - getSession().flush(destination); + state.flush(destination); } void MessageHandlerImpl::stop(const std::string& destination) { - getSession().stop(destination); + state.stop(destination); } void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) @@ -218,7 +216,7 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /* } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().acquire(i->getValue(), (++i)->getValue(), results); + state.acquire(i->getValue(), (++i)->getValue(), results); } results = results.condense(); @@ -232,7 +230,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers) } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().release(i->getValue(), (++i)->getValue()); + state.release(i->getValue(), (++i)->getValue()); } } |