summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp527
1 files changed, 527 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
new file mode 100644
index 0000000000..6836794622
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -0,0 +1,527 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/SessionManager.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/ServerInvoker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+
+#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+using sys::Mutex;
+using boost::intrusive_ptr;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::AbsTime;
+//using qpid::sys::Timer;
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+SessionState::SessionState(
+ Broker& b, SessionHandler& h, const SessionId& id,
+ const SessionState::Configuration& config)
+ : qpid::SessionState(id, config),
+ broker(b), handler(&h),
+ semanticState(*this),
+ adapter(semanticState),
+ asyncCommandCompleter(new AsyncCommandCompleter(this))
+{
+ addManagementObject();
+ attach(h);
+}
+
+void SessionState::addManagementObject() {
+ if (GetManagementObject()) return; // Already added.
+ Manageable* parent = broker.GetVhostObject ();
+ if (parent != 0) {
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent != 0) {
+ std::string name(getId().str());
+ std::string fullName(name);
+ if (name.length() >= std::numeric_limits<uint8_t>::max())
+ name.resize(std::numeric_limits<uint8_t>::max()-1);
+ mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session (agent, this, parent, name));
+ mgmtObject->set_fullName (fullName);
+ mgmtObject->set_attached (0);
+ mgmtObject->clr_expireTime();
+ agent->addObject(mgmtObject);
+ }
+ }
+}
+
+void SessionState::startTx() {
+ if (mgmtObject) { mgmtObject->inc_TxnStarts(); }
+}
+
+void SessionState::commitTx() {
+ if (mgmtObject) {
+ mgmtObject->inc_TxnCommits();
+ mgmtObject->inc_TxnCount();
+ }
+}
+
+void SessionState::rollbackTx() {
+ if (mgmtObject) {
+ mgmtObject->inc_TxnRejects();
+ mgmtObject->inc_TxnCount();
+ }
+}
+
+SessionState::~SessionState() {
+ if (mgmtObject != 0)
+ mgmtObject->debugStats("destroying");
+ asyncCommandCompleter->cancel();
+ semanticState.closed();
+ if (mgmtObject != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+AMQP_ClientProxy& SessionState::getProxy() {
+ assert(isAttached());
+ return handler->getProxy();
+}
+
+uint16_t SessionState::getChannel() const {
+ assert(isAttached());
+ return handler->getChannel();
+}
+
+amqp_0_10::Connection& SessionState::getConnection() {
+ assert(isAttached());
+ return handler->getConnection();
+}
+
+bool SessionState::isLocal(const OwnershipToken* t) const
+{
+ return isAttached() && &(handler->getConnection()) == t;
+}
+
+void SessionState::detach() {
+ QPID_LOG(debug, getId() << ": detached on broker.");
+ asyncCommandCompleter->detached();
+ disableOutput();
+ handler = 0;
+ if (mgmtObject != 0)
+ mgmtObject->set_attached (0);
+}
+
+void SessionState::disableOutput()
+{
+ semanticState.detached(); //prevents further activateOutput calls until reattached
+}
+
+void SessionState::attach(SessionHandler& h) {
+ QPID_LOG(debug, getId() << ": attached on broker.");
+ handler = &h;
+ if (mgmtObject != 0)
+ {
+ mgmtObject->set_attached (1);
+ mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ }
+ asyncCommandCompleter->attached();
+}
+
+ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
+{
+ return mgmtObject;
+}
+
+Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
+ Args& /*args*/,
+ std::string& /*text*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ switch (methodId)
+ {
+ case _qmf::Session::METHOD_DETACH :
+ if (handler != 0) {
+ handler->sendDetach();
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case _qmf::Session::METHOD_CLOSE :
+ /*
+ if (handler != 0)
+ {
+ handler->getConnection().closeChannel(handler->getChannel());
+ }
+ status = Manageable::STATUS_OK;
+ break;
+ */
+
+ case _qmf::Session::METHOD_SOLICITACK :
+ case _qmf::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
+}
+
+void SessionState::handleCommand(framing::AMQMethodBody* method) {
+ Invoker::Result result = invoke(adapter, *method);
+ if (!result.wasHandled())
+ throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
+ if (currentCommand.isCompleteSync())
+ completeCommand(
+ currentCommand.getId(), false/*needAccept*/, currentCommand.isSyncRequired(),
+ result.getResult());
+}
+
+
+void SessionState::handleContent(AMQFrame& frame)
+{
+ if (frame.getBof() && frame.getBos()) //start of frameset
+ msgBuilder.start(currentCommand.getId());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
+ msgBuilder.handle(frame);
+ if (frame.getEof() && frame.getEos()) {//end of frameset
+ if (frame.getBof()) {
+ //i.e this is a just a command frame, add a dummy header
+ AMQFrame header((AMQHeaderBody()));
+ header.setBof(false);
+ header.setEof(false);
+ msg->getFrames().append(header);
+ }
+ DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
+ if (broker.isTimestamping())
+ msg->setTimestamp();
+ msg->setPublisher(&(getConnection()));
+ msg->computeExpiration();
+
+
+ IncompleteIngressMsgXfer xfer(this, msg);
+ msg->getIngressCompletion().begin();
+ // This call should come before routing, because it calcs required credit.
+ msgBuilder.end();
+ semanticState.route(deliverable.getMessage(), deliverable);
+ msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
+ }
+}
+
+void SessionState::sendAcceptAndCompletion()
+{
+ if (!accepted.empty()) {
+ getProxy().getMessage().accept(accepted);
+ accepted.clear();
+ }
+ sendCompletion();
+}
+
+/** Invoked when the given command is finished being processed by all interested
+ * parties (eg. it is done being enqueued to all queues, its credit has been
+ * accounted for, etc). At this point the command is considered by this
+ * receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeCommand(SequenceNumber id,
+ bool requiresAccept,
+ bool requiresSync,
+ const std::string& result=std::string())
+{
+ bool callSendCompletion = false;
+ receiverCompleted(id);
+ if (requiresAccept)
+ // will cause cmd's seq to appear in the next message.accept we send.
+ accepted.add(id);
+
+ if (!result.empty())
+ getProxy().getExecution().result(id, result);
+
+ // Are there any outstanding Execution.Sync commands pending the
+ // completion of this cmd? If so, complete them.
+ while (!pendingExecutionSyncs.empty() &&
+ (receiverGetIncomplete().empty() ||
+ receiverGetIncomplete().front() >= pendingExecutionSyncs.front()))
+ {
+ const SequenceNumber syncId = pendingExecutionSyncs.front();
+ pendingExecutionSyncs.pop();
+ QPID_LOG(debug, getId() << ": delayed execution.sync " << syncId << " is completed.");
+ if (receiverGetIncomplete().contains(syncId))
+ receiverCompleted(syncId);
+ callSendCompletion = true; // likely peer is pending for this completion.
+ }
+
+ // if the sender has requested immediate notification of the completion...
+ if (requiresSync || callSendCompletion) {
+ sendAcceptAndCompletion();
+ }
+}
+
+void SessionState::handleIn(AMQFrame& frame) {
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content.
+ AMQMethodBody* m = frame.getMethod();
+ currentCommand = CurrentCommand(receiverGetCurrent(), m && m->isSync());
+
+ if (m == 0 || m->isContentBearing()) {
+ handleContent(frame);
+ } else if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod());
+ } else {
+ throw InternalErrorException("Cannot handle multi-frame command segments yet");
+ }
+}
+
+void SessionState::handleOut(AMQFrame& frame) {
+ assert(handler);
+ handler->out(frame);
+}
+
+DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+ const std::string& destination, bool isRedelivered, uint64_t ttl,
+ qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
+ const qpid::types::Variant::Map& annotations, bool sync)
+{
+ uint32_t maxFrameSize = getConnection().getFrameMax();
+ assert(senderGetCommandPoint().offset == 0);
+ SequenceNumber commandId = senderGetCommandPoint().command;
+
+ framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
+ method.setEof(false);
+ getProxy().getHandler().handle(method);
+ message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, annotations);
+ message.sendContent(getProxy().getHandler(), maxFrameSize);
+
+ assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
+ if (sync) {
+ AMQP_ClientProxy::Execution& p(getProxy().getExecution());
+ Proxy::ScopedSync s(p);
+ p.sync();
+ }
+ return commandId;
+}
+
+void SessionState::sendCompletion() {
+ handler->sendCompletion();
+}
+
+void SessionState::senderCompleted(const SequenceSet& commands) {
+ qpid::SessionState::senderCompleted(commands);
+ semanticState.completed(commands);
+}
+
+void SessionState::readyToSend() {
+ QPID_LOG(debug, getId() << ": ready to send, activating output.");
+ assert(handler);
+ semanticState.attached();
+}
+
+Broker& SessionState::getBroker() { return broker; }
+
+// Session resume is not fully implemented so it is useless to set a
+// non-0 timeout.
+void SessionState::setTimeout(uint32_t) { }
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+bool SessionState::addPendingExecutionSync() {
+ SequenceNumber id = currentCommand.getId();
+ if (addPendingExecutionSync(id)) {
+ currentCommand.setCompleteSync(false);
+ QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << id);
+ return true;
+ }
+ return false;
+}
+
+bool SessionState::addPendingExecutionSync(SequenceNumber id)
+{
+ if (receiverGetIncomplete().front() < id) {
+ pendingExecutionSyncs.push(id);
+ asyncCommandCompleter->flushPendingMessages();
+ return true;
+ }
+ return false;
+}
+
+/** factory for creating a reference-counted IncompleteIngressMsgXfer object
+ * which will be attached to a message that will be completed asynchronously.
+ */
+boost::intrusive_ptr<AsyncCompletion::Callback>
+SessionState::IncompleteIngressMsgXfer::clone()
+{
+ // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
+ // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
+ if (requiresSync)
+ msg->flush();
+ else {
+ // otherwise, we need to track this message in order to flush it if an execution.sync arrives
+ // before it has been completed (see flushPendingMessages())
+ pending = true;
+ completerContext->addPendingMessage(msg);
+ }
+
+ return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
+}
+
+
+/** Invoked by the asynchronous completer associated with a received
+ * msg that is pending Completion. May be invoked by the IO thread
+ * (sync == true), or some external thread (!sync).
+ */
+void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
+{
+ if (pending) completerContext->deletePendingMessage(id);
+ if (!sync) {
+ /** note well: this path may execute in any thread. It is safe to access
+ * the scheduledCompleterContext, since *this has a shared pointer to it.
+ * but not session!
+ */
+ session = 0;
+ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
+ completerContext->scheduleCommandCompletion(id, requiresAccept, requiresSync);
+ } else {
+ // this path runs directly from the ac->end() call in handleContent() above,
+ // so *session is definately valid.
+ if (session->isAttached()) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << id);
+ session->completeCommand(id, requiresAccept, requiresSync);
+ }
+ }
+ completerContext = 0;
+}
+
+
+/** Track an ingress message that is pending completion */
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
+ bool unique = pendingMsgs.insert(item).second;
+ if (!unique) {
+ assert(false);
+ }
+}
+
+
+/** pending message has completed */
+void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ pendingMsgs.erase(id);
+}
+
+
+/** done when an execution.sync arrives */
+void SessionState::AsyncCommandCompleter::flushPendingMessages()
+{
+ std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
+ }
+ // drop lock, so it is safe to call "flush()"
+ for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
+ i != copy.end(); ++i) {
+ i->second->flush();
+ }
+}
+
+
+/** mark an ingress Message.Transfer command as completed.
+ * This method must be thread safe - it may run on any thread.
+ */
+void SessionState::AsyncCommandCompleter::scheduleCommandCompletion(
+ SequenceNumber cmd,
+ bool requiresAccept,
+ bool requiresSync)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+ if (session && isAttached) {
+ CommandInfo info(cmd, requiresAccept, requiresSync);
+ completedCmds.push_back(info);
+ if (completedCmds.size() == 1) {
+ session->getConnection().requestIOProcessing(
+ boost::bind(&AsyncCommandCompleter::completeCommands,
+ session->asyncCommandCompleter));
+ }
+ }
+}
+
+void SessionState::AsyncCommandCompleter::schedule(boost::function<void()> f) {
+ if (session && isAttached) session->getConnection().requestIOProcessing(f);
+}
+
+/** Cause the session to complete all completed commands.
+ * Executes on the IO thread.
+ */
+void SessionState::AsyncCommandCompleter::completeCommands()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+ // when session is destroyed, it clears the session pointer via cancel().
+ if (session && session->isAttached()) {
+ for (std::vector<CommandInfo>::iterator cmd = completedCmds.begin();
+ cmd != completedCmds.end(); ++cmd) {
+ session->completeCommand(
+ cmd->cmd, cmd->requiresAccept, cmd->requiresSync);
+ }
+ }
+ completedCmds.clear();
+}
+
+
+/** cancel any pending calls to scheduleComplete */
+void SessionState::AsyncCommandCompleter::cancel()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ session = 0;
+}
+
+
+/** inform the completer that the session has attached,
+ * allows command completion scheduling from any thread */
+void SessionState::AsyncCommandCompleter::attached()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ isAttached = true;
+}
+
+
+/** inform the completer that the session has detached,
+ * disables command completion scheduling from any thread */
+void SessionState::AsyncCommandCompleter::detached()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ isAttached = false;
+}
+
+}} // namespace qpid::broker