summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp128
1 files changed, 113 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index b6c59cfb3b..573a567da6 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -19,12 +19,16 @@
*
*/
#include "SessionState.h"
-#include "SessionManager.h"
-#include "SessionContext.h"
-#include "ConnectionState.h"
#include "Broker.h"
+#include "ConnectionState.h"
+#include "MessageDelivery.h"
#include "SemanticHandler.h"
+#include "SessionManager.h"
+#include "SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/ServerInvoker.h"
+
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
@@ -37,17 +41,17 @@ using qpid::management::Manageable;
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
version(h->getConnection().getVersion()),
- semanticHandler(new SemanticHandler(*this))
+ semanticState(*this, *this),
+ adapter(semanticState),
+ msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
+ ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
{
- in.next = semanticHandler.get();
- out.next = &handler->out;
-
- getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.addOutputTask(&semanticState);
Manageable* parent = broker.GetVhostObject ();
@@ -76,7 +80,7 @@ SessionState::~SessionState() {
mgmtObject->resourceDestroy ();
}
-SessionContext* SessionState::getHandler() {
+SessionHandler* SessionState::getHandler() {
return handler;
}
@@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() {
}
void SessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.removeOutputTask(&semanticState);
Mutex::ScopedLock l(lock);
- handler = 0; out.next = 0;
+ handler = 0;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (0);
}
}
-void SessionState::attach(SessionContext& h) {
+void SessionState::attach(SessionHandler& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
- out.next = &handler->out;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
@@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) {
mgmtObject->set_channelId (h.getChannel());
}
}
- h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ h.getConnection().outputTasks.addOutputTask(&semanticState);
}
void SessionState::activateOutput()
@@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
+void SessionState::handleCommand(framing::AMQMethodBody* method)
+{
+ SequenceNumber id = incoming.next();
+ Invoker::Result invocation = invoke(adapter, *method);
+ incoming.complete(id);
+
+ if (!invocation.wasHandled()) {
+ throw NotImplementedException("Not implemented");
+ } else if (invocation.hasResult()) {
+ getProxy().getExecution().result(id.getValue(), invocation.getResult());
+ }
+ if (method->isSync()) {
+ incoming.sync(id);
+ sendCompletion();
+ }
+ //TODO: if window gets too large send unsolicited completion
+}
+
+void SessionState::handleContent(AMQFrame& frame)
+{
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(incoming.next());
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&getConnection());
+ semanticState.handle(msg);
+ msgBuilder.end();
+ incoming.track(msg);
+ if (msg->getFrames().getMethod()->isSync()) {
+ incoming.sync(msg->getCommandId());
+ sendCompletion();
+ }
+ }
+}
+
+void SessionState::handle(AMQFrame& frame)
+{
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content. (For now, assume all single frame
+ //assmblies are non-content bearing and all content-bearing
+ //assmeblies will have more than one frame):
+ if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod());
+ } else {
+ handleContent(frame);
+ }
+
+}
+
+DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
+{
+ uint32_t maxFrameSize = getConnection().getFrameMax();
+ MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
+ return outgoing.hwm;
+}
+
+void SessionState::sendCompletion()
+{
+ SequenceNumber mark = incoming.getMark();
+ SequenceNumberSet range = incoming.getRange();
+ getProxy().getExecution().complete(mark.getValue(), range);
+}
+
+void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ //record:
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ //ack messages:
+ semanticState.ackCumulative(mark.getValue());
+ }
+ range.processRanges(ackOp);
+}
+
+void SessionState::flush()
+{
+ incoming.flush();
+ sendCompletion();
+}
+
+void SessionState::sync()
+{
+ incoming.sync();
+ sendCompletion();
+}
+
+void SessionState::noop()
+{
+ incoming.noop();
+}
+
}} // namespace qpid::broker