summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Channel.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java53
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ContentHandler.java10
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Method.java20
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDecoder.java59
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java28
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodHandler.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MinaHandler.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java13
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionDelegate.java1
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Struct.java13
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java1
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java2
13 files changed, 166 insertions, 48 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java
index 2b25d7287b..8cd07f002a 100644
--- a/java/common/src/main/java/org/apache/qpidity/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/Channel.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.ArrayList;
import static org.apache.qpidity.Frame.*;
+import static org.apache.qpidity.Functions.*;
/**
@@ -134,6 +135,8 @@ class Channel extends Invoker implements Handler<Frame>
{
method = m;
}
+
+ System.out.println("sent " + m);
}
public void headers(Struct ... headers)
@@ -157,6 +160,7 @@ class Channel extends Invoker implements Handler<Frame>
{
enc.writeLongStruct(hdr);
enc.flush();
+ System.out.println("sent " + hdr);
}
}
@@ -189,6 +193,7 @@ class Channel extends Invoker implements Handler<Frame>
for (ByteBuffer buf : data)
{
enc.put(buf);
+ System.out.println("sent " + str(buf));
}
enc.flush();
data = null;
diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
new file mode 100644
index 0000000000..05b252c26e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+
+/**
+ * CommandDispatcher
+ *
+ * @author Rafael H. Schloming
+ */
+
+class CommandDispatcher implements Handler<Event<Session,Method>>
+{
+
+ private final Delegate<Session> delegate;
+
+ public CommandDispatcher(Delegate<Session> delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public void handle(Event<Session,Method> event)
+ {
+ Session ssn = event.context;
+ Method method = event.target;
+ method.setId(ssn.nextCommandId());
+ System.out.println("delegating " + method + "[" + method.getId() + "] to " + delegate);
+ method.delegate(ssn, delegate);
+ if (!method.hasPayload())
+ {
+ ssn.processed(method);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
index 435e377c2f..b54e28c85e 100644
--- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
@@ -34,11 +34,11 @@ class ContentHandler extends TypeSwitch<Session>
public ContentHandler(byte major, byte minor, SessionDelegate delegate)
{
- MethodDispatcher<Session> md =
- new MethodDispatcher<Session>(major, minor, delegate);
- map(Frame.METHOD, new SegmentAssembler<Session>(md));
- map(Frame.HEADER, new SegmentAssembler<Session>
- (new HeaderHandler(major, minor, delegate)));
+ CommandDispatcher disp = new CommandDispatcher(delegate);
+ MethodDecoder<Session> dec = new MethodDecoder<Session>(major, minor, disp);
+ HeaderHandler hh = new HeaderHandler(major, minor, delegate);
+ map(Frame.METHOD, new SegmentAssembler<Session>(dec));
+ map(Frame.HEADER, new SegmentAssembler<Session>(hh));
map(Frame.BODY, new BodyHandler(delegate));
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/Method.java
index fb269dfb7b..43865d36aa 100644
--- a/java/common/src/main/java/org/apache/qpidity/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/Method.java
@@ -30,6 +30,26 @@ package org.apache.qpidity;
public abstract class Method extends Struct
{
+ public static final Method create(int type)
+ {
+ // XXX: should generate separate factories for separate
+ // namespaces
+ return (Method) Struct.create(type);
+ }
+
+ // XXX: command subclass?
+ private long id;
+
+ public final long getId()
+ {
+ return id;
+ }
+
+ void setId(long id)
+ {
+ this.id = id;
+ }
+
public abstract boolean hasPayload();
public abstract byte getEncodedTrack();
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
new file mode 100644
index 0000000000..c1cf9b888c
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.ByteBuffer;
+
+import java.util.Iterator;
+
+
+/**
+ * MethodDecoder
+ *
+ * @author Rafael H. Schloming
+ */
+
+class MethodDecoder<C> implements Handler<Event<C,Segment>>
+{
+
+ private final byte major;
+ private final byte minor;
+ private final Handler<Event<C,Method>> handler;
+
+ public MethodDecoder(byte major, byte minor, Handler<Event<C,Method>> handler)
+ {
+ this.major = major;
+ this.minor = minor;
+ this.handler = handler;
+ }
+
+ public void handle(Event<C,Segment> event)
+ {
+ System.out.println("got method segment:\n " + event.target);
+ Iterator<ByteBuffer> fragments = event.target.getFragments();
+ Decoder dec = new FragmentDecoder(major, minor, fragments);
+ int type = (int) dec.readLong();
+ Method method = Method.create(type);
+ method.read(dec, major, minor);
+ handler.handle(new Event<C,Method>(event.context, method));
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
index 6c7389b02d..911eaa0b15 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
@@ -20,10 +20,6 @@
*/
package org.apache.qpidity;
-import java.nio.ByteBuffer;
-
-import java.util.Iterator;
-
/**
* A MethodDispatcher parses and dispatches a method segment.
@@ -31,33 +27,21 @@ import java.util.Iterator;
* @author Rafael H. Schloming
*/
-class MethodDispatcher<C> implements Handler<Event<C,Segment>>
+class MethodDispatcher<C> implements Handler<Event<C,Method>>
{
- final private byte major;
- final private byte minor;
final private Delegate<C> delegate;
- // XXX: should be on session
- private int count = 0;
- public MethodDispatcher(byte major, byte minor, Delegate<C> delegate)
+ public MethodDispatcher(Delegate<C> delegate)
{
- this.major = major;
- this.minor = minor;
this.delegate = delegate;
}
- public void handle(Event<C,Segment> event)
+ public void handle(Event<C,Method> event)
{
- System.out.println("got method segment:\n " + event.target);
- Iterator<ByteBuffer> fragments = event.target.getFragments();
- Decoder dec = new FragmentDecoder(major, minor, fragments);
- int type = (int) dec.readLong();
- Struct struct = Struct.create(type);
- struct.setId(count++);
- struct.read(dec, major, minor);
- System.out.println("delegating " + struct + "[" + struct.getId() + "] to " + delegate);
- struct.delegate(event.context, delegate);
+ Method method = event.target;
+ System.out.println("delegating " + method + " to " + delegate);
+ method.delegate(event.context, delegate);
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
index dd952fe2c2..86dac241a2 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
@@ -34,8 +34,9 @@ class MethodHandler<C> extends TypeSwitch<C>
public MethodHandler(byte major, byte minor, Delegate<C> delegate)
{
- MethodDispatcher md = new MethodDispatcher<C>(major, minor, delegate);
- map(Frame.METHOD, new SegmentAssembler<C>(md));
+ MethodDispatcher disp = new MethodDispatcher<C>(delegate);
+ MethodDecoder<C> dec = new MethodDecoder<C>(major, minor, disp);
+ map(Frame.METHOD, new SegmentAssembler<C>(dec));
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
index 17bbe5c0a7..a40753ed91 100644
--- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
@@ -64,7 +64,7 @@ class MinaHandler implements IoHandler
public void messageSent(IoSession ssn, Object obj)
{
- System.out.println("TX: " + obj);
+ // do nothing
}
public void exceptionCaught(IoSession ssn, Throwable e)
@@ -74,7 +74,7 @@ class MinaHandler implements IoHandler
public void sessionCreated(final IoSession ssn)
{
- System.out.println("created " + ssn);
+ // do nothing
}
public void sessionOpened(final IoSession ssn)
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index 0c94c6df4e..228c4cf495 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -38,7 +38,6 @@ public class Session extends Invoker
// channel may be null
Channel channel;
- // XXX: incoming command count not used
// incoming command count
private long commandsIn = 0;
// completed incoming commands
@@ -49,6 +48,11 @@ public class Session extends Invoker
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
+ public Map<Long,Method> getOutstandingCommands()
+ {
+ return commands;
+ }
+
public long getCommandsOut()
{
return commandsOut;
@@ -59,6 +63,11 @@ public class Session extends Invoker
return commandsIn;
}
+ public long nextCommandId()
+ {
+ return commandsIn++;
+ }
+
public RangeSet getProcessed()
{
return processed;
@@ -79,7 +88,7 @@ public class Session extends Invoker
processed.add(range);
}
- public void processed(Struct command)
+ public void processed(Method command)
{
processed(command.getId());
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
index 1275378b7e..fd3e019367 100644
--- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
@@ -45,6 +45,7 @@ public abstract class SessionDelegate extends Delegate<Session>
}
}
ssn.complete(excmp.getCumulativeExecutionMark());
+ System.out.println("outstanding commands: " + ssn.getOutstandingCommands());
}
@Override public void executionSync(Session ssn, ExecutionSync sync)
diff --git a/java/common/src/main/java/org/apache/qpidity/Struct.java b/java/common/src/main/java/org/apache/qpidity/Struct.java
index 16b02a72a3..cf8a5c246f 100644
--- a/java/common/src/main/java/org/apache/qpidity/Struct.java
+++ b/java/common/src/main/java/org/apache/qpidity/Struct.java
@@ -35,19 +35,6 @@ public abstract class Struct implements Delegator, Encodable
return StructFactory.create(type);
}
- // XXX: command subclass?
- private long id;
-
- public final long getId()
- {
- return id;
- }
-
- void setId(long id)
- {
- this.id = id;
- }
-
abstract int getEncodedType();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 8c8c2c890d..4a33122d37 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -58,7 +58,6 @@ class ToyBroker extends SessionDelegate
{
queues.put(qd.getQueue(), new LinkedList());
System.out.println("declared queue: " + qd.getQueue());
- ssn.processed(qd);
}
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index db03d30605..2e27dc8574 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -40,7 +40,6 @@ class ToyClient extends SessionDelegate
ssn.getCommand((int) l));
}
}
- ssn.processed(reject);
}
public void headers(Session ssn, Struct ... headers)
@@ -85,6 +84,7 @@ class ToyClient extends SessionDelegate
ssn.messageTransfer("fdsa", (short) 0, (short) 1);
ssn.data("this should be rejected");
ssn.end();
+ ssn.sync();
}
}