diff options
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.h')
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 102 |
1 files changed, 81 insertions, 21 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 55031a94ae..2f35032c4e 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -22,12 +22,13 @@ #ifndef _SessionImpl_ #define _SessionImpl_ -#include "Demux.h" -#include "Execution.h" -#include "Results.h" +#include "qpid/client/Demux.h" +#include "qpid/client/Execution.h" +#include "qpid/client/Results.h" +#include "qpid/client/ClientImportExport.h" #include "qpid/SessionId.h" -#include "qpid/shared_ptr.h" +#include "qpid/SessionState.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/SequenceNumber.h" @@ -35,7 +36,10 @@ #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/sys/Semaphore.h" #include "qpid/sys/StateMonitor.h" +#include "qpid/sys/ExceptionHolder.h" +#include <boost/weak_ptr.hpp> +#include <boost/shared_ptr.hpp> #include <boost/optional.hpp> namespace qpid { @@ -52,15 +56,17 @@ namespace client { class Future; class ConnectionImpl; +class SessionHandler; ///@internal class SessionImpl : public framing::FrameHandler::InOutHandler, public Execution, private framing::AMQP_ClientOperations::SessionHandler, - private framing::AMQP_ClientOperations::ExecutionHandler + private framing::AMQP_ClientOperations::ExecutionHandler, + private framing::AMQP_ClientOperations::MessageHandler { public: - SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); + SessionImpl(const std::string& name, boost::shared_ptr<ConnectionImpl>); ~SessionImpl(); @@ -74,33 +80,57 @@ public: void open(uint32_t detachedLifetime); void close(); - void resume(shared_ptr<ConnectionImpl>); + void resume(boost::shared_ptr<ConnectionImpl>); void suspend(); void assertOpen() const; Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); + /** + * This method takes the content as a FrameSet; if reframe=false, + * the caller is resposnible for ensuring that the header and + * content frames in that set are correct for this connection + * (right flags, right fragmentation etc). If reframe=true, then + * the header and content from the frameset will be copied and + * reframed correctly for the connection. + */ + QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false); + void sendRawFrame(framing::AMQFrame& frame); Demux& getDemux(); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + void markCompleted(const framing::SequenceSet& ids, bool notifyPeer); bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); + framing::SequenceNumber getCompleteUpTo(); void waitForCompletion(const framing::SequenceNumber& id); void sendCompletion(); void sendFlush(); + void setException(const sys::ExceptionHolder&); + //NOTE: these are called by the network thread when the connection is closed or dies void connectionClosed(uint16_t code, const std::string& text); - void connectionBroke(uint16_t code, const std::string& text); + void connectionBroke(const std::string& text); + + /** Set timeout in seconds, returns actual timeout allowed by broker */ + uint32_t setTimeout(uint32_t requestedSeconds); + + /** Get timeout in seconds. */ + uint32_t getTimeout() const; + + /** + * get the Connection associated with this connection + */ + boost::shared_ptr<ConnectionImpl> getConnection(); + + void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } + + /** Suppress sending detach in destructor. Used by cluster to build session state */ + void disableAutoDetach(); private: - enum ErrorType { - OK, - CONNECTION_CLOSE, - SESSION_DETACH, - EXCEPTION - }; enum State { INACTIVE, ATTACHING, @@ -110,12 +140,14 @@ private: }; typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler; + typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler; typedef sys::StateMonitor<State, DETACHED> StateMonitor; typedef StateMonitor::Set States; inline void setState(State s); inline void waitFor(State); + void setExceptionLH(const sys::ExceptionHolder&); // LH = lock held when called. void detach(); void check() const; @@ -124,13 +156,19 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); + /** + * Sends session controls. This case is treated slightly + * differently than command frames sent by the application via + * handleOut(); session controlsare not subject to bounds checking + * on the outgoing frame queue. + */ void proxyOut(framing::AMQFrame& frame); + void sendFrame(framing::AMQFrame& frame, bool canBlock); void deliver(framing::AMQFrame& frame); Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); - void requestTimeout(uint32_t timeout); void sendCompletionImpl(); @@ -139,7 +177,8 @@ private: void attach(const std::string& name, bool force); void attached(const std::string& name); void detach(const std::string& name); - void detached(const std::string& name, uint8_t detachCode); + void detached(const std::string& name, uint8_t detachCode); + void requestTimeout(uint32_t timeout); void timeout(uint32_t timeout); void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); void expected(const framing::SequenceSet& commands, const framing::Array& fragments); @@ -160,17 +199,28 @@ private: uint8_t fieldIndex, const std::string& description, const framing::FieldTable& errorInfo); - - ErrorType error; - int code; // Error code - std::string text; // Error text + + // Note: Following methods are called by network thread in + // response to message commands from the broker + // EXCEPT Message.Transfer + void accept(const qpid::framing::SequenceSet&); + void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&); + void release(const qpid::framing::SequenceSet&, bool); + qpid::framing::MessageResumeResult resume(const std::string&, const std::string&); + void setFlowMode(const std::string&, uint8_t); + void flow(const std::string&, uint8_t, uint32_t); + void stop(const std::string&); + + + sys::ExceptionHolder exceptionHolder; mutable StateMonitor state; mutable sys::Semaphore sendLock; uint32_t detachedLifetime; const uint64_t maxFrameSize; const SessionId id; - shared_ptr<ConnectionImpl> connection; + boost::shared_ptr<ConnectionImpl> connection; + framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler; framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; @@ -186,6 +236,16 @@ private: framing::SequenceNumber nextIn; framing::SequenceNumber nextOut; + SessionState sessionState; + + // Only keep track of message credit + sys::Semaphore* sendMsgCredit; + + bool doClearDeliveryPropertiesExchange; + + bool autoDetach; + + friend class client::SessionHandler; }; }} // namespace qpid::client |