summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp92
1 files changed, 62 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index ed092d6a05..9b065be8af 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -26,6 +26,8 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+
namespace qpid {
namespace broker {
using namespace framing;
@@ -33,7 +35,9 @@ using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
- connection(c), channel(ch), proxy(out),
+ connection(c), channel(ch, &c.getOutput()),
+ proxy(out), // Via my own handleOut() for L2 data.
+ peerSession(channel), // Direct to channel for L2 commands.
ignoring(false) {}
SessionHandler::~SessionHandler() {}
@@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) {
try {
if (m && invoke(*this, *m))
return;
- else if (session.get())
- session->in(f);
+ else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->in.handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ }
else if (!ignoring)
throw ChannelErrorException(
- QPID_MSG("Channel " << channel << " is not open"));
+ QPID_MSG("Channel " << channel.get() << " is not open"));
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session.reset();
- getProxy().getSession().closed(e.code, e.toString());
+ peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
void SessionHandler::handleOut(AMQFrame& f) {
- f.setChannel(getChannel());
- out.next->handle(f);
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
}
-void SessionHandler::assertOpen(const char* method) {
- if (!session.get())
+void SessionHandler::assertAttached(const char* method) const {
+ if (!session.get())
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
}
-void SessionHandler::assertClosed(const char* method) {
+void SessionHandler::assertClosed(const char* method) const {
if (session.get())
throw ChannelBusyException(
- QPID_MSG(method << " failed: channel " << channel
+ QPID_MSG(method << " failed: channel " << channel.get()
<< " is already open."));
}
@@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) {
std::auto_ptr<SessionState> state(
connection.broker.getSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ peerSession.attached(session->getId(), session->getTimeout());
}
void SessionHandler::resume(const Uuid& id) {
assertClosed("resume");
- session = connection.broker.getSessionManager().resume(*this, id);
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ session = connection.broker.getSessionManager().resume(id);
+ session->attach(*this);
+ SequenceNumber seq = session->resuming();
+ peerSession.attached(session->getId(), session->getTimeout());
+ proxy.getSession().ack(seq, SequenceNumberSet());
}
void SessionHandler::flow(bool /*active*/) {
+ assertAttached("flow");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flow");
}
void SessionHandler::flowOk(bool /*active*/) {
+ assertAttached("flowOk");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flowOk");
}
void SessionHandler::close() {
+ assertAttached("close");
QPID_LOG(info, "Received session.close");
ignoring=false;
session.reset();
- getProxy().getSession().closed(REPLY_SUCCESS, "ok");
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ peerSession.closed(REPLY_SUCCESS, "ok");
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
@@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
session.reset();
}
+void SessionHandler::localSuspend() {
+ if (session.get() && session->getState() == SessionState::ATTACHED) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ }
+}
+
void SessionHandler::suspend() {
- assertOpen("suspend");
- connection.broker.getSessionManager().suspend(session);
- assert(!session.get());
- getProxy().getSession().detached();
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ assertAttached("suspend");
+ localSuspend();
+ peerSession.detached();
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
-void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
- const SequenceNumberSet& /*seenFrameSet*/) {
- assert(0); throw NotImplementedException();
+void SessionHandler::ack(uint32_t cumulativeSeenMark,
+ const SequenceNumberSet& /*seenFrameSet*/)
+{
+ assertAttached("ack");
+ if (session->getState() == SessionState::RESUMING) {
+ session->receivedAck(cumulativeSeenMark);
+ framing::SessionState::Replay replay=session->replay();
+ std::for_each(replay.begin(), replay.end(),
+ boost::bind(&SessionHandler::handleOut, this, _1));
+ }
+ else
+ session->receivedAck(cumulativeSeenMark);
}
void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- assert(0); throw NotImplementedException();
+ // FIXME aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.high-water-mark");
}
void SessionHandler::solicitAck() {
- assert(0); throw NotImplementedException();
+ assertAttached("solicit-ack");
+ peerSession.ack(session->sendingAck(), SequenceNumberSet());
}
}} // namespace qpid::broker