diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 527 |
1 files changed, 527 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp new file mode 100644 index 0000000000..6836794622 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -0,0 +1,527 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/SessionState.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/DeliveryRecord.h" +#include "qpid/broker/SessionManager.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/ServerInvoker.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" + +#include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace broker { + +using namespace framing; +using sys::Mutex; +using boost::intrusive_ptr; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::AbsTime; +//using qpid::sys::Timer; +namespace _qmf = qmf::org::apache::qpid::broker; + +SessionState::SessionState( + Broker& b, SessionHandler& h, const SessionId& id, + const SessionState::Configuration& config) + : qpid::SessionState(id, config), + broker(b), handler(&h), + semanticState(*this), + adapter(semanticState), + asyncCommandCompleter(new AsyncCommandCompleter(this)) +{ + addManagementObject(); + attach(h); +} + +void SessionState::addManagementObject() { + if (GetManagementObject()) return; // Already added. + Manageable* parent = broker.GetVhostObject (); + if (parent != 0) { + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent != 0) { + std::string name(getId().str()); + std::string fullName(name); + if (name.length() >= std::numeric_limits<uint8_t>::max()) + name.resize(std::numeric_limits<uint8_t>::max()-1); + mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session (agent, this, parent, name)); + mgmtObject->set_fullName (fullName); + mgmtObject->set_attached (0); + mgmtObject->clr_expireTime(); + agent->addObject(mgmtObject); + } + } +} + +void SessionState::startTx() { + if (mgmtObject) { mgmtObject->inc_TxnStarts(); } +} + +void SessionState::commitTx() { + if (mgmtObject) { + mgmtObject->inc_TxnCommits(); + mgmtObject->inc_TxnCount(); + } +} + +void SessionState::rollbackTx() { + if (mgmtObject) { + mgmtObject->inc_TxnRejects(); + mgmtObject->inc_TxnCount(); + } +} + +SessionState::~SessionState() { + if (mgmtObject != 0) + mgmtObject->debugStats("destroying"); + asyncCommandCompleter->cancel(); + semanticState.closed(); + if (mgmtObject != 0) + mgmtObject->resourceDestroy (); +} + +AMQP_ClientProxy& SessionState::getProxy() { + assert(isAttached()); + return handler->getProxy(); +} + +uint16_t SessionState::getChannel() const { + assert(isAttached()); + return handler->getChannel(); +} + +amqp_0_10::Connection& SessionState::getConnection() { + assert(isAttached()); + return handler->getConnection(); +} + +bool SessionState::isLocal(const OwnershipToken* t) const +{ + return isAttached() && &(handler->getConnection()) == t; +} + +void SessionState::detach() { + QPID_LOG(debug, getId() << ": detached on broker."); + asyncCommandCompleter->detached(); + disableOutput(); + handler = 0; + if (mgmtObject != 0) + mgmtObject->set_attached (0); +} + +void SessionState::disableOutput() +{ + semanticState.detached(); //prevents further activateOutput calls until reattached +} + +void SessionState::attach(SessionHandler& h) { + QPID_LOG(debug, getId() << ": attached on broker."); + handler = &h; + if (mgmtObject != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + } + asyncCommandCompleter->attached(); +} + +ManagementObject::shared_ptr SessionState::GetManagementObject(void) const +{ + return mgmtObject; +} + +Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, + Args& /*args*/, + std::string& /*text*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case _qmf::Session::METHOD_DETACH : + if (handler != 0) { + handler->sendDetach(); + } + status = Manageable::STATUS_OK; + break; + + case _qmf::Session::METHOD_CLOSE : + /* + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; + */ + + case _qmf::Session::METHOD_SOLICITACK : + case _qmf::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + +void SessionState::handleCommand(framing::AMQMethodBody* method) { + Invoker::Result result = invoke(adapter, *method); + if (!result.wasHandled()) + throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); + if (currentCommand.isCompleteSync()) + completeCommand( + currentCommand.getId(), false/*needAccept*/, currentCommand.isSyncRequired(), + result.getResult()); +} + + +void SessionState::handleContent(AMQFrame& frame) +{ + if (frame.getBof() && frame.getBos()) //start of frameset + msgBuilder.start(currentCommand.getId()); + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage()); + msgBuilder.handle(frame); + if (frame.getEof() && frame.getEos()) {//end of frameset + if (frame.getBof()) { + //i.e this is a just a command frame, add a dummy header + AMQFrame header((AMQHeaderBody())); + header.setBof(false); + header.setEof(false); + msg->getFrames().append(header); + } + DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer()); + if (broker.isTimestamping()) + msg->setTimestamp(); + msg->setPublisher(&(getConnection())); + msg->computeExpiration(); + + + IncompleteIngressMsgXfer xfer(this, msg); + msg->getIngressCompletion().begin(); + // This call should come before routing, because it calcs required credit. + msgBuilder.end(); + semanticState.route(deliverable.getMessage(), deliverable); + msg->getIngressCompletion().end(xfer); // allows msg to complete xfer + } +} + +void SessionState::sendAcceptAndCompletion() +{ + if (!accepted.empty()) { + getProxy().getMessage().accept(accepted); + accepted.clear(); + } + sendCompletion(); +} + +/** Invoked when the given command is finished being processed by all interested + * parties (eg. it is done being enqueued to all queues, its credit has been + * accounted for, etc). At this point the command is considered by this + * receiver as 'completed' (as defined by AMQP 0_10) + */ +void SessionState::completeCommand(SequenceNumber id, + bool requiresAccept, + bool requiresSync, + const std::string& result=std::string()) +{ + bool callSendCompletion = false; + receiverCompleted(id); + if (requiresAccept) + // will cause cmd's seq to appear in the next message.accept we send. + accepted.add(id); + + if (!result.empty()) + getProxy().getExecution().result(id, result); + + // Are there any outstanding Execution.Sync commands pending the + // completion of this cmd? If so, complete them. + while (!pendingExecutionSyncs.empty() && + (receiverGetIncomplete().empty() || + receiverGetIncomplete().front() >= pendingExecutionSyncs.front())) + { + const SequenceNumber syncId = pendingExecutionSyncs.front(); + pendingExecutionSyncs.pop(); + QPID_LOG(debug, getId() << ": delayed execution.sync " << syncId << " is completed."); + if (receiverGetIncomplete().contains(syncId)) + receiverCompleted(syncId); + callSendCompletion = true; // likely peer is pending for this completion. + } + + // if the sender has requested immediate notification of the completion... + if (requiresSync || callSendCompletion) { + sendAcceptAndCompletion(); + } +} + +void SessionState::handleIn(AMQFrame& frame) { + //TODO: make command handling more uniform, regardless of whether + //commands carry content. + AMQMethodBody* m = frame.getMethod(); + currentCommand = CurrentCommand(receiverGetCurrent(), m && m->isSync()); + + if (m == 0 || m->isContentBearing()) { + handleContent(frame); + } else if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod()); + } else { + throw InternalErrorException("Cannot handle multi-frame command segments yet"); + } +} + +void SessionState::handleOut(AMQFrame& frame) { + assert(handler); + handler->out(frame); +} + +DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message, + const std::string& destination, bool isRedelivered, uint64_t ttl, + qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode, + const qpid::types::Variant::Map& annotations, bool sync) +{ + uint32_t maxFrameSize = getConnection().getFrameMax(); + assert(senderGetCommandPoint().offset == 0); + SequenceNumber commandId = senderGetCommandPoint().command; + + framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode))); + method.setEof(false); + getProxy().getHandler().handle(method); + message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, annotations); + message.sendContent(getProxy().getHandler(), maxFrameSize); + + assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + if (sync) { + AMQP_ClientProxy::Execution& p(getProxy().getExecution()); + Proxy::ScopedSync s(p); + p.sync(); + } + return commandId; +} + +void SessionState::sendCompletion() { + handler->sendCompletion(); +} + +void SessionState::senderCompleted(const SequenceSet& commands) { + qpid::SessionState::senderCompleted(commands); + semanticState.completed(commands); +} + +void SessionState::readyToSend() { + QPID_LOG(debug, getId() << ": ready to send, activating output."); + assert(handler); + semanticState.attached(); +} + +Broker& SessionState::getBroker() { return broker; } + +// Session resume is not fully implemented so it is useless to set a +// non-0 timeout. +void SessionState::setTimeout(uint32_t) { } + +// Current received command is an execution.sync command. +// Complete this command only when all preceding commands have completed. +// (called via the invoker() in handleCommand() above) +bool SessionState::addPendingExecutionSync() { + SequenceNumber id = currentCommand.getId(); + if (addPendingExecutionSync(id)) { + currentCommand.setCompleteSync(false); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << id); + return true; + } + return false; +} + +bool SessionState::addPendingExecutionSync(SequenceNumber id) +{ + if (receiverGetIncomplete().front() < id) { + pendingExecutionSyncs.push(id); + asyncCommandCompleter->flushPendingMessages(); + return true; + } + return false; +} + +/** factory for creating a reference-counted IncompleteIngressMsgXfer object + * which will be attached to a message that will be completed asynchronously. + */ +boost::intrusive_ptr<AsyncCompletion::Callback> +SessionState::IncompleteIngressMsgXfer::clone() +{ + // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. + // If the client is pending the message.transfer completion, flush now to force immediate write to journal. + if (requiresSync) + msg->flush(); + else { + // otherwise, we need to track this message in order to flush it if an execution.sync arrives + // before it has been completed (see flushPendingMessages()) + pending = true; + completerContext->addPendingMessage(msg); + } + + return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this)); +} + + +/** Invoked by the asynchronous completer associated with a received + * msg that is pending Completion. May be invoked by the IO thread + * (sync == true), or some external thread (!sync). + */ +void SessionState::IncompleteIngressMsgXfer::completed(bool sync) +{ + if (pending) completerContext->deletePendingMessage(id); + if (!sync) { + /** note well: this path may execute in any thread. It is safe to access + * the scheduledCompleterContext, since *this has a shared pointer to it. + * but not session! + */ + session = 0; + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); + completerContext->scheduleCommandCompletion(id, requiresAccept, requiresSync); + } else { + // this path runs directly from the ac->end() call in handleContent() above, + // so *session is definately valid. + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + session->completeCommand(id, requiresAccept, requiresSync); + } + } + completerContext = 0; +} + + +/** Track an ingress message that is pending completion */ +void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg); + bool unique = pendingMsgs.insert(item).second; + if (!unique) { + assert(false); + } +} + + +/** pending message has completed */ +void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + pendingMsgs.erase(id); +} + + +/** done when an execution.sync arrives */ +void SessionState::AsyncCommandCompleter::flushPendingMessages() +{ + std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. + } + // drop lock, so it is safe to call "flush()" + for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin(); + i != copy.end(); ++i) { + i->second->flush(); + } +} + + +/** mark an ingress Message.Transfer command as completed. + * This method must be thread safe - it may run on any thread. + */ +void SessionState::AsyncCommandCompleter::scheduleCommandCompletion( + SequenceNumber cmd, + bool requiresAccept, + bool requiresSync) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + + if (session && isAttached) { + CommandInfo info(cmd, requiresAccept, requiresSync); + completedCmds.push_back(info); + if (completedCmds.size() == 1) { + session->getConnection().requestIOProcessing( + boost::bind(&AsyncCommandCompleter::completeCommands, + session->asyncCommandCompleter)); + } + } +} + +void SessionState::AsyncCommandCompleter::schedule(boost::function<void()> f) { + if (session && isAttached) session->getConnection().requestIOProcessing(f); +} + +/** Cause the session to complete all completed commands. + * Executes on the IO thread. + */ +void SessionState::AsyncCommandCompleter::completeCommands() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + + // when session is destroyed, it clears the session pointer via cancel(). + if (session && session->isAttached()) { + for (std::vector<CommandInfo>::iterator cmd = completedCmds.begin(); + cmd != completedCmds.end(); ++cmd) { + session->completeCommand( + cmd->cmd, cmd->requiresAccept, cmd->requiresSync); + } + } + completedCmds.clear(); +} + + +/** cancel any pending calls to scheduleComplete */ +void SessionState::AsyncCommandCompleter::cancel() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + session = 0; +} + + +/** inform the completer that the session has attached, + * allows command completion scheduling from any thread */ +void SessionState::AsyncCommandCompleter::attached() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + isAttached = true; +} + + +/** inform the completer that the session has detached, + * disables command completion scheduling from any thread */ +void SessionState::AsyncCommandCompleter::detached() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + isAttached = false; +} + +}} // namespace qpid::broker |