1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef _SessionImpl_
#define _SessionImpl_
#include "Demux.h"
#include "Execution.h"
#include "Results.h"
#include "qpid/shared_ptr.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SessionState.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/sys/StateMonitor.h"
#include <boost/optional.hpp>
namespace qpid {
namespace framing {
class FrameSet;
class MethodContent;
class SequenceSet;
}
namespace client {
class Future;
class ConnectionImpl;
class SessionImpl : public framing::FrameHandler::InOutHandler,
public Execution,
private framing::AMQP_ClientOperations::Session010Handler,
private framing::AMQP_ClientOperations::Execution010Handler
{
public:
SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
~SessionImpl();
//NOTE: Public functions called in user thread.
framing::FrameSet::shared_ptr get();
const framing::Uuid getId() const;
uint16_t getChannel() const;
void setChannel(uint16_t channel);
void open(uint32_t detachedLifetime);
void close();
void resume(shared_ptr<ConnectionImpl>);
void suspend();
void setSync(bool s);
bool isSync();
void assertOpen() const;
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
void waitForCompletion(const framing::SequenceNumber& id);
//NOTE: these are called by the network thread when the connection is closed or dies
void connectionClosed(uint16_t code, const std::string& text);
void connectionBroke(uint16_t code, const std::string& text);
private:
enum State {
INACTIVE,
ATTACHING,
ATTACHED,
DETACHING,
DETACHED
};
typedef framing::AMQP_ClientOperations::Session010Handler SessionHandler;
typedef framing::AMQP_ClientOperations::Execution010Handler ExecutionHandler;
typedef sys::StateMonitor<State, DETACHED> StateMonitor;
typedef StateMonitor::Set States;
inline void setState(State s);
inline void waitFor(State);
void detach();
void check() const;
void checkOpen() const;
void handleClosed();
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
void deliver(framing::AMQFrame& frame);
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
void sendCompletion();
// Note: Following methods are called by network thread in
// response to session controls from the broker
void attach(const std::string& name, bool force);
void attached(const std::string& name);
void detach(const std::string& name);
void detached(const std::string& name, uint8_t detachCode);
void requestTimeout(uint32_t timeout);
void timeout(uint32_t timeout);
void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
void completed(const framing::SequenceSet& commands, bool timelyReply);
void knownCompleted(const framing::SequenceSet& commands);
void flush(bool expected, bool confirmed, bool completed);
void gap(const framing::SequenceSet& commands);
// Note: Following methods are called by network thread in
// response to execution commands from the broker
void sync();
void result(uint32_t commandId, const std::string& value);
void exception(uint16_t errorCode,
uint32_t commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t fieldIndex,
const std::string& description,
const framing::FieldTable& errorInfo);
//hack for old generator:
void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
int code; // Error code
std::string text; // Error text
mutable StateMonitor state;
volatile bool syncMode;
uint32_t detachedLifetime;
const uint64_t maxFrameSize;
const framing::Uuid id;
const std::string name;
shared_ptr<ConnectionImpl> connection;
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session010 proxy;
Results results;
Demux demux;
framing::FrameSet::shared_ptr arriving;
framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete
framing::SequenceSet completedIn;//incoming commands that are have completed
framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete
framing::SequenceSet completedOut;//outgoing commands that we know to be completed
framing::SequenceNumber nextIn;
framing::SequenceNumber nextOut;
};
}} // namespace qpid::client
#endif
|