/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"

// NOTE(review): the targets of the next two #include directives were lost
// when this file's text was mangled (everything between angle brackets was
// stripped). <boost/bind.hpp> and <limits> are what the code below visibly
// needs (boost::bind, std::numeric_limits); confirm against version control.
#include <boost/bind.hpp>
#include <limits>

namespace qpid {
namespace broker {

using namespace framing;
using sys::Mutex;
using boost::intrusive_ptr;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::AbsTime;
//using qpid::sys::Timer;

namespace _qmf = qmf::org::apache::qpid::broker;

/** Construct a broker-side session: registers the management object (if a
 * management agent is available) and attaches to the supplied handler.
 *
 * @param b      broker that owns this session
 * @param h      handler the session is initially attached to
 * @param id     session identifier, forwarded to the qpid::SessionState base
 * @param config session configuration, forwarded to the qpid::SessionState base
 */
SessionState::SessionState(
    Broker& b, SessionHandler& h, const SessionId& id,
    const SessionState::Configuration& config)
    : qpid::SessionState(id, config),
      broker(b), handler(&h),
      semanticState(*this),
      adapter(semanticState),
      asyncCommandCompleter(new AsyncCommandCompleter(this))
{
    addManagementObject();
    attach(h);
}

/** Create and register the QMF Session management object for this session.
 * Does nothing if the object already exists, or if the broker has no vhost
 * object or no management agent.
 */
void SessionState::addManagementObject() {
    if (GetManagementObject()) return; // Already added.
    Manageable* parent = broker.GetVhostObject ();
    if (parent != 0) {
        ManagementAgent* agent = getBroker().getManagementAgent();
        if (agent != 0) {
            std::string name(getId().str());
            std::string fullName(name);
            // The short name is truncated to fit a size limit; the untruncated
            // name is kept in the fullName attribute.
            // NOTE(review): the numeric_limits type argument was lost in the
            // mangled text; uint8_t (name shorter than 255 chars) is assumed
            // here - confirm.
            if (name.length() >= std::numeric_limits<uint8_t>::max())
                name.resize(std::numeric_limits<uint8_t>::max()-1);
            mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session
                                                   (agent, this, parent, name));
            mgmtObject->set_fullName (fullName);
            mgmtObject->set_attached (0);
            mgmtObject->clr_expireTime();
            agent->addObject(mgmtObject);
        }
    }
}

/** Record the start of a transaction in the management statistics. */
void SessionState::startTx() {
    if (mgmtObject) { mgmtObject->inc_TxnStarts(); }
}

/** Record a committed transaction in the management statistics. */
void SessionState::commitTx() {
    if (mgmtObject) {
        mgmtObject->inc_TxnCommits();
        mgmtObject->inc_TxnCount();
    }
}

/** Record a rolled-back transaction in the management statistics. */
void SessionState::rollbackTx() {
    if (mgmtObject) {
        mgmtObject->inc_TxnRejects();
        mgmtObject->inc_TxnCount();
    }
}

/** Tear the session down: cancels the async completer (clearing its session
 * pointer), closes the semantic state and destroys the management object.
 */
SessionState::~SessionState() {
    if (mgmtObject != 0)
        mgmtObject->debugStats("destroying");
    asyncCommandCompleter->cancel();
    semanticState.closed();
    if (mgmtObject != 0)
        mgmtObject->resourceDestroy ();
}

/** @return the client proxy of the attached handler. Session must be attached. */
AMQP_ClientProxy& SessionState::getProxy() {
    assert(isAttached());
    return handler->getProxy();
}

/** @return the channel of the attached handler. Session must be attached. */
uint16_t SessionState::getChannel() const {
    assert(isAttached());
    return handler->getChannel();
}

/** @return the connection of the attached handler. Session must be attached. */
amqp_0_10::Connection& SessionState::getConnection() {
    assert(isAttached());
    return handler->getConnection();
}

/** @return true if the session is attached and the given token is the
 * connection this session is currently attached to.
 */
bool SessionState::isLocal(const OwnershipToken* t) const
{
    return isAttached() && &(handler->getConnection()) == t;
}

/** Detach from the current handler: stops async completion scheduling,
 * disables output and clears the handler pointer.
 */
void SessionState::detach() {
    QPID_LOG(debug, getId() << ": detached on broker.");
    asyncCommandCompleter->detached();
    disableOutput();
    handler = 0;
    if (mgmtObject != 0)
        mgmtObject->set_attached  (0);
}

void SessionState::disableOutput()
{
    semanticState.detached(); //prevents further activateOutput calls until reattached
}

/** Attach to the given handler and publish the attachment (connection
 * reference and channel id) on the management object, if any.
 */
void SessionState::attach(SessionHandler& h) {
    QPID_LOG(debug, getId() << ": attached on broker.");
    handler = &h;
    if (mgmtObject != 0)
    {
        mgmtObject->set_attached (1);
        mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
        mgmtObject->set_channelId (h.getChannel());
    }
    asyncCommandCompleter->attached();
}

/** @return the management object of this session (may be null). */
ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
{
    return mgmtObject;
}

/** Dispatch a management method invoked on this session.
 *
 * @param methodId one of the _qmf::Session::METHOD_* identifiers
 * @return STATUS_OK for DETACH, STATUS_NOT_IMPLEMENTED for CLOSE, SOLICITACK
 *         and RESETLIFESPAN, STATUS_UNKNOWN_METHOD for anything else.
 */
Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
                                                     Args&    /*args*/,
                                                     std::string&  /*text*/)
{
    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;

    switch (methodId)
    {
    case _qmf::Session::METHOD_DETACH :
        if (handler != 0) {
            handler->sendDetach();
        }
        status = Manageable::STATUS_OK;
        break;

    case _qmf::Session::METHOD_CLOSE :
        // The implementation below is disabled, so CLOSE deliberately falls
        // through to the "not implemented" cases.
        /*
          if (handler != 0) {
          handler->getConnection().closeChannel(handler->getChannel());
          }
          status = Manageable::STATUS_OK;
          break;
        */

    case _qmf::Session::METHOD_SOLICITACK :
    case _qmf::Session::METHOD_RESETLIFESPAN :
        status = Manageable::STATUS_NOT_IMPLEMENTED;
        break;
    }

    return status;
}

/** Invoke a content-less command on the session adapter and, unless its
 * completion has been deferred (see addPendingExecutionSync()), complete it.
 *
 * @throws NotImplementedException if the adapter did not handle the method.
 */
void SessionState::handleCommand(framing::AMQMethodBody* method) {
    Invoker::Result result = invoke(adapter, *method);
    if (!result.wasHandled())
        throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
    if (currentCommand.isCompleteSync())
        completeCommand(
            currentCommand.getId(), false/*needAccept*/, currentCommand.isSyncRequired(), result.getResult());
}

/** Feed one frame of a content-bearing command into the message builder and,
 * once the frameset is complete, route the resulting message.
 */
void SessionState::handleContent(AMQFrame& frame)
{
    if (frame.getBof() && frame.getBos()) //start of frameset
        msgBuilder.start(currentCommand.getId());
    // NOTE(review): the pointee type was lost in the mangled text;
    // amqp_0_10::MessageTransfer is inferred from deliver() and from the
    // Message(msg, msg) wrapper below - confirm against the builder's API.
    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
    msgBuilder.handle(frame);
    if (frame.getEof() && frame.getEos()) {//end of frameset
        if (frame.getBof()) {
            //i.e. this is just a command frame, add a dummy header
            AMQFrame header((AMQHeaderBody()));
            header.setBof(false);
            header.setEof(false);
            msg->getFrames().append(header);
        }
        DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
        if (broker.isTimestamping())
            msg->setTimestamp();
        msg->setPublisher(&(getConnection()));
        msg->computeExpiration();

        IncompleteIngressMsgXfer xfer(this, msg);
        msg->getIngressCompletion().begin();
        // This call should come before routing, because it calcs required credit.
        msgBuilder.end();
        semanticState.route(deliverable.getMessage(), deliverable);
        msg->getIngressCompletion().end(xfer);  // allows msg to complete xfer
    }
}

/** Send a message.accept for any accumulated accepted commands, then send
 * the session completion.
 */
void SessionState::sendAcceptAndCompletion()
{
    if (!accepted.empty()) {
        getProxy().getMessage().accept(accepted);
        accepted.clear();
    }
    sendCompletion();
}

/** Invoked when the given command is finished being processed by all interested
 * parties (eg. it is done being enqueued to all queues, its credit has been
 * accounted for, etc).  At this point the command is considered by this
 * receiver as 'completed' (as defined by AMQP 0_10)
 *
 * @param id             sequence number of the completed command
 * @param requiresAccept add the command to the next message.accept
 * @param requiresSync   send the completion to the peer immediately
 * @param result         optional execution result to send for the command
 */
void SessionState::completeCommand(SequenceNumber id,
                                   bool requiresAccept,
                                   bool requiresSync,
                                   const std::string& result=std::string())
{
    bool callSendCompletion = false;
    receiverCompleted(id);
    if (requiresAccept)
        // will cause cmd's seq to appear in the next message.accept we send.
        accepted.add(id);

    if (!result.empty())
        getProxy().getExecution().result(id, result);

    // Are there any outstanding Execution.Sync commands pending the
    // completion of this cmd?  If so, complete them.
    while (!pendingExecutionSyncs.empty() &&
           (receiverGetIncomplete().empty() ||
            receiverGetIncomplete().front() >= pendingExecutionSyncs.front())) {
        const SequenceNumber syncId = pendingExecutionSyncs.front();
        pendingExecutionSyncs.pop();
        QPID_LOG(debug, getId() << ": delayed execution.sync " << syncId << " is completed.");
        if (receiverGetIncomplete().contains(syncId))
            receiverCompleted(syncId);
        callSendCompletion = true;   // likely peer is pending for this completion.
    }

    // if the sender has requested immediate notification of the completion...
    if (requiresSync || callSendCompletion) {
        sendAcceptAndCompletion();
    }
}

/** Entry point for an incoming frame: records the current command and
 * dispatches to content or command handling.
 *
 * @throws InternalErrorException for a content-less command that spans
 *         more than one frame.
 */
void SessionState::handleIn(AMQFrame& frame) {
    //TODO: make command handling more uniform, regardless of whether
    //commands carry content.
    AMQMethodBody* m = frame.getMethod();
    currentCommand = CurrentCommand(receiverGetCurrent(), m && m->isSync());

    if (m == 0 || m->isContentBearing()) {
        handleContent(frame);
    } else if (frame.getBof() && frame.getEof()) {
        handleCommand(frame.getMethod());
    } else {
        throw InternalErrorException("Cannot handle multi-frame command segments yet");
    }
}

/** Forward an outgoing frame to the attached handler. */
void SessionState::handleOut(AMQFrame& frame) {
    assert(handler);
    handler->out(frame);
}

/** Send a message to the peer as a message.transfer command followed by the
 * message's header and content frames.
 *
 * @param sync if true, an execution.sync is sent after the transfer
 * @return the command id that was used for the transfer
 */
DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
                                 const std::string& destination, bool isRedelivered, uint64_t ttl,
                                 qpid::framing::message::AcceptMode acceptMode,
                                 qpid::framing::message::AcquireMode acquireMode,
                                 const qpid::types::Variant::Map& annotations, bool sync)
{
    uint32_t maxFrameSize = getConnection().getFrameMax();
    assert(senderGetCommandPoint().offset == 0);
    SequenceNumber commandId = senderGetCommandPoint().command;

    framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
    method.setEof(false);
    getProxy().getHandler().handle(method);
    message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, annotations);
    message.sendContent(getProxy().getHandler(), maxFrameSize);

    assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
    if (sync) {
        AMQP_ClientProxy::Execution& p(getProxy().getExecution());
        Proxy::ScopedSync s(p);
        p.sync();
    }
    return commandId;
}

/** Ask the attached handler to send the session completion. */
void SessionState::sendCompletion() {
    handler->sendCompletion();
}

/** The peer has completed the given commands: update the base session state
 * and notify the semantic state.
 */
void SessionState::senderCompleted(const SequenceSet& commands) {
    qpid::SessionState::senderCompleted(commands);
    semanticState.completed(commands);
}

/** Called when the session may start sending; marks the semantic state attached. */
void SessionState::readyToSend() {
    QPID_LOG(debug, getId() << ": ready to send, activating output.");
    assert(handler);
    semanticState.attached();
}

/** @return the broker that owns this session. */
Broker& SessionState::getBroker() { return broker; }

// Session resume is not fully implemented so it is useless to set a
// non-0 timeout.
void SessionState::setTimeout(uint32_t) { }

// Current received command is an execution.sync command.
// Complete this command only when all preceding commands have completed.
// (called via the invoker() in handleCommand() above)
bool SessionState::addPendingExecutionSync()
{
    SequenceNumber id = currentCommand.getId();
    if (addPendingExecutionSync(id)) {
        currentCommand.setCompleteSync(false);
        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << id);
        return true;
    }
    return false;
}

/** Queue the execution.sync with the given id if an earlier command is still
 * incomplete, and flush the messages pending completion.
 *
 * @return true if the sync was queued (its completion is deferred).
 */
bool SessionState::addPendingExecutionSync(SequenceNumber id)
{
    // NOTE(review): front() is called without an emptiness check; this assumes
    // the incomplete set is non-empty here (e.g. contains the sync itself) -
    // TODO confirm for all callers.
    if (receiverGetIncomplete().front() < id) {
        pendingExecutionSyncs.push(id);
        asyncCommandCompleter->flushPendingMessages();
        return true;
    }
    return false;
}

/** factory for creating a reference-counted IncompleteIngressMsgXfer object
 * which will be attached to a message that will be completed asynchronously.
 */
// NOTE(review): the return type's template argument was lost in the mangled
// text; AsyncCompletion::Callback is assumed - confirm against the header.
boost::intrusive_ptr<AsyncCompletion::Callback>
SessionState::IncompleteIngressMsgXfer::clone()
{
    // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
    // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
    if (requiresSync)
        msg->flush();
    else {
        // otherwise, we need to track this message in order to flush it if an execution.sync arrives
        // before it has been completed (see flushPendingMessages())
        pending = true;
        completerContext->addPendingMessage(msg);
    }
    return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
}

/** Invoked by the asynchronous completer associated with a received
 * msg that is pending Completion.  May be invoked by the IO thread
 * (sync == true), or some external thread (!sync).
 */
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
    if (pending) completerContext->deletePendingMessage(id);
    if (!sync) {
        /** note well: this path may execute in any thread.  It is safe to access
         * the scheduledCompleterContext, since *this has a shared pointer to it.
         * but not session!
         */
        session = 0;
        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
        completerContext->scheduleCommandCompletion(id, requiresAccept, requiresSync);
    } else {
        // this path runs directly from the ac->end() call in handleContent() above,
        // so *session is definitely valid.
        if (session->isAttached()) {
            QPID_LOG(debug, ": receive completed for msg seq=" << id);
            session->completeCommand(id, requiresAccept, requiresSync);
        }
    }
    completerContext = 0;
}

/** Track an ingress message that is pending completion */
void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
{
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
    std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
    bool unique = pendingMsgs.insert(item).second;
    // A command id must only ever be tracked once.
    if (!unique) {
        assert(false);
    }
}

/** pending message has completed */
void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
{
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
    pendingMsgs.erase(id);
}

/** done when an execution.sync arrives */
void SessionState::AsyncCommandCompleter::flushPendingMessages()
{
    std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;

    {
        qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
        pendingMsgs.swap(copy);    // we've only tracked these in case a flush is needed, so nuke 'em now.
    }
    // drop lock, so it is safe to call "flush()"
    for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
         i != copy.end(); ++i) {
        i->second->flush();
    }
}

/** mark an ingress Message.Transfer command as completed.
 * This method must be thread safe - it may run on any thread.
 */
void SessionState::AsyncCommandCompleter::scheduleCommandCompletion(
    SequenceNumber cmd,
    bool requiresAccept,
    bool requiresSync)
{
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);

    if (session && isAttached) {
        CommandInfo info(cmd, requiresAccept, requiresSync);
        completedCmds.push_back(info);
        // Only the first queued command needs to request IO processing;
        // completeCommands() drains the whole list in one pass.
        if (completedCmds.size() == 1) {
            session->getConnection().requestIOProcessing(
                boost::bind(&AsyncCommandCompleter::completeCommands,
                            session->asyncCommandCompleter));
        }
    }
}

/** Request that the given callback be run via the session connection's IO
 * processing, provided the session still exists and is attached.
 */
// NOTE(review): the boost::function signature was lost in the mangled text;
// void() is assumed - confirm against the header.
// NOTE(review): unlike the other methods of this class, this reads session and
// isAttached without holding completerLock - confirm callers serialize access.
void SessionState::AsyncCommandCompleter::schedule(boost::function<void()> f)
{
    if (session && isAttached) session->getConnection().requestIOProcessing(f);
}

/** Cause the session to complete all completed commands.
 * Executes on the IO thread.
 */
void SessionState::AsyncCommandCompleter::completeCommands()
{
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);

    // when session is destroyed, it clears the session pointer via cancel().
    if (session && session->isAttached()) {
        for (std::vector<CommandInfo>::iterator cmd = completedCmds.begin();
             cmd != completedCmds.end(); ++cmd) {
            session->completeCommand(
                cmd->cmd, cmd->requiresAccept, cmd->requiresSync);
        }
    }
    // The list is discarded even when the commands could not be completed.
    completedCmds.clear();
}

/** cancel any pending calls to scheduleComplete */
void SessionState::AsyncCommandCompleter::cancel()
{
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
    session = 0;
}

/** inform the completer that the session has attached,
 * allows command completion scheduling from any thread */
void SessionState::AsyncCommandCompleter::attached()
{
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
    isAttached = true;
}

/** inform the completer that the session has detached,
 * disables command completion scheduling from any thread */
void SessionState::AsyncCommandCompleter::detached()
{
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
    isAttached = false;
}

}} // namespace qpid::broker