diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java | 87 |
1 files changed, 78 insertions, 9 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 5e4336f988..6146f029b2 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.transport.network.Frame; +import org.apache.qpid.transport.util.Logger; /** @@ -29,10 +29,12 @@ import org.apache.qpid.transport.network.Frame; * @author Rafael H. Schloming */ -public abstract class SessionDelegate +public class SessionDelegate extends MethodDelegate<Session> implements ProtocolDelegate<Session> { + private static final Logger log = Logger.get(SessionDelegate.class); + public void init(Session ssn, ProtocolHeader hdr) { } public void control(Session ssn, Method method) { @@ -42,7 +44,7 @@ public abstract class SessionDelegate public void command(Session ssn, Method method) { ssn.identify(method); method.dispatch(ssn, this); - if (!method.hasPayloadSegment()) + if (!method.hasPayload()) { ssn.processed(method); } @@ -50,14 +52,21 @@ public abstract class SessionDelegate public void error(Session ssn, ProtocolError error) { } - @Override public void executionResult(Session ssn, ExecutionResult result) + public void handle(Session ssn, Method method) { - ssn.result(result.getCommandId(), result.getValue()); + log.warn("UNHANDLED: [%s] %s", ssn, method); } - @Override public void executionException(Session ssn, ExecutionException exc) + @Override public void sessionAttached(Session ssn, SessionAttached atc) { - ssn.addException(exc); + ssn.setState(Session.State.OPEN); + } + + @Override public void sessionTimeout(Session ssn, SessionTimeout t) + { + // XXX: we ignore this right now, we should uncomment this + // when full session resume is supported: + // ssn.setExpiry(t.getTimeout()); } @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) @@ -108,13 +117,13 @@ public abstract class SessionDelegate } if (flush.getExpected()) { - throw new Error("not implemented"); + ssn.flushExpected(); } } @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp) { - ssn.commandsIn = scp.getCommandId(); + ssn.commandPoint(scp.getCommandId()); } @Override public void executionSync(Session ssn, ExecutionSync sync) @@ -122,4 +131,64 @@ public abstract class SessionDelegate ssn.syncPoint(); } + @Override public void executionResult(Session ssn, ExecutionResult result) + { + ssn.result(result.getCommandId(), result.getValue()); + } + + @Override public void executionException(Session ssn, ExecutionException exc) + { + ssn.setException(exc); + } + + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) + { + ssn.getSessionListener().message(ssn, xfr); + } + + @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm) + { + if ("".equals(sfm.getDestination()) && + MessageFlowMode.CREDIT.equals(sfm.getFlowMode())) + { + ssn.setFlowControl(true); + } + else + { + super.messageSetFlowMode(ssn, sfm); + } + } + + @Override public void messageFlow(Session ssn, MessageFlow flow) + { + if ("".equals(flow.getDestination()) && + MessageCreditUnit.MESSAGE.equals(flow.getUnit())) + { + ssn.addCredit((int) flow.getValue()); + } + else + { + super.messageFlow(ssn, flow); + } + } + + @Override public void messageStop(Session ssn, MessageStop stop) + { + if ("".equals(stop.getDestination())) + { + ssn.drainCredit(); + } + else + { + super.messageStop(ssn, stop); + } + } + + public void closed(Session session) + { + } + + public void detached(Session session) + { + } } |