diff options
13 files changed, 166 insertions, 48 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index 2b25d7287b..8cd07f002a 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.ArrayList; import static org.apache.qpidity.Frame.*; +import static org.apache.qpidity.Functions.*; /** @@ -134,6 +135,8 @@ class Channel extends Invoker implements Handler<Frame> { method = m; } + + System.out.println("sent " + m); } public void headers(Struct ... headers) @@ -157,6 +160,7 @@ class Channel extends Invoker implements Handler<Frame> { enc.writeLongStruct(hdr); enc.flush(); + System.out.println("sent " + hdr); } } @@ -189,6 +193,7 @@ class Channel extends Invoker implements Handler<Frame> for (ByteBuffer buf : data) { enc.put(buf); + System.out.println("sent " + str(buf)); } enc.flush(); data = null; diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java new file mode 100644 index 0000000000..05b252c26e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * CommandDispatcher + * + * @author Rafael H. Schloming + */ + +class CommandDispatcher implements Handler<Event<Session,Method>> +{ + + private final Delegate<Session> delegate; + + public CommandDispatcher(Delegate<Session> delegate) + { + this.delegate = delegate; + } + + public void handle(Event<Session,Method> event) + { + Session ssn = event.context; + Method method = event.target; + method.setId(ssn.nextCommandId()); + System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate); + method.delegate(ssn, delegate); + if (!method.hasPayload()) + { + ssn.processed(method); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java index 435e377c2f..b54e28c85e 100644 --- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java @@ -34,11 +34,11 @@ class ContentHandler extends TypeSwitch<Session> public ContentHandler(byte major, byte minor, SessionDelegate delegate) { - MethodDispatcher<Session> md = - new MethodDispatcher<Session>(major, minor, delegate); - map(Frame.METHOD, new SegmentAssembler<Session>(md)); - map(Frame.HEADER, new SegmentAssembler<Session> - (new HeaderHandler(major, minor, delegate))); + CommandDispatcher disp = new CommandDispatcher(delegate); + MethodDecoder<Session> dec = new MethodDecoder<Session>(major, minor, disp); + HeaderHandler hh = new HeaderHandler(major, minor, delegate); + map(Frame.METHOD, new SegmentAssembler<Session>(dec)); + map(Frame.HEADER, new SegmentAssembler<Session>(hh)); map(Frame.BODY, new BodyHandler(delegate)); } diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/Method.java index fb269dfb7b..43865d36aa 100644 --- a/java/common/src/main/java/org/apache/qpidity/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/Method.java @@ -30,6 +30,26 @@ package org.apache.qpidity; public abstract class Method extends Struct { + public static final Method create(int type) + { + // XXX: should generate separate factories for separate + // namespaces + return (Method) Struct.create(type); + } + + // XXX: command subclass? + private long id; + + public final long getId() + { + return id; + } + + void setId(long id) + { + this.id = id; + } + public abstract boolean hasPayload(); public abstract byte getEncodedTrack(); diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java new file mode 100644 index 0000000000..c1cf9b888c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.ByteBuffer; + +import java.util.Iterator; + + +/** + * MethodDecoder + * + * @author Rafael H. Schloming + */ + +class MethodDecoder<C> implements Handler<Event<C,Segment>> +{ + + private final byte major; + private final byte minor; + private final Handler<Event<C,Method>> handler; + + public MethodDecoder(byte major, byte minor, Handler<Event<C,Method>> handler) + { + this.major = major; + this.minor = minor; + this.handler = handler; + } + + public void handle(Event<C,Segment> event) + { + System.out.println("got method segment:\n " + event.target); + Iterator<ByteBuffer> fragments = event.target.getFragments(); + Decoder dec = new FragmentDecoder(major, minor, fragments); + int type = (int) dec.readLong(); + Method method = Method.create(type); + method.read(dec, major, minor); + handler.handle(new Event<C,Method>(event.context, method)); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java index 6c7389b02d..911eaa0b15 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java @@ -20,10 +20,6 @@ */ package org.apache.qpidity; -import java.nio.ByteBuffer; - -import java.util.Iterator; - /** * A MethodDispatcher parses and dispatches a method segment. @@ -31,33 +27,21 @@ import java.util.Iterator; * @author Rafael H. Schloming */ -class MethodDispatcher<C> implements Handler<Event<C,Segment>> +class MethodDispatcher<C> implements Handler<Event<C,Method>> { - final private byte major; - final private byte minor; final private Delegate<C> delegate; - // XXX: should be on session - private int count = 0; - public MethodDispatcher(byte major, byte minor, Delegate<C> delegate) + public MethodDispatcher(Delegate<C> delegate) { - this.major = major; - this.minor = minor; this.delegate = delegate; } - public void handle(Event<C,Segment> event) + public void handle(Event<C,Method> event) { - System.out.println("got method segment:\n " + event.target); - Iterator<ByteBuffer> fragments = event.target.getFragments(); - Decoder dec = new FragmentDecoder(major, minor, fragments); - int type = (int) dec.readLong(); - Struct struct = Struct.create(type); - struct.setId(count++); - struct.read(dec, major, minor); - System.out.println("delegating " + struct + "[" + struct.getId() + "] to " + delegate); - struct.delegate(event.context, delegate); + Method method = event.target; + System.out.println("delegating " + method + " to " + delegate); + method.delegate(event.context, delegate); } } diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java index dd952fe2c2..86dac241a2 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java @@ -34,8 +34,9 @@ class MethodHandler<C> extends TypeSwitch<C> public MethodHandler(byte major, byte minor, Delegate<C> delegate) { - MethodDispatcher md = new MethodDispatcher<C>(major, minor, delegate); - map(Frame.METHOD, new SegmentAssembler<C>(md)); + MethodDispatcher disp = new MethodDispatcher<C>(delegate); + MethodDecoder<C> dec = new MethodDecoder<C>(major, minor, disp); + map(Frame.METHOD, new SegmentAssembler<C>(dec)); } } diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java index 17bbe5c0a7..a40753ed91 100644 --- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java @@ -64,7 +64,7 @@ class MinaHandler implements IoHandler public void messageSent(IoSession ssn, Object obj) { - System.out.println("TX: " + obj); + // do nothing } public void exceptionCaught(IoSession ssn, Throwable e) @@ -74,7 +74,7 @@ class MinaHandler implements IoHandler public void sessionCreated(final IoSession ssn) { - System.out.println("created " + ssn); + // do nothing } public void sessionOpened(final IoSession ssn) diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index 0c94c6df4e..228c4cf495 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -38,7 +38,6 @@ public class Session extends Invoker // channel may be null Channel channel; - // XXX: incoming command count not used // incoming command count private long commandsIn = 0; // completed incoming commands @@ -49,6 +48,11 @@ public class Session extends Invoker private Map<Long,Method> commands = new HashMap<Long,Method>(); private long mark = 0; + public Map<Long,Method> getOutstandingCommands() + { + return commands; + } + public long getCommandsOut() { return commandsOut; @@ -59,6 +63,11 @@ public class Session extends Invoker return commandsIn; } + public long nextCommandId() + { + return commandsIn++; + } + public RangeSet getProcessed() { return processed; @@ -79,7 +88,7 @@ public class Session extends Invoker processed.add(range); } - public void processed(Struct command) + public void processed(Method command) { processed(command.getId()); } diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index 1275378b7e..fd3e019367 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -45,6 +45,7 @@ public abstract class SessionDelegate extends Delegate<Session> } } ssn.complete(excmp.getCumulativeExecutionMark()); + System.out.println("outstanding commands: " + ssn.getOutstandingCommands()); } @Override public void executionSync(Session ssn, ExecutionSync sync) diff --git a/java/common/src/main/java/org/apache/qpidity/Struct.java b/java/common/src/main/java/org/apache/qpidity/Struct.java index 16b02a72a3..cf8a5c246f 100644 --- a/java/common/src/main/java/org/apache/qpidity/Struct.java +++ b/java/common/src/main/java/org/apache/qpidity/Struct.java @@ -35,19 +35,6 @@ public abstract class Struct implements Delegator, Encodable return StructFactory.create(type); } - // XXX: command subclass? - private long id; - - public final long getId() - { - return id; - } - - void setId(long id) - { - this.id = id; - } - abstract int getEncodedType(); } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 8c8c2c890d..4a33122d37 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -58,7 +58,6 @@ class ToyBroker extends SessionDelegate { queues.put(qd.getQueue(), new LinkedList()); System.out.println("declared queue: " + qd.getQueue()); - ssn.processed(qd); } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index db03d30605..2e27dc8574 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -40,7 +40,6 @@ class ToyClient extends SessionDelegate ssn.getCommand((int) l)); } } - ssn.processed(reject); } public void headers(Session ssn, Struct ... headers) @@ -85,6 +84,7 @@ class ToyClient extends SessionDelegate ssn.messageTransfer("fdsa", (short) 0, (short) 1); ssn.data("this should be rejected"); ssn.end(); + ssn.sync(); } } |