diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f8d76c3b5f..0bb813ebfd 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -20,12 +20,12 @@ */ #include "SemanticHandler.h" -#include "Session.h" +#include "SemanticState.h" #include "SessionHandler.h" +#include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" #include "Connection.h" -#include "Session.h" #include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ExecutionResultBody.h" #include "qpid/framing/InvocationVisitor.h" @@ -36,7 +36,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {} +SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {} void SemanticHandler::handle(framing::AMQFrame& frame) { @@ -79,13 +79,13 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - getSession().ackCumulative(mark.getValue()); + state.ackCumulative(mark.getValue()); } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } @@ -95,9 +95,9 @@ void SemanticHandler::sendCompletion() SequenceNumber mark = incoming.getMark(); SequenceNumberSet range = incoming.getRange(); Mutex::ScopedLock l(outLock); - assert(getSessionHandler()); - getProxy().getExecution().complete(mark.getValue(), range); + session.getProxy().getExecution().complete(mark.getValue(), range); } + void SemanticHandler::flush() { incoming.flush(); @@ -122,7 +122,7 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { SequenceNumber id = incoming.next(); - BrokerAdapter adapter(getSession()); + BrokerAdapter adapter(state); InvocationVisitor v(&adapter); method->accept(v); incoming.complete(id); @@ -130,7 +130,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) if (!v.wasHandled()) { throw ConnectionException(540, "Not implemented"); } else if (v.hasResult()) { - getProxy().getExecution().result(id.getValue(), v.getResult()); + session.getProxy().getExecution().result(id.getValue(), v.getResult()); } //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -152,8 +152,8 @@ void SemanticHandler::handleContent(AMQFrame& frame) } msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags - msg->setPublisher(&getConnection()); - getSession().handle(msg); + msg->setPublisher(&session.getConnection()); + state.handle(msg); msgBuilder.end(); incoming.track(msg); //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); } @@ -163,13 +163,17 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax()); + MessageDelivery::deliver( + msg, session.getHandler().out, + ++outgoing.hwm, token, + session.getConnection().getFrameMax()); return outgoing.hwm; } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax()); + MessageDelivery::deliver(msg, session.getHandler().out, tag, token, + session.getConnection().getFrameMax()); } SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) |