summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionHandler.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-03 14:49:06 +0000
committerGordon Sim <gsim@apache.org>2008-03-03 14:49:06 +0000
commit0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e (patch)
tree8fe7333962fbea735455340424657a540c6ef9a9 /qpid/cpp/src/qpid/broker/SessionHandler.cpp
parentc8ad468141a96e5fdf4534552fe72e84399d5d5d (diff)
downloadqpid-python-0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e.tar.gz
A further step to final 0-10 spec.
The extra.xml fragment adds class defs for connection in session that are in line with latest spec but use old schema. The preview codepath (99-0) remains unaltered. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@633108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp179
1 files changed, 80 insertions, 99 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 0e3c9928d1..de96ae3f12 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -60,17 +60,10 @@ void SessionHandler::handleIn(AMQFrame& f) {
AMQMethodBody* m = f.getBody()->getMethod();
try {
if (!ignoring) {
- if (m &&
- (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
- invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
return;
} else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
session->handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
} else {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
@@ -80,7 +73,8 @@ void SessionHandler::handleIn(AMQFrame& f) {
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
session.reset();
- peerSession.closed(e.code, e.what());
+ //TODO: implement new exception handling mechanism
+ //peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -92,7 +86,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
- peerSession.solicitAck();
+ peerSession.flush(false, false, true);
}
void SessionHandler::assertAttached(const char* method) const {
@@ -111,136 +105,123 @@ void SessionHandler::assertClosed(const char* method) const {
<< " is already open."));
}
-void SessionHandler::open(uint32_t detachedLifetime) {
- assertClosed("open");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
- peerSession.attached(session->getId(), session->getTimeout());
+void SessionHandler::localSuspend() {
+ if (session.get() && session->isAttached()) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+ }
}
-void SessionHandler::resume(const Uuid& id) {
- assertClosed("resume");
- session = connection.broker.getSessionManager().resume(id);
- session->attach(*this);
- SequenceNumber seq = session->resuming();
- peerSession.attached(session->getId(), session->getTimeout());
- proxy.getSession().ack(seq, SequenceNumberSet());
-}
-void SessionHandler::flow(bool /*active*/) {
- assertAttached("flow");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flow");
+ConnectionState& SessionHandler::getConnection() { return connection; }
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
+
+//new methods:
+void SessionHandler::attach(const std::string& name, bool /*force*/)
+{
+ //TODO: need to revise session manager to support resume as well
+ assertClosed("attach");
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
+ peerSession.attached(name);
}
-void SessionHandler::flowOk(bool /*active*/) {
- assertAttached("flowOk");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flowOk");
+void SessionHandler::attached(const std::string& /*name*/)
+{
+ std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
}
-void SessionHandler::close() {
- assertAttached("close");
- QPID_LOG(info, "Received session.close");
- ignoring=false;
- session->detach();
- session.reset();
- peerSession.closed(REPLY_SUCCESS, "ok");
+void SessionHandler::detach(const std::string& name)
+{
+ assertAttached("detach");
+ localSuspend();
+ peerSession.detached(name, 0);
assert(&connection.getChannel(channel.get()) == this);
connection.closeChannel(channel.get());
}
-void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
- QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+void SessionHandler::detached(const std::string& name, uint8_t code)
+{
ignoring=false;
session->detach();
session.reset();
-}
-
-void SessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ if (code) {
+ //no error
+ } else {
+ //error occured
+ QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
}
}
-void SessionHandler::suspend() {
- assertAttached("suspend");
- localSuspend();
- peerSession.detached();
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void SessionHandler::ack(uint32_t cumulativeSeenMark,
- const SequenceNumberSet& /*seenFrameSet*/)
+void SessionHandler::requestTimeout(uint32_t t)
{
- assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
- session->receivedAck(cumulativeSeenMark);
- framing::SessionState::Replay replay=session->replay();
- std::for_each(replay.begin(), replay.end(),
- boost::bind(&SessionHandler::handleOut, this, _1));
- }
- else
- session->receivedAck(cumulativeSeenMark);
+ session->setTimeout(t);
+ //proxy.timeout(t);
}
-void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- // TODO aconway 2007-10-02: may be removed from spec.
- assert(0); throw NotImplementedException("session.high-water-mark");
+void SessionHandler::timeout(uint32_t)
+{
+ //not sure what we need to do on the server for this...
}
-void SessionHandler::solicitAck() {
- assertAttached("solicit-ack");
- peerSession.ack(session->sendingAck(), SequenceNumberSet());
+void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
+{
+ if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
+
+ session->next = id;
}
-void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
{
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
+ if (!commands.empty() || fragments.size()) {
+ throw NotImplementedException("Session resumption not yet supported");
+ }
}
-void SessionHandler::detached()
+void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
{
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ //don't really care too much about this yet
}
-
-ConnectionState& SessionHandler::getConnection() { return connection; }
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
-
-void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply)
{
- assertAttached("complete");
- session->complete(cumulative, range);
+ session->complete(commands);
+ if (timelyReply) {
+ peerSession.knownCompleted(session->knownCompleted);
+ session->knownCompleted.clear();
+ }
}
-void SessionHandler::flush()
+void SessionHandler::knownCompleted(const framing::SequenceSet& commands)
{
- assertAttached("flush");
- session->flush();
+ session->completed.remove(commands);
}
-void SessionHandler::sync()
+
+void SessionHandler::flush(bool expected, bool confirmed, bool completed)
{
- assertAttached("sync");
- session->sync();
+ if (expected) {
+ peerSession.expected(SequenceSet(session->next), Array());
+ }
+ if (confirmed) {
+ peerSession.confirmed(session->completed, Array());
+ }
+ if (completed) {
+ peerSession.completed(session->completed, true);
+ }
}
-void SessionHandler::noop()
+
+void SessionHandler::sendCompletion()
{
- assertAttached("noop");
- session->noop();
+ peerSession.completed(session->completed, true);
}
-void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void SessionHandler::gap(const framing::SequenceSet& /*commands*/)
{
- //never actually sent by client at present
+ throw NotImplementedException("gap not yet supported");
}
-
+
}} // namespace qpid::broker