diff options
author | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
commit | 0333573627c831142aa251bfb1cabdb1e2bf438e (patch) | |
tree | 953bf8c624374c57953aa3f2888254d175609d9a /cpp/src/qpid/broker/SessionState.cpp | |
parent | 96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff) | |
download | qpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz |
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 124 |
1 files changed, 51 insertions, 73 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 2ef1ed2de4..c851162046 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -32,6 +32,7 @@ #include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> namespace qpid { namespace broker { @@ -45,59 +46,46 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name) - : framing::SessionState(ack, timeout_ > 0), nextOut(0), - factory(f), handler(h), id(true), timeout(timeout_), - broker(h->getConnection().broker), - version(h->getConnection().getVersion()), - ignoring(false), name(_name), + Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + : qpid::SessionState(id, config), + broker(b), handler(&h), + ignoring(false), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) { - getConnection().outputTasks.addOutputTask(&semanticState); - Manageable* parent = broker.GetVhostObject (); - - if (parent != 0) - { + if (parent != 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - - if (agent.get () != 0) - { + if (agent.get () != 0) { mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, name)); - mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h->getChannel()); - mgmtObject->set_detachedLifespan (getTimeout()); + (new management::Session (this, parent, getId().getName())); + mgmtObject->set_attached (0); agent->addObject (mgmtObject); } } + attach(h); } SessionState::~SessionState() { // Remove ID from active session list. - if (factory) - factory->erase(getId()); + // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge, + // they don't belong in the manager. For now rely on uniqueness of UUIDs. + // + broker.getSessionManager().forget(getId()); if (mgmtObject.get () != 0) mgmtObject->resourceDestroy (); } -SessionHandler* SessionState::getHandler() { - return handler; -} - AMQP_ClientProxy& SessionState::getProxy() { assert(isAttached()); - return getHandler()->getProxy(); + return handler->getProxy(); } ConnectionState& SessionState::getConnection() { assert(isAttached()); - return getHandler()->getConnection(); + return handler->getConnection(); } bool SessionState::isLocal(const ConnectionToken* t) const @@ -106,18 +94,19 @@ bool SessionState::isLocal(const ConnectionToken* t) const } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticState); + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); + QPID_LOG(debug, getId() << ": detached on broker."); + getConnection().outputTasks.removeOutputTask(&semanticState); handler = 0; if (mgmtObject.get() != 0) - { mgmtObject->set_attached (0); } -} void SessionState::attach(SessionHandler& h) { - { + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); + QPID_LOG(debug, getId() << ": attached on broker."); handler = &h; if (mgmtObject.get() != 0) { @@ -126,16 +115,13 @@ void SessionState::attach(SessionHandler& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticState); -} -void SessionState::activateOutput() -{ +void SessionState::activateOutput() { + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); - if (isAttached()) { + if (isAttached()) getConnection().outputTasks.activateOutput(); } -} //This class could be used as the callback for queue notifications //if not attached, it can simply ignore the callback, else pass it //on to the connection @@ -155,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, case management::Session::METHOD_DETACH : if (handler != 0) { - handler->requestDetach(); + handler->sendDetach(); } status = Manageable::STATUS_OK; break; @@ -179,35 +165,25 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id) -{ - id = nextIn++; +void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { Invoker::Result invocation = invoke(adapter, *method); - completed.add(id); - + receiverCompleted(id); if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { - nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution().result(id, invocation.getResult()); } if (method->isSync()) { incomplete.process(enqueuedOp, true); sendCompletion(); } - //TODO: if window gets too large send unsolicited completion } -void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) +void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { - intrusive_ptr<Message> msg(msgBuilder.getMessage()); - if (frame.getBof() && frame.getBos()) {//start of frameset - id = nextIn++; + if (frame.getBof() && frame.getBos()) //start of frameset msgBuilder.start(id); - msg = msgBuilder.getMessage(); - } else { - id = msg->getCommandId(); - } + intrusive_ptr<Message> msg(msgBuilder.getMessage()); msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset if (frame.getBof()) { @@ -240,19 +216,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) void SessionState::enqueued(boost::intrusive_ptr<Message> msg) { - completed.add(msg->getCommandId()); - if (msg->requiresAccept()) { - nextOut++;//accept is a command, so the counter must be incremented + receiverCompleted(msg->getCommandId()); + if (msg->requiresAccept()) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -} void SessionState::handle(AMQFrame& frame) { - if (ignoring) return; - received(frame); - - SequenceNumber commandId; + SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether //commands carry content. @@ -277,29 +248,36 @@ void SessionState::handle(AMQFrame& frame) } else { getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - timeout = 0; ignoring = true; - handler->requestDetach(); + handler->sendDetach(); } } DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { uint32_t maxFrameSize = getConnection().getFrameMax(); - MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize); - return nextOut++; + assert(senderGetCommandPoint().offset == 0); + SequenceNumber commandId = senderGetCommandPoint().command; + MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize); + assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + return commandId; } -void SessionState::sendCompletion() -{ - handler->sendCompletion(); +void SessionState::sendCompletion() { handler->sendCompletion(); } + +void SessionState::senderCompleted(const SequenceSet& commands) { + qpid::SessionState::senderCompleted(commands); + commands.for_each(boost::bind(&SemanticState::completed, &semanticState, _1, _2)); } -void SessionState::complete(const SequenceSet& commands) -{ - knownCompleted.add(commands); - commands.for_each(ackOp); +void SessionState::readyToSend() { + QPID_LOG(debug, getId() << ": ready to send, activating output."); + assert(handler); + sys::AggregateOutput& tasks = handler->getConnection().outputTasks; + tasks.addOutputTask(&semanticState); + tasks.activateOutput(); } +Broker& SessionState::getBroker() { return broker; } }} // namespace qpid::broker |