summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp30
1 files changed, 17 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f8d76c3b5f..0bb813ebfd 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -20,12 +20,12 @@
*/
#include "SemanticHandler.h"
-#include "Session.h"
+#include "SemanticState.h"
#include "SessionHandler.h"
+#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
#include "Connection.h"
-#include "Session.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
#include "qpid/framing/InvocationVisitor.h"
@@ -36,7 +36,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {}
+SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {}
void SemanticHandler::handle(framing::AMQFrame& frame)
{
@@ -79,13 +79,13 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- getSession().ackCumulative(mark.getValue());
+ state.ackCumulative(mark.getValue());
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
@@ -95,9 +95,9 @@ void SemanticHandler::sendCompletion()
SequenceNumber mark = incoming.getMark();
SequenceNumberSet range = incoming.getRange();
Mutex::ScopedLock l(outLock);
- assert(getSessionHandler());
- getProxy().getExecution().complete(mark.getValue(), range);
+ session.getProxy().getExecution().complete(mark.getValue(), range);
}
+
void SemanticHandler::flush()
{
incoming.flush();
@@ -122,7 +122,7 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
SequenceNumber id = incoming.next();
- BrokerAdapter adapter(getSession());
+ BrokerAdapter adapter(state);
InvocationVisitor v(&adapter);
method->accept(v);
incoming.complete(id);
@@ -130,7 +130,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
if (!v.wasHandled()) {
throw ConnectionException(540, "Not implemented");
} else if (v.hasResult()) {
- getProxy().getExecution().result(id.getValue(), v.getResult());
+ session.getProxy().getExecution().result(id.getValue(), v.getResult());
}
//TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
//TODO: if window gets too large send unsolicited completion
@@ -152,8 +152,8 @@ void SemanticHandler::handleContent(AMQFrame& frame)
}
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
- msg->setPublisher(&getConnection());
- getSession().handle(msg);
+ msg->setPublisher(&session.getConnection());
+ state.handle(msg);
msgBuilder.end();
incoming.track(msg);
//TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
@@ -163,13 +163,17 @@ void SemanticHandler::handleContent(AMQFrame& frame)
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax());
+ MessageDelivery::deliver(
+ msg, session.getHandler().out,
+ ++outgoing.hwm, token,
+ session.getConnection().getFrameMax());
return outgoing.hwm;
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax());
+ MessageDelivery::deliver(msg, session.getHandler().out, tag, token,
+ session.getConnection().getFrameMax());
}
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)