summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.h')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h284
1 files changed, 284 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
new file mode 100644
index 0000000000..b43df0c0aa
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -0,0 +1,284 @@
+#ifndef QPID_BROKER_SESSION_H
+#define QPID_BROKER_SESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/SessionState.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Session.h"
+#include "qpid/broker/SessionAdapter.h"
+#include "qpid/broker/DeliveryAdapter.h"
+#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/MessageBuilder.h"
+#include "qpid/broker/SessionContext.h"
+#include "qpid/broker/SemanticState.h"
+#include "qpid/sys/Monitor.h"
+
+#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+#include <set>
+#include <vector>
+#include <ostream>
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace sys {
+class TimerTask;
+}
+
+namespace broker {
+
+class Broker;
+class ConnectionState;
+class Message;
+class SessionHandler;
+class SessionManager;
+class RateFlowcontrol;
+
+/**
+ * Broker-side session state includes session's handler chains, which
+ * may themselves have state.
+ */
+class SessionState : public qpid::SessionState,
+ public SessionContext,
+ public DeliveryAdapter,
+ public management::Manageable,
+ public framing::FrameHandler::InOutHandler
+{
+ public:
+ SessionState(Broker&, SessionHandler&, const SessionId&,
+ const SessionState::Configuration&, bool delayManagement=false);
+ ~SessionState();
+ bool isAttached() const { return handler; }
+
+ void detach();
+ void attach(SessionHandler& handler);
+ void disableOutput();
+
+ /** @pre isAttached() */
+ framing::AMQP_ClientProxy& getProxy();
+
+ /** @pre isAttached() */
+ uint16_t getChannel() const;
+
+ /** @pre isAttached() */
+ ConnectionState& getConnection();
+ bool isLocal(const ConnectionToken* t) const;
+
+ Broker& getBroker();
+
+ void setTimeout(uint32_t seconds);
+
+ /** OutputControl **/
+ void abort();
+ void activateOutput();
+ void giveReadCredit(int32_t);
+
+ void senderCompleted(const framing::SequenceSet& ranges);
+
+ void sendCompletion();
+
+ //delivery adapter methods:
+ void deliver(DeliveryRecord&, bool sync);
+
+ // Manageable entry points
+ management::ManagementObject* GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
+
+ void readyToSend();
+
+ // Used by cluster to create replica sessions.
+ SemanticState& getSemanticState() { return semanticState; }
+ boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ SessionAdapter& getSessionAdapter() { return adapter; }
+
+ bool processSendCredit(uint32_t msgs);
+
+ const SessionId& getSessionId() const { return getId(); }
+
+ // Used by ExecutionHandler sync command processing. Notifies
+ // the SessionState of a received Execution.Sync command.
+ void addPendingExecutionSync();
+
+ // Used to delay creation of management object for sessions
+ // belonging to inter-broker bridges
+ void addManagementObject();
+
+ private:
+ void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
+ void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
+
+ // indicate that the given ingress msg has been completely received by the
+ // broker, and the msg's message.transfer command can be considered completed.
+ void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
+
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+
+ // End of the input & output chains.
+ void handleInLast(framing::AMQFrame& frame);
+ void handleOutLast(framing::AMQFrame& frame);
+
+ void sendAcceptAndCompletion();
+
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy();
+
+ Broker& broker;
+ SessionHandler* handler;
+ sys::AbsTime expiry; // Used by SessionManager.
+ SemanticState semanticState;
+ SessionAdapter adapter;
+ MessageBuilder msgBuilder;
+ qmf::org::apache::qpid::broker::Session* mgmtObject;
+ qpid::framing::SequenceSet accepted;
+
+ // State used for producer flow control (rate limited)
+ qpid::sys::Mutex rateLock;
+ boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
+ boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
+
+ // sequence numbers for pending received Execution.Sync commands
+ std::queue<SequenceNumber> pendingExecutionSyncs;
+ bool currentCommandComplete;
+
+ /** This class provides a context for completing asynchronous commands in a thread
+ * safe manner. Asynchronous commands save their completion state in this class.
+ * This class then schedules the completeCommands() method in the IO thread.
+ * While running in the IO thread, completeCommands() may safely complete all
+ * saved commands without the risk of colliding with other operations on this
+ * SessionState.
+ */
+ class AsyncCommandCompleter : public RefCounted {
+ private:
+ SessionState *session;
+ bool isAttached;
+ qpid::sys::Mutex completerLock;
+
+ // special-case message.transfer commands for optimization
+ struct MessageInfo {
+ SequenceNumber cmd; // message.transfer command id
+ bool requiresAccept;
+ bool requiresSync;
+ MessageInfo(SequenceNumber c, bool a, bool s)
+ : cmd(c), requiresAccept(a), requiresSync(s) {}
+ };
+ std::vector<MessageInfo> completedMsgs;
+ // If an ingress message does not require a Sync, we need to
+ // hold a reference to it in case an Execution.Sync command is received and we
+ // have to manually flush the message.
+ std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+
+ /** complete all pending commands, runs in IO thread */
+ void completeCommands();
+
+ /** for scheduling a run of "completeCommands()" on the IO thread */
+ static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
+
+ public:
+ AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
+ ~AsyncCommandCompleter() {};
+
+ /** track a message pending ingress completion */
+ void addPendingMessage(boost::intrusive_ptr<Message> m);
+ void deletePendingMessage(SequenceNumber id);
+ void flushPendingMessages();
+ /** schedule the processing of a completed ingress message.transfer command */
+ void scheduleMsgCompletion(SequenceNumber cmd,
+ bool requiresAccept,
+ bool requiresSync);
+ void cancel(); // called by SessionState destructor.
+ void attached(); // called by SessionState on attach()
+ void detached(); // called by SessionState on detach()
+ };
+ boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+
+ /** Abstract class that represents a single asynchronous command that is
+ * pending completion.
+ */
+ class AsyncCommandContext : public AsyncCompletion::Callback
+ {
+ public:
+ AsyncCommandContext( SessionState *ss, SequenceNumber _id )
+ : id(_id), completerContext(ss->asyncCommandCompleter) {}
+ virtual ~AsyncCommandContext() {}
+
+ protected:
+ SequenceNumber id;
+ boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
+ };
+
+ /** incomplete Message.transfer commands - inbound to broker from client
+ */
+ class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
+ {
+ public:
+ IncompleteIngressMsgXfer( SessionState *ss,
+ boost::intrusive_ptr<Message> m )
+ : AsyncCommandContext(ss, m->getCommandId()),
+ session(ss),
+ msg(m),
+ requiresAccept(m->requiresAccept()),
+ requiresSync(m->getFrames().getMethod()->isSync()),
+ pending(false) {}
+ virtual ~IncompleteIngressMsgXfer() {};
+
+ virtual void completed(bool);
+ virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
+
+ private:
+ SessionState *session; // only valid if sync flag in callback is true
+ boost::intrusive_ptr<Message> msg;
+ bool requiresAccept;
+ bool requiresSync;
+ bool pending; // true if msg saved on pending list...
+ };
+
+ friend class SessionManager;
+};
+
+
+inline std::ostream& operator<<(std::ostream& out, const SessionState& session) {
+ return out << session.getId();
+}
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSION_H*/