/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #ifndef _SessionImpl_ #define _SessionImpl_ #include "qpid/client/Demux.h" #include "qpid/client/Execution.h" #include "qpid/client/Results.h" #include "qpid/client/ClientImportExport.h" #include "qpid/SessionId.h" #include "qpid/SessionState.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/sys/Semaphore.h" #include "qpid/sys/StateMonitor.h" #include "qpid/sys/ExceptionHolder.h" #include #include #include namespace qpid { namespace framing { class FrameSet; class MethodContent; class SequenceSet; } namespace client { class Future; class ConnectionImpl; class SessionHandler; ///@internal class SessionImpl : public framing::FrameHandler::InOutHandler, public Execution, private framing::AMQP_ClientOperations::SessionHandler, private framing::AMQP_ClientOperations::ExecutionHandler, private framing::AMQP_ClientOperations::MessageHandler { public: SessionImpl(const std::string& name, boost::shared_ptr); ~SessionImpl(); //NOTE: Public functions called in user thread. framing::FrameSet::shared_ptr get(); const SessionId getId() const; uint16_t getChannel() const; void setChannel(uint16_t channel); void open(uint32_t detachedLifetime); void close(); void resume(boost::shared_ptr); void suspend(); QPID_CLIENT_EXTERN void assertOpen() const; QPID_CLIENT_EXTERN bool hasError() const; Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); /** * This method takes the content as a FrameSet; if reframe=false, * the caller is resposnible for ensuring that the header and * content frames in that set are correct for this connection * (right flags, right fragmentation etc). If reframe=true, then * the header and content from the frameset will be copied and * reframed correctly for the connection. */ QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false); void sendRawFrame(framing::AMQFrame& frame); Demux& getDemux(); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); void markCompleted(const framing::SequenceSet& ids, bool notifyPeer); bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); framing::SequenceNumber getCompleteUpTo(); void waitForCompletion(const framing::SequenceNumber& id); void sendCompletion(); void sendFlush(); void setException(const sys::ExceptionHolder&); //NOTE: these are called by the network thread when the connection is closed or dies void connectionClosed(uint16_t code, const std::string& text); void connectionBroke(const std::string& text); /** Set timeout in seconds, returns actual timeout allowed by broker */ uint32_t setTimeout(uint32_t requestedSeconds); /** Get timeout in seconds. */ uint32_t getTimeout() const; /** * get the Connection associated with this connection */ boost::shared_ptr getConnection(); void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } /** Suppress sending detach in destructor. Used by cluster to build session state */ void disableAutoDetach(); private: enum State { INACTIVE, ATTACHING, ATTACHED, DETACHING, DETACHED }; typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler; typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler; typedef sys::StateMonitor StateMonitor; typedef StateMonitor::Set States; inline void setState(State s); inline void waitFor(State); void setExceptionLH(const sys::ExceptionHolder&); // LH = lock held when called. void detach(); void check() const; void checkOpen() const; void handleClosed(); void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); /** * Sends session controls. This case is treated slightly * differently than command frames sent by the application via * handleOut(); session controlsare not subject to bounds checking * on the outgoing frame queue. */ void proxyOut(framing::AMQFrame& frame); void sendFrame(framing::AMQFrame& frame, bool canBlock); void deliver(framing::AMQFrame& frame); Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); void sendCompletionImpl(); // Note: Following methods are called by network thread in // response to session controls from the broker void attach(const std::string& name, bool force); void attached(const std::string& name); void detach(const std::string& name); void detached(const std::string& name, uint8_t detachCode); void requestTimeout(uint32_t timeout); void timeout(uint32_t timeout); void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); void expected(const framing::SequenceSet& commands, const framing::Array& fragments); void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments); void completed(const framing::SequenceSet& commands, bool timelyReply); void knownCompleted(const framing::SequenceSet& commands); void flush(bool expected, bool confirmed, bool completed); void gap(const framing::SequenceSet& commands); // Note: Following methods are called by network thread in // response to execution commands from the broker void sync(); void result(const framing::SequenceNumber& commandId, const std::string& value); void exception(uint16_t errorCode, const framing::SequenceNumber& commandId, uint8_t classCode, uint8_t commandCode, uint8_t fieldIndex, const std::string& description, const framing::FieldTable& errorInfo); // Note: Following methods are called by network thread in // response to message commands from the broker // EXCEPT Message.Transfer void accept(const qpid::framing::SequenceSet&); void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&); void release(const qpid::framing::SequenceSet&, bool); qpid::framing::MessageResumeResult resume(const std::string&, const std::string&); void setFlowMode(const std::string&, uint8_t); void flow(const std::string&, uint8_t, uint32_t); void stop(const std::string&); sys::ExceptionHolder exceptionHolder; mutable StateMonitor state; mutable sys::Semaphore sendLock; uint32_t detachedLifetime; const uint64_t maxFrameSize; const SessionId id; boost::shared_ptr connection; framing::FrameHandler::MemFunRef ioHandler; framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; Results results; Demux demux; framing::FrameSet::shared_ptr arriving; framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete framing::SequenceSet completedIn;//incoming commands that are have completed framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete framing::SequenceSet completedOut;//outgoing commands that we know to be completed framing::SequenceNumber nextIn; framing::SequenceNumber nextOut; SessionState sessionState; // Only keep track of message credit sys::Semaphore* sendMsgCredit; bool doClearDeliveryPropertiesExchange; bool autoDetach; friend class client::SessionHandler; }; }} // namespace qpid::client #endif