summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp77
1 files changed, 70 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index d4f8c25892..e7ef6fdb87 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -28,11 +28,13 @@
namespace qpid {
namespace broker {
using namespace framing;
+using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
connection(c), channel(ch), proxy(out),
- ignoring(false), channelHandler(*this) {}
+ ignoring(false), channelHandler(*this),
+ useChannelClose(false) {}
SessionHandler::~SessionHandler() {}
@@ -50,18 +52,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m=f.getMethod();
try {
- if (m && m->invoke(&channelHandler))
+ if (m && (m->invoke(this) || m->invoke(&channelHandler)))
return;
else if (session)
session->in(f);
else if (!ignoring)
throw ChannelErrorException(
QPID_MSG("Channel " << channel << " is not open"));
- } catch(const ChannelException& e){
- getProxy().getChannel().close(
- e.code, e.toString(), classId(m), methodId(m));
- session.reset();
+ } catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
+ session.reset();
+ // FIXME aconway 2007-09-19: Dual-mode hack.
+ if (useChannelClose)
+ getProxy().getChannel().close(
+ e.code, e.toString(), classId(m), methodId(m));
+ else
+ getProxy().getSession().closed(e.code, e.toString());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -93,6 +99,7 @@ void SessionHandler::assertClosed(const char* method) {
}
void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){
+ parent.useChannelClose=true;
parent.assertClosed("open");
parent.session.reset(new Session(parent, 0));
parent.getProxy().getChannel().openOk();
@@ -112,7 +119,7 @@ void SessionHandler::ChannelMethods::close(uint16_t replyCode,
{
// FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
// to text names.
- QPID_LOG(warning, "Received session.close("<<replyCode<<","
+ QPID_LOG(warning, "Received channel.close("<<replyCode<<","
<<replyText << ","
<< "classid=" <<classId<< ","
<< "methodid=" <<methodId);
@@ -136,4 +143,60 @@ void SessionHandler::ChannelMethods::ok()
//sufficient
}
+void SessionHandler::open(uint32_t detachedLifetime) {
+ assertClosed("open");
+ session.reset(new Session(*this, detachedLifetime));
+ getProxy().getSession().attached(session->getId(), session->getTimeout());
+}
+
+void SessionHandler::flow(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::flowOk(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::close() {
+ QPID_LOG(info, "Received session.close");
+ ignoring=false;
+ session.reset();
+ getProxy().getSession().closed(REPLY_SUCCESS, "ok");
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
+ // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
+ // to text names.
+ QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+ ignoring=false;
+ session.reset();
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::resume(const Uuid& /*sessionId*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::suspend() {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
+ const SequenceNumberSet& /*seenFrameSet*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::solicitAck() {
+ assert(0); throw NotImplementedException();
+}
+
}} // namespace qpid::broker