summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/SessionImpl.h
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
committerRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
commit32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch)
tree2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/cpp/src/qpid/client/SessionImpl.h
parent116d91ad7825a98af36a869fc751206fbce0c59f (diff)
parentf7e896076143de4572b4f1f67ef0765125f2498d (diff)
downloadqpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/client/SessionImpl.h')
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.h220
1 files changed, 220 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h
new file mode 100644
index 0000000000..752d587a39
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SessionImpl.h
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _SessionImpl_
+#define _SessionImpl_
+
+#include "qpid/client/Demux.h"
+#include "qpid/client/Execution.h"
+#include "qpid/client/Results.h"
+#include "qpid/client/ClientImportExport.h"
+
+#include "qpid/SessionId.h"
+#include "qpid/SessionState.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/sys/Semaphore.h"
+#include "qpid/sys/StateMonitor.h"
+#include "qpid/sys/ExceptionHolder.h"
+
+#include <boost/weak_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/optional.hpp>
+
+namespace qpid {
+
+namespace framing {
+
+class FrameSet;
+class MethodContent;
+class SequenceSet;
+
+}
+
+namespace client {
+
+class Future;
+class ConnectionImpl;
+class SessionHandler;
+
+///@internal
+class SessionImpl : public framing::FrameHandler::InOutHandler,
+ public Execution,
+ private framing::AMQP_ClientOperations::SessionHandler,
+ private framing::AMQP_ClientOperations::ExecutionHandler
+{
+public:
+ SessionImpl(const std::string& name, boost::shared_ptr<ConnectionImpl>);
+ ~SessionImpl();
+
+
+ //NOTE: Public functions called in user thread.
+ framing::FrameSet::shared_ptr get();
+
+ const SessionId getId() const;
+
+ uint16_t getChannel() const;
+ void setChannel(uint16_t channel);
+
+ void open(uint32_t detachedLifetime);
+ void close();
+ void resume(boost::shared_ptr<ConnectionImpl>);
+ void suspend();
+
+ QPID_CLIENT_EXTERN void assertOpen() const;
+ QPID_CLIENT_EXTERN bool hasError() const;
+
+ Future send(const framing::AMQBody& command);
+ Future send(const framing::AMQBody& command, const framing::MethodContent& content);
+ QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content);
+ void sendRawFrame(framing::AMQFrame& frame);
+
+ Demux& getDemux();
+ void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
+ bool isComplete(const framing::SequenceNumber& id);
+ bool isCompleteUpTo(const framing::SequenceNumber& id);
+ framing::SequenceNumber getCompleteUpTo();
+ void waitForCompletion(const framing::SequenceNumber& id);
+ void sendCompletion();
+ void sendFlush();
+
+ QPID_CLIENT_EXTERN void setException(const sys::ExceptionHolder&);
+
+ //NOTE: these are called by the network thread when the connection is closed or dies
+ void connectionClosed(uint16_t code, const std::string& text);
+ void connectionBroke(const std::string& text);
+
+ /** Set timeout in seconds, returns actual timeout allowed by broker */
+ uint32_t setTimeout(uint32_t requestedSeconds);
+
+ /** Get timeout in seconds. */
+ uint32_t getTimeout() const;
+
+ /**
+ * get the Connection associated with this connection
+ */
+ boost::shared_ptr<ConnectionImpl> getConnection();
+
+private:
+ enum State {
+ INACTIVE,
+ ATTACHING,
+ ATTACHED,
+ DETACHING,
+ DETACHED
+ };
+ typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
+ typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
+ typedef sys::StateMonitor<State, DETACHED> StateMonitor;
+ typedef StateMonitor::Set States;
+
+ inline void setState(State s);
+ inline void waitFor(State);
+
+ void setExceptionLH(const sys::ExceptionHolder&); // LH = lock held when called.
+ void detach();
+
+ void check() const;
+ void checkOpen() const;
+ void handleClosed();
+
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+ /**
+ * Sends session controls. This case is treated slightly
+ * differently than command frames sent by the application via
+ * handleOut(); session controlsare not subject to bounds checking
+ * on the outgoing frame queue.
+ */
+ void proxyOut(framing::AMQFrame& frame);
+ void sendFrame(framing::AMQFrame& frame, bool canBlock);
+ void deliver(framing::AMQFrame& frame);
+
+ Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
+ void sendContent(const framing::MethodContent&);
+ void waitForCompletionImpl(const framing::SequenceNumber& id);
+
+ void sendCompletionImpl();
+
+ // Note: Following methods are called by network thread in
+ // response to session controls from the broker
+ void attach(const std::string& name, bool force);
+ void attached(const std::string& name);
+ void detach(const std::string& name);
+ void detached(const std::string& name, uint8_t detachCode);
+ void requestTimeout(uint32_t timeout);
+ void timeout(uint32_t timeout);
+ void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
+ void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void completed(const framing::SequenceSet& commands, bool timelyReply);
+ void knownCompleted(const framing::SequenceSet& commands);
+ void flush(bool expected, bool confirmed, bool completed);
+ void gap(const framing::SequenceSet& commands);
+
+ // Note: Following methods are called by network thread in
+ // response to execution commands from the broker
+ void sync();
+ void result(const framing::SequenceNumber& commandId, const std::string& value);
+ void exception(uint16_t errorCode,
+ const framing::SequenceNumber& commandId,
+ uint8_t classCode,
+ uint8_t commandCode,
+ uint8_t fieldIndex,
+ const std::string& description,
+ const framing::FieldTable& errorInfo);
+
+ sys::ExceptionHolder exceptionHolder;
+ mutable StateMonitor state;
+ mutable sys::Semaphore sendLock;
+ uint32_t detachedLifetime;
+ const uint64_t maxFrameSize;
+ const SessionId id;
+
+ boost::shared_ptr<ConnectionImpl> connection;
+
+ framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
+ framing::ChannelHandler channel;
+ framing::AMQP_ServerProxy::Session proxy;
+
+ Results results;
+ Demux demux;
+ framing::FrameSet::shared_ptr arriving;
+
+ framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete
+ framing::SequenceSet completedIn;//incoming commands that are have completed
+ framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete
+ framing::SequenceSet completedOut;//outgoing commands that we know to be completed
+ framing::SequenceNumber nextIn;
+ framing::SequenceNumber nextOut;
+
+ SessionState sessionState;
+
+ friend class client::SessionHandler;
+};
+
+}} // namespace qpid::client
+
+#endif