summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-20 13:44:34 +0000
committerAlan Conway <aconway@apache.org>2008-05-20 13:44:34 +0000
commit0333573627c831142aa251bfb1cabdb1e2bf438e (patch)
tree953bf8c624374c57953aa3f2888254d175609d9a /cpp/src/qpid/broker/SessionState.cpp
parent96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff)
downloadqpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp124
1 files changed, 51 insertions, 73 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 2ef1ed2de4..c851162046 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -32,6 +32,7 @@
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
namespace qpid {
namespace broker {
@@ -45,59 +46,46 @@ using qpid::management::Manageable;
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name)
- : framing::SessionState(ack, timeout_ > 0), nextOut(0),
- factory(f), handler(h), id(true), timeout(timeout_),
- broker(h->getConnection().broker),
- version(h->getConnection().getVersion()),
- ignoring(false), name(_name),
+ Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ : qpid::SessionState(id, config),
+ broker(b), handler(&h),
+ ignoring(false),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
{
- getConnection().outputTasks.addOutputTask(&semanticState);
-
Manageable* parent = broker.GetVhostObject ();
-
- if (parent != 0)
- {
+ if (parent != 0) {
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-
- if (agent.get () != 0)
- {
+ if (agent.get () != 0) {
mgmtObject = management::Session::shared_ptr
- (new management::Session (this, parent, name));
- mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h->getChannel());
- mgmtObject->set_detachedLifespan (getTimeout());
+ (new management::Session (this, parent, getId().getName()));
+ mgmtObject->set_attached (0);
agent->addObject (mgmtObject);
}
}
+ attach(h);
}
SessionState::~SessionState() {
// Remove ID from active session list.
- if (factory)
- factory->erase(getId());
+ // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge,
+ // they don't belong in the manager. For now rely on uniqueness of UUIDs.
+ //
+ broker.getSessionManager().forget(getId());
if (mgmtObject.get () != 0)
mgmtObject->resourceDestroy ();
}
-SessionHandler* SessionState::getHandler() {
- return handler;
-}
-
AMQP_ClientProxy& SessionState::getProxy() {
assert(isAttached());
- return getHandler()->getProxy();
+ return handler->getProxy();
}
ConnectionState& SessionState::getConnection() {
assert(isAttached());
- return getHandler()->getConnection();
+ return handler->getConnection();
}
bool SessionState::isLocal(const ConnectionToken* t) const
@@ -106,18 +94,19 @@ bool SessionState::isLocal(const ConnectionToken* t) const
}
void SessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticState);
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, getId() << ": detached on broker.");
+ getConnection().outputTasks.removeOutputTask(&semanticState);
handler = 0;
if (mgmtObject.get() != 0)
- {
mgmtObject->set_attached (0);
}
-}
void SessionState::attach(SessionHandler& h) {
- {
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, getId() << ": attached on broker.");
handler = &h;
if (mgmtObject.get() != 0)
{
@@ -126,16 +115,13 @@ void SessionState::attach(SessionHandler& h) {
mgmtObject->set_channelId (h.getChannel());
}
}
- h.getConnection().outputTasks.addOutputTask(&semanticState);
-}
-void SessionState::activateOutput()
-{
+void SessionState::activateOutput() {
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
- if (isAttached()) {
+ if (isAttached())
getConnection().outputTasks.activateOutput();
}
-}
//This class could be used as the callback for queue notifications
//if not attached, it can simply ignore the callback, else pass it
//on to the connection
@@ -155,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->requestDetach();
+ handler->sendDetach();
}
status = Manageable::STATUS_OK;
break;
@@ -179,35 +165,25 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id)
-{
- id = nextIn++;
+void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
Invoker::Result invocation = invoke(adapter, *method);
- completed.add(id);
-
+ receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
- nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution().result(id, invocation.getResult());
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
sendCompletion();
}
- //TODO: if window gets too large send unsolicited completion
}
-void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
+void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (frame.getBof() && frame.getBos()) {//start of frameset
- id = nextIn++;
+ if (frame.getBof() && frame.getBos()) //start of frameset
msgBuilder.start(id);
- msg = msgBuilder.getMessage();
- } else {
- id = msg->getCommandId();
- }
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
@@ -240,19 +216,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
{
- completed.add(msg->getCommandId());
- if (msg->requiresAccept()) {
- nextOut++;//accept is a command, so the counter must be incremented
+ receiverCompleted(msg->getCommandId());
+ if (msg->requiresAccept())
getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
-}
void SessionState::handle(AMQFrame& frame)
{
- if (ignoring) return;
- received(frame);
-
- SequenceNumber commandId;
+ SequenceNumber commandId = receiverGetCurrent();
try {
//TODO: make command handling more uniform, regardless of whether
//commands carry content.
@@ -277,29 +248,36 @@ void SessionState::handle(AMQFrame& frame)
} else {
getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- timeout = 0;
ignoring = true;
- handler->requestDetach();
+ handler->sendDetach();
}
}
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
- MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize);
- return nextOut++;
+ assert(senderGetCommandPoint().offset == 0);
+ SequenceNumber commandId = senderGetCommandPoint().command;
+ MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize);
+ assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
+ return commandId;
}
-void SessionState::sendCompletion()
-{
- handler->sendCompletion();
+void SessionState::sendCompletion() { handler->sendCompletion(); }
+
+void SessionState::senderCompleted(const SequenceSet& commands) {
+ qpid::SessionState::senderCompleted(commands);
+ commands.for_each(boost::bind(&SemanticState::completed, &semanticState, _1, _2));
}
-void SessionState::complete(const SequenceSet& commands)
-{
- knownCompleted.add(commands);
- commands.for_each(ackOp);
+void SessionState::readyToSend() {
+ QPID_LOG(debug, getId() << ": ready to send, activating output.");
+ assert(handler);
+ sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
+ tasks.addOutputTask(&semanticState);
+ tasks.activateOutput();
}
+Broker& SessionState::getBroker() { return broker; }
}} // namespace qpid::broker