summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java212
1 files changed, 212 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
new file mode 100644
index 0000000000..9a02961dc4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import org.apache.qpid.transport.util.Logger;
+
+
+/**
+ * SessionDelegate
+ *
+ * @author Rafael H. Schloming
+ */
+
+public class SessionDelegate
+ extends MethodDelegate<Session>
+ implements ProtocolDelegate<Session>
+{
+ protected static final Logger log = Logger.get(SessionDelegate.class);
+
+ public void init(Session ssn, ProtocolHeader hdr)
+ {
+ log.warn("INIT: [%s] %s", ssn, hdr);
+ }
+
+ public void control(Session ssn, Method method)
+ {
+ method.dispatch(ssn, this);
+ }
+
+ public void command(Session ssn, Method method) {
+ ssn.identify(method);
+ method.dispatch(ssn, this);
+ if (!method.hasPayload())
+ {
+ ssn.processed(method);
+ }
+ }
+
+ public void error(Session ssn, ProtocolError error)
+ {
+ log.warn("ERROR: [%s] %s", ssn, error);
+ }
+
+ public void handle(Session ssn, Method method)
+ {
+ log.warn("UNHANDLED: [%s] %s", ssn, method);
+ }
+
+ @Override public void sessionRequestTimeout(Session ssn, SessionRequestTimeout t)
+ {
+ if (t.getTimeout() == 0)
+ {
+ ssn.setClose(true);
+ }
+ ssn.sessionTimeout(0); // Always report back an expiry of 0 until it is implemented
+ }
+
+ @Override public void sessionAttached(Session ssn, SessionAttached atc)
+ {
+ ssn.setState(Session.State.OPEN);
+ }
+
+ @Override public void sessionTimeout(Session ssn, SessionTimeout t)
+ {
+ // Setting of expiry is not implemented
+ }
+
+ @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
+ {
+ RangeSet ranges = cmp.getCommands();
+ RangeSet known = null;
+ if (cmp.getTimelyReply())
+ {
+ known = new RangeSet();
+ }
+
+ if (ranges != null)
+ {
+ for (Range range : ranges)
+ {
+ boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+ if (advanced && known != null)
+ {
+ known.add(range);
+ }
+ }
+ }
+
+ if (known != null)
+ {
+ ssn.sessionKnownCompleted(known);
+ }
+ }
+
+ @Override public void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp)
+ {
+ RangeSet kc = kcmp.getCommands();
+ if (kc != null)
+ {
+ ssn.knownComplete(kc);
+ }
+ }
+
+ @Override public void sessionFlush(Session ssn, SessionFlush flush)
+ {
+ if (flush.getCompleted())
+ {
+ ssn.flushProcessed();
+ }
+ if (flush.getConfirmed())
+ {
+ ssn.flushProcessed();
+ }
+ if (flush.getExpected())
+ {
+ ssn.flushExpected();
+ }
+ }
+
+ @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
+ {
+ ssn.commandPoint(scp.getCommandId());
+ }
+
+ @Override public void executionSync(Session ssn, ExecutionSync sync)
+ {
+ ssn.syncPoint();
+ }
+
+ @Override public void executionResult(Session ssn, ExecutionResult result)
+ {
+ ssn.result(result.getCommandId(), result.getValue());
+ }
+
+ @Override public void executionException(Session ssn, ExecutionException exc)
+ {
+ ssn.setException(exc);
+ ssn.getSessionListener().exception(ssn, new SessionException(exc));
+ ssn.closed();
+ }
+
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ssn.getSessionListener().message(ssn, xfr);
+ }
+
+ @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
+ {
+ if ("".equals(sfm.getDestination()) &&
+ MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
+ {
+ ssn.setFlowControl(true);
+ }
+ else
+ {
+ super.messageSetFlowMode(ssn, sfm);
+ }
+ }
+
+ @Override public void messageFlow(Session ssn, MessageFlow flow)
+ {
+ if ("".equals(flow.getDestination()) &&
+ MessageCreditUnit.MESSAGE.equals(flow.getUnit()))
+ {
+ ssn.addCredit((int) flow.getValue());
+ }
+ else
+ {
+ super.messageFlow(ssn, flow);
+ }
+ }
+
+ @Override public void messageStop(Session ssn, MessageStop stop)
+ {
+ if ("".equals(stop.getDestination()))
+ {
+ ssn.drainCredit();
+ }
+ else
+ {
+ super.messageStop(ssn, stop);
+ }
+ }
+
+ public void closed(Session session)
+ {
+ log.debug("CLOSED: [%s]", session);
+ }
+
+ public void detached(Session session)
+ {
+ log.debug("DETACHED: [%s]", session);
+ }
+}