summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java87
1 files changed, 78 insertions, 9 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 5e4336f988..6146f029b2 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.util.Logger;
/**
@@ -29,10 +29,12 @@ import org.apache.qpid.transport.network.Frame;
* @author Rafael H. Schloming
*/
-public abstract class SessionDelegate
+public class SessionDelegate
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
+ private static final Logger log = Logger.get(SessionDelegate.class);
+
public void init(Session ssn, ProtocolHeader hdr) { }
public void control(Session ssn, Method method) {
@@ -42,7 +44,7 @@ public abstract class SessionDelegate
public void command(Session ssn, Method method) {
ssn.identify(method);
method.dispatch(ssn, this);
- if (!method.hasPayloadSegment())
+ if (!method.hasPayload())
{
ssn.processed(method);
}
@@ -50,14 +52,21 @@ public abstract class SessionDelegate
public void error(Session ssn, ProtocolError error) { }
- @Override public void executionResult(Session ssn, ExecutionResult result)
+ public void handle(Session ssn, Method method)
{
- ssn.result(result.getCommandId(), result.getValue());
+ log.warn("UNHANDLED: [%s] %s", ssn, method);
}
- @Override public void executionException(Session ssn, ExecutionException exc)
+ @Override public void sessionAttached(Session ssn, SessionAttached atc)
{
- ssn.addException(exc);
+ ssn.setState(Session.State.OPEN);
+ }
+
+ @Override public void sessionTimeout(Session ssn, SessionTimeout t)
+ {
+ // XXX: we ignore this right now, we should uncomment this
+ // when full session resume is supported:
+ // ssn.setExpiry(t.getTimeout());
}
@Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
@@ -108,13 +117,13 @@ public abstract class SessionDelegate
}
if (flush.getExpected())
{
- throw new Error("not implemented");
+ ssn.flushExpected();
}
}
@Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
{
- ssn.commandsIn = scp.getCommandId();
+ ssn.commandPoint(scp.getCommandId());
}
@Override public void executionSync(Session ssn, ExecutionSync sync)
@@ -122,4 +131,64 @@ public abstract class SessionDelegate
ssn.syncPoint();
}
+ @Override public void executionResult(Session ssn, ExecutionResult result)
+ {
+ ssn.result(result.getCommandId(), result.getValue());
+ }
+
+ @Override public void executionException(Session ssn, ExecutionException exc)
+ {
+ ssn.setException(exc);
+ }
+
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ssn.getSessionListener().message(ssn, xfr);
+ }
+
+ @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
+ {
+ if ("".equals(sfm.getDestination()) &&
+ MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
+ {
+ ssn.setFlowControl(true);
+ }
+ else
+ {
+ super.messageSetFlowMode(ssn, sfm);
+ }
+ }
+
+ @Override public void messageFlow(Session ssn, MessageFlow flow)
+ {
+ if ("".equals(flow.getDestination()) &&
+ MessageCreditUnit.MESSAGE.equals(flow.getUnit()))
+ {
+ ssn.addCredit((int) flow.getValue());
+ }
+ else
+ {
+ super.messageFlow(ssn, flow);
+ }
+ }
+
+ @Override public void messageStop(Session ssn, MessageStop stop)
+ {
+ if ("".equals(stop.getDestination()))
+ {
+ ssn.drainCredit();
+ }
+ else
+ {
+ super.messageStop(ssn, stop);
+ }
+ }
+
+ public void closed(Session session)
+ {
+ }
+
+ public void detached(Session session)
+ {
+ }
}