diff options
3 files changed, 41 insertions, 4 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java index 46c46a800a..98a5c8128e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java @@ -208,7 +208,7 @@ abstract class AbstractDecoder implements Decoder { String key = readShortstr(); byte code = get(); - Type t = Type.get(code); + Type t = getType(code); Object value = read(t); result.put(key, value); } @@ -223,7 +223,7 @@ abstract class AbstractDecoder implements Decoder while (count < start + size) { byte code = get(); - Type t = Type.get(code); + Type t = getType(code); Object value = read(t); result.add(value); } @@ -234,7 +234,7 @@ abstract class AbstractDecoder implements Decoder { long size = readLong(); byte code = get(); - Type t = Type.get(code); + Type t = getType(code); long count = readLong(); List<Object> result = new ArrayList<Object>(); @@ -246,6 +246,19 @@ abstract class AbstractDecoder implements Decoder return result; } + private Type getType(byte code) + { + Type type = Type.get(code); + if (type == null) + { + throw new IllegalArgumentException("unknown code: " + code); + } + else + { + return type; + } + } + private long readSize(Type t) { if (t.fixed) diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java index 88c8b18e0e..ec5cb74e5b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java @@ -48,4 +48,14 @@ class ChannelDelegate extends MethodDelegate<Channel> attached.getDetachedLifetime()); } + public @Override void sessionClosed(Channel channel, SessionClosed closed) + { + System.out.println("Session closed: [" + closed.getReplyCode() + "]" + + closed.getReplyText()); + channel.getSession().closed(); + // XXX: should we remove the channel from the connection? It + // could have an external reference to it. Maybe we need a + // weak hash map in connection. + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 3305d04489..9c8b525deb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -82,6 +82,7 @@ public class Session extends Invoker public void processed(Method command) { + System.out.printf("processed[%d]: %s\n", command.getId(), command.getClass()); processed(command.getId()); } @@ -92,6 +93,7 @@ public class Session extends Invoker public void processed(long lower, long upper) { + processed(new Range(lower, upper)); } @@ -111,12 +113,24 @@ public class Session extends Invoker void flushProcessed() { + long mark = -1; + boolean first = true; + RangeSet rest = new RangeSet(); for (Range r: processed) { System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); + if (first) + { + first = false; + mark = r.getUpper(); + } + else + { + rest.add(r); + } } System.out.println("Notifying peer with execution complete"); - executionComplete(0, processed); + executionComplete(mark, rest); } void syncPoint() |