summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/SessionImpl.h
blob: cd7b2c123d6ea80feada7af6b0c706924cf9ee4a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#ifndef _SessionImpl_
#define _SessionImpl_

#include "qpid/client/Demux.h"
#include "qpid/client/Execution.h"
#include "qpid/client/Results.h"
#include "qpid/client/ClientImportExport.h"

#include "qpid/SessionId.h"
#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/sys/Semaphore.h"
#include "qpid/sys/StateMonitor.h"
#include "qpid/sys/ExceptionHolder.h"

#include <boost/weak_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/optional.hpp>

namespace qpid {

namespace framing {

class FrameSet;
class MethodContent;
class SequenceSet;

}

namespace client {

class Future;
class ConnectionImpl;
class SessionHandler;

///@internal
class SessionImpl : public framing::FrameHandler::InOutHandler,
                    public Execution,
                    private framing::AMQP_ClientOperations::SessionHandler,
                    private framing::AMQP_ClientOperations::ExecutionHandler,
                    private framing::AMQP_ClientOperations::MessageHandler
{
public:
    SessionImpl(const std::string& name, boost::shared_ptr<ConnectionImpl>);
    ~SessionImpl();


    //NOTE: Public functions called in user thread.
    framing::FrameSet::shared_ptr get();

    const SessionId getId() const;

    uint16_t getChannel() const;
    void setChannel(uint16_t channel);

    void open(uint32_t detachedLifetime);
    void close();
    void resume(boost::shared_ptr<ConnectionImpl>);
    void suspend();

    QPID_CLIENT_EXTERN void assertOpen() const;
    QPID_CLIENT_EXTERN bool hasError() const;

    Future send(const framing::AMQBody& command);
    Future send(const framing::AMQBody& command, const framing::MethodContent& content);
    /**
     * This method takes the content as a FrameSet; if reframe=false,
     * the caller is resposnible for ensuring that the header and
     * content frames in that set are correct for this connection
     * (right flags, right fragmentation etc). If reframe=true, then
     * the header and content from the frameset will be copied and
     * reframed correctly for the connection.
     */
    QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false);
    void sendRawFrame(framing::AMQFrame& frame);

    Demux& getDemux();
    void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
    void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
    bool isComplete(const framing::SequenceNumber& id);
    bool isCompleteUpTo(const framing::SequenceNumber& id);
    framing::SequenceNumber getCompleteUpTo();
    void waitForCompletion(const framing::SequenceNumber& id);
    void sendCompletion();
    void sendFlush();

    void setException(const sys::ExceptionHolder&);
    
    //NOTE: these are called by the network thread when the connection is closed or dies
    void connectionClosed(uint16_t code, const std::string& text);
    void connectionBroke(const std::string& text);

    /** Set timeout in seconds, returns actual timeout allowed by broker */ 
    uint32_t setTimeout(uint32_t requestedSeconds);

    /** Get timeout in seconds. */
    uint32_t getTimeout() const;

    /** 
     * get the Connection associated with this connection
     */
    boost::shared_ptr<ConnectionImpl> getConnection();

    void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }

    /** Suppress sending detach in destructor. Used by cluster to build session state */
    void disableAutoDetach();

private:
    enum State {
        INACTIVE,
        ATTACHING,
        ATTACHED,
        DETACHING,
        DETACHED
    };
    typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
    typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
    typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
    typedef sys::StateMonitor<State, DETACHED> StateMonitor;
    typedef StateMonitor::Set States;

    inline void setState(State s);
    inline void waitFor(State);

    void setExceptionLH(const sys::ExceptionHolder&);      // LH = lock held when called.
    void detach();
    
    void check() const;
    void checkOpen() const;
    void handleClosed();

    void handleIn(framing::AMQFrame& frame);
    void handleOut(framing::AMQFrame& frame);
    /**
     * Sends session controls. This case is treated slightly
     * differently than command frames sent by the application via
     * handleOut(); session controlsare not subject to bounds checking
     * on the outgoing frame queue.
     */
    void proxyOut(framing::AMQFrame& frame);
    void sendFrame(framing::AMQFrame& frame, bool canBlock);
    void deliver(framing::AMQFrame& frame);

    Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
    void sendContent(const framing::MethodContent&);
    void waitForCompletionImpl(const framing::SequenceNumber& id);
    
    void sendCompletionImpl();

    // Note: Following methods are called by network thread in
    // response to session controls from the broker
    void attach(const std::string& name, bool force);    
    void attached(const std::string& name);    
    void detach(const std::string& name);    
    void detached(const std::string& name, uint8_t detachCode);
    void requestTimeout(uint32_t timeout);    
    void timeout(uint32_t timeout);    
    void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);    
    void expected(const framing::SequenceSet& commands, const framing::Array& fragments);    
    void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);    
    void completed(const framing::SequenceSet& commands, bool timelyReply);    
    void knownCompleted(const framing::SequenceSet& commands);    
    void flush(bool expected, bool confirmed, bool completed);    
    void gap(const framing::SequenceSet& commands);

    // Note: Following methods are called by network thread in
    // response to execution commands from the broker
    void sync();    
    void result(const framing::SequenceNumber& commandId, const std::string& value);    
    void exception(uint16_t errorCode,
                   const framing::SequenceNumber& commandId,
                   uint8_t classCode,
                   uint8_t commandCode,
                   uint8_t fieldIndex,
                   const std::string& description,
                   const framing::FieldTable& errorInfo);
                   
    // Note: Following methods are called by network thread in
    // response to message commands from the broker
    // EXCEPT Message.Transfer
    void accept(const qpid::framing::SequenceSet&);
    void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
    void release(const qpid::framing::SequenceSet&, bool);
    qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
    void setFlowMode(const std::string&, uint8_t);
    void flow(const std::string&, uint8_t, uint32_t);
    void stop(const std::string&);


    sys::ExceptionHolder exceptionHolder;
    mutable StateMonitor state;
    mutable sys::Semaphore sendLock;
    uint32_t detachedLifetime;
    const uint64_t maxFrameSize;
    const SessionId id;

    boost::shared_ptr<ConnectionImpl> connection;

    framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
    framing::ChannelHandler channel;
    framing::AMQP_ServerProxy::Session proxy;

    Results results;
    Demux demux;
    framing::FrameSet::shared_ptr arriving;

    framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete
    framing::SequenceSet completedIn;//incoming commands that are have completed
    framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete
    framing::SequenceSet completedOut;//outgoing commands that we know to be completed
    framing::SequenceNumber nextIn;
    framing::SequenceNumber nextOut;

    SessionState sessionState;

    // Only keep track of message credit 
    sys::Semaphore* sendMsgCredit;

    bool doClearDeliveryPropertiesExchange;

    bool autoDetach;
    
  friend class client::SessionHandler;
};

}} // namespace qpid::client

#endif