summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SessionImpl.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.h')
-rw-r--r--cpp/src/qpid/client/SessionImpl.h102
1 files changed, 81 insertions, 21 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 55031a94ae..2f35032c4e 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -22,12 +22,13 @@
#ifndef _SessionImpl_
#define _SessionImpl_
-#include "Demux.h"
-#include "Execution.h"
-#include "Results.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/Execution.h"
+#include "qpid/client/Results.h"
+#include "qpid/client/ClientImportExport.h"
#include "qpid/SessionId.h"
-#include "qpid/shared_ptr.h"
+#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SequenceNumber.h"
@@ -35,7 +36,10 @@
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/sys/Semaphore.h"
#include "qpid/sys/StateMonitor.h"
+#include "qpid/sys/ExceptionHolder.h"
+#include <boost/weak_ptr.hpp>
+#include <boost/shared_ptr.hpp>
#include <boost/optional.hpp>
namespace qpid {
@@ -52,15 +56,17 @@ namespace client {
class Future;
class ConnectionImpl;
+class SessionHandler;
///@internal
class SessionImpl : public framing::FrameHandler::InOutHandler,
public Execution,
private framing::AMQP_ClientOperations::SessionHandler,
- private framing::AMQP_ClientOperations::ExecutionHandler
+ private framing::AMQP_ClientOperations::ExecutionHandler,
+ private framing::AMQP_ClientOperations::MessageHandler
{
public:
- SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
+ SessionImpl(const std::string& name, boost::shared_ptr<ConnectionImpl>);
~SessionImpl();
@@ -74,33 +80,57 @@ public:
void open(uint32_t detachedLifetime);
void close();
- void resume(shared_ptr<ConnectionImpl>);
+ void resume(boost::shared_ptr<ConnectionImpl>);
void suspend();
void assertOpen() const;
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
+ /**
+ * This method takes the content as a FrameSet; if reframe=false,
+ * the caller is resposnible for ensuring that the header and
+ * content frames in that set are correct for this connection
+ * (right flags, right fragmentation etc). If reframe=true, then
+ * the header and content from the frameset will be copied and
+ * reframed correctly for the connection.
+ */
+ QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false);
+ void sendRawFrame(framing::AMQFrame& frame);
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
+ framing::SequenceNumber getCompleteUpTo();
void waitForCompletion(const framing::SequenceNumber& id);
void sendCompletion();
void sendFlush();
+ void setException(const sys::ExceptionHolder&);
+
//NOTE: these are called by the network thread when the connection is closed or dies
void connectionClosed(uint16_t code, const std::string& text);
- void connectionBroke(uint16_t code, const std::string& text);
+ void connectionBroke(const std::string& text);
+
+ /** Set timeout in seconds, returns actual timeout allowed by broker */
+ uint32_t setTimeout(uint32_t requestedSeconds);
+
+ /** Get timeout in seconds. */
+ uint32_t getTimeout() const;
+
+ /**
+ * get the Connection associated with this connection
+ */
+ boost::shared_ptr<ConnectionImpl> getConnection();
+
+ void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
+
+ /** Suppress sending detach in destructor. Used by cluster to build session state */
+ void disableAutoDetach();
private:
- enum ErrorType {
- OK,
- CONNECTION_CLOSE,
- SESSION_DETACH,
- EXCEPTION
- };
enum State {
INACTIVE,
ATTACHING,
@@ -110,12 +140,14 @@ private:
};
typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
+ typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
typedef sys::StateMonitor<State, DETACHED> StateMonitor;
typedef StateMonitor::Set States;
inline void setState(State s);
inline void waitFor(State);
+ void setExceptionLH(const sys::ExceptionHolder&); // LH = lock held when called.
void detach();
void check() const;
@@ -124,13 +156,19 @@ private:
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
+ /**
+ * Sends session controls. This case is treated slightly
+ * differently than command frames sent by the application via
+ * handleOut(); session controlsare not subject to bounds checking
+ * on the outgoing frame queue.
+ */
void proxyOut(framing::AMQFrame& frame);
+ void sendFrame(framing::AMQFrame& frame, bool canBlock);
void deliver(framing::AMQFrame& frame);
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
- void requestTimeout(uint32_t timeout);
void sendCompletionImpl();
@@ -139,7 +177,8 @@ private:
void attach(const std::string& name, bool force);
void attached(const std::string& name);
void detach(const std::string& name);
- void detached(const std::string& name, uint8_t detachCode);
+ void detached(const std::string& name, uint8_t detachCode);
+ void requestTimeout(uint32_t timeout);
void timeout(uint32_t timeout);
void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
@@ -160,17 +199,28 @@ private:
uint8_t fieldIndex,
const std::string& description,
const framing::FieldTable& errorInfo);
-
- ErrorType error;
- int code; // Error code
- std::string text; // Error text
+
+ // Note: Following methods are called by network thread in
+ // response to message commands from the broker
+ // EXCEPT Message.Transfer
+ void accept(const qpid::framing::SequenceSet&);
+ void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
+ void release(const qpid::framing::SequenceSet&, bool);
+ qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
+ void setFlowMode(const std::string&, uint8_t);
+ void flow(const std::string&, uint8_t, uint32_t);
+ void stop(const std::string&);
+
+
+ sys::ExceptionHolder exceptionHolder;
mutable StateMonitor state;
mutable sys::Semaphore sendLock;
uint32_t detachedLifetime;
const uint64_t maxFrameSize;
const SessionId id;
- shared_ptr<ConnectionImpl> connection;
+ boost::shared_ptr<ConnectionImpl> connection;
+
framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;
@@ -186,6 +236,16 @@ private:
framing::SequenceNumber nextIn;
framing::SequenceNumber nextOut;
+ SessionState sessionState;
+
+ // Only keep track of message credit
+ sys::Semaphore* sendMsgCredit;
+
+ bool doClearDeliveryPropertiesExchange;
+
+ bool autoDetach;
+
+ friend class client::SessionHandler;
};
}} // namespace qpid::client