summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp63
1 files changed, 39 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index dada7567f9..d7089424a5 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -53,7 +53,11 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
+ enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
+ inLastHandler(*this),
+ outLastHandler(*this),
+ inChain(inLastHandler),
+ outChain(outLastHandler)
{
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
@@ -102,20 +106,20 @@ void SessionState::detach() {
handler = 0;
if (mgmtObject.get() != 0)
mgmtObject->set_attached (0);
- }
+}
void SessionState::attach(SessionHandler& h) {
// activateOutput can be called in a different thread, lock to protect attached status
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
QPID_LOG(debug, getId() << ": attached on broker.");
- handler = &h;
- if (mgmtObject.get() != 0)
- {
- mgmtObject->set_attached (1);
- mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h.getChannel());
- }
+ handler = &h;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (1);
+ mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
}
+}
void SessionState::activateOutput() {
// activateOutput can be called in a different thread, lock to protect attached status
@@ -137,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
switch (methodId)
{
- case management::Session::METHOD_DETACH :
+ case management::Session::METHOD_DETACH :
if (handler != 0)
{
handler->sendDetach();
@@ -145,18 +149,18 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
- case management::Session::METHOD_CLOSE :
+ case management::Session::METHOD_CLOSE :
/*
- if (handler != 0)
- {
- handler->getConnection().closeChannel(handler->getChannel());
- }
- status = Manageable::STATUS_OK;
- break;
+ if (handler != 0)
+ {
+ handler->getConnection().closeChannel(handler->getChannel());
+ }
+ status = Manageable::STATUS_OK;
+ break;
*/
- case management::Session::METHOD_SOLICITACK :
- case management::Session::METHOD_RESETLIFESPAN :
+ case management::Session::METHOD_SOLICITACK :
+ case management::Session::METHOD_RESETLIFESPAN :
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
@@ -218,10 +222,12 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
receiverCompleted(msg->getCommandId());
if (msg->requiresAccept())
getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
- }
+}
-void SessionState::handle(AMQFrame& frame)
-{
+void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); }
+void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); }
+
+void SessionState::handleInLast(AMQFrame& frame) {
SequenceNumber commandId = receiverGetCurrent();
try {
//TODO: make command handling more uniform, regardless of whether
@@ -252,6 +258,11 @@ void SessionState::handle(AMQFrame& frame)
}
}
+void SessionState::handleOutLast(AMQFrame& frame) {
+ assert(handler);
+ handler->out(frame);
+}
+
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
@@ -267,7 +278,7 @@ void SessionState::sendCompletion() { handler->sendCompletion(); }
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);
for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++)
- semanticState.completed(i->first(), i->last());
+ semanticState.completed(i->first(), i->last());
}
void SessionState::readyToSend() {
@@ -280,4 +291,8 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
+framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; }
+
+framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; }
+
}} // namespace qpid::broker