summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.h')
-rw-r--r--cpp/src/qpid/broker/SessionState.h71
1 files changed, 52 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 96f2e8f512..eade93ddaa 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -25,16 +25,15 @@
#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
-#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/Session.h"
-#include "SessionAdapter.h"
-#include "DeliveryAdapter.h"
-#include "IncompleteMessageList.h"
-#include "MessageBuilder.h"
-#include "SessionContext.h"
-#include "SemanticState.h"
+#include "qmf/org/apache/qpid/broker/Session.h"
+#include "qpid/broker/SessionAdapter.h"
+#include "qpid/broker/DeliveryAdapter.h"
+#include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/broker/MessageBuilder.h"
+#include "qpid/broker/SessionContext.h"
+#include "qpid/broker/SemanticState.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -49,6 +48,10 @@ namespace framing {
class AMQP_ClientProxy;
}
+namespace sys {
+class TimerTask;
+}
+
namespace broker {
class Broker;
@@ -56,12 +59,13 @@ class ConnectionState;
class Message;
class SessionHandler;
class SessionManager;
+class RateFlowcontrol;
/**
* Broker-side session state includes session's handler chains, which
* may themselves have state.
*/
-class SessionState : public qpid::SessionState,
+class SessionState : public qpid::SessionState,
public SessionContext,
public DeliveryAdapter,
public management::Manageable,
@@ -74,10 +78,14 @@ class SessionState : public qpid::SessionState,
void detach();
void attach(SessionHandler& handler);
+ void disableOutput();
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
-
+
+ /** @pre isAttached() */
+ uint16_t getChannel() const;
+
/** @pre isAttached() */
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
@@ -85,22 +93,33 @@ class SessionState : public qpid::SessionState,
Broker& getBroker();
/** OutputControl **/
+ void abort();
void activateOutput();
+ void giveReadCredit(int32_t);
void senderCompleted(const framing::SequenceSet& ranges);
-
+
void sendCompletion();
//delivery adapter methods:
- DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
+ void deliver(DeliveryRecord&, bool sync);
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args);
+ ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
void readyToSend();
+ // Used by cluster to create replica sessions.
+ SemanticState& getSemanticState() { return semanticState; }
+ boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ SessionAdapter& getSessionAdapter() { return adapter; }
+
+ bool processSendCredit(uint32_t msgs);
+
+ const SessionId& getSessionId() const { return getId(); }
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
@@ -114,20 +133,34 @@ class SessionState : public qpid::SessionState,
void handleInLast(framing::AMQFrame& frame);
void handleOutLast(framing::AMQFrame& frame);
+ void sendAcceptAndCompletion();
+
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy();
+
Broker& broker;
- SessionHandler* handler;
+ SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
- sys::Mutex lock;
- bool ignoring;
- std::string name;
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
IncompleteMessageList incomplete;
IncompleteMessageList::CompletionListener enqueuedOp;
- management::Session* mgmtObject;
+ qmf::org::apache::qpid::broker::Session* mgmtObject;
+ qpid::framing::SequenceSet accepted;
+
+ // State used for producer flow control (rate limited)
+ qpid::sys::Mutex rateLock;
+ boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
+ boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
- friend class SessionManager;
+ friend class SessionManager;
};