diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
commit | 0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e (patch) | |
tree | 8fe7333962fbea735455340424657a540c6ef9a9 /qpid/cpp/src/qpid/broker/SessionHandler.cpp | |
parent | c8ad468141a96e5fdf4534552fe72e84399d5d5d (diff) | |
download | qpid-python-0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e.tar.gz |
A further step to final 0-10 spec.
The extra.xml fragment adds class defs for connection in session that are in line with latest spec but use old schema.
The preview codepath (99-0) remains unaltered.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@633108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 179 |
1 files changed, 80 insertions, 99 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 0e3c9928d1..de96ae3f12 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -60,17 +60,10 @@ void SessionHandler::handleIn(AMQFrame& f) { AMQMethodBody* m = f.getBody()->getMethod(); try { if (!ignoring) { - if (m && - (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || - invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { return; } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); session->handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; } else { throw ChannelErrorException( QPID_MSG("Channel " << channel.get() << " is not open")); @@ -80,7 +73,8 @@ void SessionHandler::handleIn(AMQFrame& f) { ignoring=true; // Ignore trailing frames sent by client. session->detach(); session.reset(); - peerSession.closed(e.code, e.what()); + //TODO: implement new exception handling mechanism + //peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -92,7 +86,7 @@ void SessionHandler::handleIn(AMQFrame& f) { void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) - peerSession.solicitAck(); + peerSession.flush(false, false, true); } void SessionHandler::assertAttached(const char* method) const { @@ -111,136 +105,123 @@ void SessionHandler::assertClosed(const char* method) const { << " is already open.")); } -void SessionHandler::open(uint32_t detachedLifetime) { - assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); - peerSession.attached(session->getId(), session->getTimeout()); +void SessionHandler::localSuspend() { + if (session.get() && session->isAttached()) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + session.reset(); + } } -void SessionHandler::resume(const Uuid& id) { - assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); - session->attach(*this); - SequenceNumber seq = session->resuming(); - peerSession.attached(session->getId(), session->getTimeout()); - proxy.getSession().ack(seq, SequenceNumberSet()); -} -void SessionHandler::flow(bool /*active*/) { - assertAttached("flow"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flow"); +ConnectionState& SessionHandler::getConnection() { return connection; } +const ConnectionState& SessionHandler::getConnection() const { return connection; } + +//new methods: +void SessionHandler::attach(const std::string& name, bool /*force*/) +{ + //TODO: need to revise session manager to support resume as well + assertClosed("attach"); + std::auto_ptr<SessionState> state( + connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); + peerSession.attached(name); } -void SessionHandler::flowOk(bool /*active*/) { - assertAttached("flowOk"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flowOk"); +void SessionHandler::attached(const std::string& /*name*/) +{ + std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); } -void SessionHandler::close() { - assertAttached("close"); - QPID_LOG(info, "Received session.close"); - ignoring=false; - session->detach(); - session.reset(); - peerSession.closed(REPLY_SUCCESS, "ok"); +void SessionHandler::detach(const std::string& name) +{ + assertAttached("detach"); + localSuspend(); + peerSession.detached(name, 0); assert(&connection.getChannel(channel.get()) == this); connection.closeChannel(channel.get()); } -void SessionHandler::closed(uint16_t replyCode, const string& replyText) { - QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); +void SessionHandler::detached(const std::string& name, uint8_t code) +{ ignoring=false; session->detach(); session.reset(); -} - -void SessionHandler::localSuspend() { - if (session.get() && session->isAttached()) { - session->detach(); - connection.broker.getSessionManager().suspend(session); - session.reset(); + if (code) { + //no error + } else { + //error occured + QPID_LOG(warning, "Received session.closed: "<< name << " " << code); } } -void SessionHandler::suspend() { - assertAttached("suspend"); - localSuspend(); - peerSession.detached(); - assert(&connection.getChannel(channel.get()) == this); - connection.closeChannel(channel.get()); -} - -void SessionHandler::ack(uint32_t cumulativeSeenMark, - const SequenceNumberSet& /*seenFrameSet*/) +void SessionHandler::requestTimeout(uint32_t t) { - assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { - session->receivedAck(cumulativeSeenMark); - framing::SessionState::Replay replay=session->replay(); - std::for_each(replay.begin(), replay.end(), - boost::bind(&SessionHandler::handleOut, this, _1)); - } - else - session->receivedAck(cumulativeSeenMark); + session->setTimeout(t); + //proxy.timeout(t); } -void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - // TODO aconway 2007-10-02: may be removed from spec. - assert(0); throw NotImplementedException("session.high-water-mark"); +void SessionHandler::timeout(uint32_t) +{ + //not sure what we need to do on the server for this... } -void SessionHandler::solicitAck() { - assertAttached("solicit-ack"); - peerSession.ack(session->sendingAck(), SequenceNumberSet()); +void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) +{ + if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); + + session->next = id; } -void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) +void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); + if (!commands.empty() || fragments.size()) { + throw NotImplementedException("Session resumption not yet supported"); + } } -void SessionHandler::detached() +void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) { - connection.broker.getSessionManager().suspend(session); - session.reset(); + //don't really care too much about this yet } - -ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) { - assertAttached("complete"); - session->complete(cumulative, range); + session->complete(commands); + if (timelyReply) { + peerSession.knownCompleted(session->knownCompleted); + session->knownCompleted.clear(); + } } -void SessionHandler::flush() +void SessionHandler::knownCompleted(const framing::SequenceSet& commands) { - assertAttached("flush"); - session->flush(); + session->completed.remove(commands); } -void SessionHandler::sync() + +void SessionHandler::flush(bool expected, bool confirmed, bool completed) { - assertAttached("sync"); - session->sync(); + if (expected) { + peerSession.expected(SequenceSet(session->next), Array()); + } + if (confirmed) { + peerSession.confirmed(session->completed, Array()); + } + if (completed) { + peerSession.completed(session->completed, true); + } } -void SessionHandler::noop() + +void SessionHandler::sendCompletion() { - assertAttached("noop"); - session->noop(); + peerSession.completed(session->completed, true); } -void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +void SessionHandler::gap(const framing::SequenceSet& /*commands*/) { - //never actually sent by client at present + throw NotImplementedException("gap not yet supported"); } - + }} // namespace qpid::broker |