summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp63
1 files changed, 49 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 1cb10d0c19..0e3c9928d1 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -21,6 +21,7 @@
#include "SessionHandler.h"
#include "SessionState.h"
#include "Connection.h"
+#include "ConnectionState.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/ClientInvoker.h"
@@ -36,7 +37,7 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : SessionContext(c.getOutput()),
+ : InOutHandler(0, &out),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
- session->in.handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (!ignoring) {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
+ if (!ignoring) {
+ if (m &&
+ (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
+ invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ return;
+ } else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else {
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
}
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
@@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) {
}
void SessionHandler::assertAttached(const char* method) const {
- if (!session.get())
+ if (!session.get()) {
+ std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
+ }
}
void SessionHandler::assertClosed(const char* method) const {
@@ -208,4 +215,32 @@ void SessionHandler::detached()
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
+void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ assertAttached("complete");
+ session->complete(cumulative, range);
+}
+
+void SessionHandler::flush()
+{
+ assertAttached("flush");
+ session->flush();
+}
+void SessionHandler::sync()
+{
+ assertAttached("sync");
+ session->sync();
+}
+
+void SessionHandler::noop()
+{
+ assertAttached("noop");
+ session->noop();
+}
+
+void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //never actually sent by client at present
+}
+
}} // namespace qpid::broker