diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.h')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 71 |
1 files changed, 52 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 96f2e8f512..eade93ddaa 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -25,16 +25,15 @@ #include "qpid/SessionState.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" -#include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" -#include "qpid/management/Session.h" -#include "SessionAdapter.h" -#include "DeliveryAdapter.h" -#include "IncompleteMessageList.h" -#include "MessageBuilder.h" -#include "SessionContext.h" -#include "SemanticState.h" +#include "qmf/org/apache/qpid/broker/Session.h" +#include "qpid/broker/SessionAdapter.h" +#include "qpid/broker/DeliveryAdapter.h" +#include "qpid/broker/IncompleteMessageList.h" +#include "qpid/broker/MessageBuilder.h" +#include "qpid/broker/SessionContext.h" +#include "qpid/broker/SemanticState.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -49,6 +48,10 @@ namespace framing { class AMQP_ClientProxy; } +namespace sys { +class TimerTask; +} + namespace broker { class Broker; @@ -56,12 +59,13 @@ class ConnectionState; class Message; class SessionHandler; class SessionManager; +class RateFlowcontrol; /** * Broker-side session state includes session's handler chains, which * may themselves have state. */ -class SessionState : public qpid::SessionState, +class SessionState : public qpid::SessionState, public SessionContext, public DeliveryAdapter, public management::Manageable, @@ -74,10 +78,14 @@ class SessionState : public qpid::SessionState, void detach(); void attach(SessionHandler& handler); + void disableOutput(); /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); - + + /** @pre isAttached() */ + uint16_t getChannel() const; + /** @pre isAttached() */ ConnectionState& getConnection(); bool isLocal(const ConnectionToken* t) const; @@ -85,22 +93,33 @@ class SessionState : public qpid::SessionState, Broker& getBroker(); /** OutputControl **/ + void abort(); void activateOutput(); + void giveReadCredit(int32_t); void senderCompleted(const framing::SequenceSet& ranges); - + void sendCompletion(); //delivery adapter methods: - DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); + void deliver(DeliveryRecord&, bool sync); // Manageable entry points management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args); + ManagementMethod (uint32_t methodId, management::Args& args, std::string&); void readyToSend(); + // Used by cluster to create replica sessions. + SemanticState& getSemanticState() { return semanticState; } + boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); } + SessionAdapter& getSessionAdapter() { return adapter; } + + bool processSendCredit(uint32_t msgs); + + const SessionId& getSessionId() const { return getId(); } + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); @@ -114,20 +133,34 @@ class SessionState : public qpid::SessionState, void handleInLast(framing::AMQFrame& frame); void handleOutLast(framing::AMQFrame& frame); + void sendAcceptAndCompletion(); + + /** + * If commands are sent based on the local time (e.g. in timers), they don't have + * a well-defined ordering across cluster nodes. + * This proxy is for sending such commands. In a clustered broker it will take steps + * to synchronize command order across the cluster. In a stand-alone broker + * it is just a synonym for getProxy() + */ + framing::AMQP_ClientProxy& getClusterOrderProxy(); + Broker& broker; - SessionHandler* handler; + SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. - sys::Mutex lock; - bool ignoring; - std::string name; SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; IncompleteMessageList incomplete; IncompleteMessageList::CompletionListener enqueuedOp; - management::Session* mgmtObject; + qmf::org::apache::qpid::broker::Session* mgmtObject; + qpid::framing::SequenceSet accepted; + + // State used for producer flow control (rate limited) + qpid::sys::Mutex rateLock; + boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; + boost::intrusive_ptr<sys::TimerTask> flowControlTimer; - friend class SessionManager; + friend class SessionManager; }; |