diff options
16 files changed, 303 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 4903991d7d..33b5586409 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -24,7 +24,7 @@ import org.apache.qpidity.api.Message; import org.apache.qpidity.QpidException;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
/**
* <p>A session is associated with a connection.
@@ -290,7 +290,7 @@ public interface Session * @param range Range of acknowledged messages.
* @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void messageAcknowledge(Range<Long>... range) throws QpidException;
+ public void messageAcknowledge(RangeSet ranges) throws QpidException;
/**
* Reject ranges of acquired messages.
@@ -300,7 +300,7 @@ public interface Session * @param range Range of rejected messages.
* @throws QpidException If those messages cannot be rejected dus to some error
*/
- public void messageReject(Range<Long>... range) throws QpidException;
+ public void messageReject(RangeSet ranges) throws QpidException;
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -314,7 +314,7 @@ public interface Session * @return Ranges of explicitly acquired messages.
* @throws QpidException If this message cannot be acquired dus to some error
*/
- public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException;
+ public RangeSet messageAcquire(RangeSet range) throws QpidException;
/**
* Give up responsibility for processing ranges of messages.
@@ -323,7 +323,7 @@ public interface Session * @param range Ranges of messages to be released.
* @throws QpidException If this message cannot be released dus to some error.
*/
- public void messageRelease(Range<Long>... range) throws QpidException;
+ public void messageRelease(RangeSet range) throws QpidException;
// -----------------------------------------------
// Local transaction methods
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index bc02a7c18d..f6d0cfaefd 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -113,25 +113,25 @@ public class ClientSession implements org.apache.qpidity.client.Session } - public void messageAcknowledge(Range<Long>... range) throws QpidException + public void messageAcknowledge(RangeSet ranges) throws QpidException { // TODO } - public void messageReject(Range<Long>... range) throws QpidException + public void messageReject(RangeSet ranges) throws QpidException { // TODO } - public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException + public RangeSet messageAcquire(RangeSet ranges) throws QpidException { // TODO return null; } - public void messageRelease(Range<Long>... range) throws QpidException + public void messageRelease(RangeSet ranges) throws QpidException { // TODO diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index d88a177001..7fab5adfea 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -19,7 +19,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.impl.MessagePartListenerAdapter; -import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.filter.MessageFilter; @@ -568,8 +568,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (_preAcquire) { - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); - getSession().getQpidSession().messageRelease(range); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); + getSession().getQpidSession().messageRelease(ranges); } } @@ -585,12 +586,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer boolean result = false; if (!_preAcquire) { - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); - Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range); - if (rangeResult.length > 0) + RangeSet acquired = getSession().getQpidSession().messageAcquire(ranges); + if (acquired.size() > 0) { - result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0; + result = acquired.iterator().next().getLower() == message.getMessageID(); } } return result; @@ -606,8 +608,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (!_preAcquire) { - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); - getSession().getQpidSession().messageAcknowledge(range); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); + getSession().getQpidSession().messageAcknowledge(ranges); } } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 5ab8482635..8b224500e9 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpidity.jms.message.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; @@ -443,10 +444,11 @@ public class SessionImpl implements Session for (QpidMessage message : _unacknowledgedMessages) { // release this message - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageRelease(range); + getQpidSession().messageRelease(ranges); } catch (QpidException e) { @@ -982,10 +984,11 @@ public class SessionImpl implements Session else { // acknowledge this message - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageAcknowledge(range); + getQpidSession().messageAcknowledge(ranges); } catch (QpidException e) { @@ -1016,10 +1019,11 @@ public class SessionImpl implements Session for (QpidMessage message : _unacknowledgedMessages) { // acknowledge this message - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageAcknowledge(range); + getQpidSession().messageAcknowledge(ranges); } catch (QpidException e) { diff --git a/java/common/generate b/java/common/generate index ea805b62f2..f0a91e9d50 100755 --- a/java/common/generate +++ b/java/common/generate @@ -51,7 +51,7 @@ TYPES = { "timestamp": "long", "content": "String", "uuid": "UUID", - "rfc1982-long-set": "Range<Long>[]", + "rfc1982-long-set": "RangeSet", "long-struct": "Struct" } diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java index 9db419537c..ca728812bd 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java @@ -143,7 +143,7 @@ abstract class AbstractDecoder implements Decoder return null; } - public Range<Long>[] readRfc1982LongSet() + public RangeSet readRfc1982LongSet() { int count = readShort()/8; if (count == 0) @@ -152,10 +152,10 @@ abstract class AbstractDecoder implements Decoder } else { - Range<Long>[] ranges = new Range[count]; + RangeSet ranges = new RangeSet(); for (int i = 0; i < count; i++) { - ranges[i] = new Range<Long>(readLong(), readLong()); + ranges.add(readLong(), readLong()); } return ranges; } diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java index f898d759d5..ecd274615b 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java @@ -150,7 +150,7 @@ abstract class AbstractEncoder implements Encoder //throw new Error("TODO"); } - public void writeRfc1982LongSet(Range<Long>[] ranges) + public void writeRfc1982LongSet(RangeSet ranges) { if (ranges == null) { @@ -158,8 +158,8 @@ abstract class AbstractEncoder implements Encoder } else { - writeShort(ranges.length * 8); - for (Range<Long> range : ranges) + writeShort(ranges.size() * 8); + for (Range range : ranges) { writeLong(range.getLower()); writeLong(range.getUpper()); diff --git a/java/common/src/main/java/org/apache/qpidity/Decoder.java b/java/common/src/main/java/org/apache/qpidity/Decoder.java index 0a869baab7..13c3d0b7b8 100644 --- a/java/common/src/main/java/org/apache/qpidity/Decoder.java +++ b/java/common/src/main/java/org/apache/qpidity/Decoder.java @@ -44,7 +44,7 @@ public interface Decoder String readLongstr(); Map<String,?> readTable(); - Range<Long>[] readRfc1982LongSet(); + RangeSet readRfc1982LongSet(); UUID readUuid(); String readContent(); diff --git a/java/common/src/main/java/org/apache/qpidity/Encoder.java b/java/common/src/main/java/org/apache/qpidity/Encoder.java index c25b96e462..990cef7081 100644 --- a/java/common/src/main/java/org/apache/qpidity/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/Encoder.java @@ -44,7 +44,7 @@ public interface Encoder void writeLongstr(String s); void writeTable(Map<String,?> table); - void writeRfc1982LongSet(Range<Long>[] ranges); + void writeRfc1982LongSet(RangeSet ranges); void writeUuid(UUID uuid); void writeContent(String c); diff --git a/java/common/src/main/java/org/apache/qpidity/Range.java b/java/common/src/main/java/org/apache/qpidity/Range.java index 4766ecb471..9da7112a6d 100644 --- a/java/common/src/main/java/org/apache/qpidity/Range.java +++ b/java/common/src/main/java/org/apache/qpidity/Range.java @@ -20,6 +20,8 @@ */ package org.apache.qpidity; +import static java.lang.Math.*; + /** * Range @@ -27,25 +29,57 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -public class Range<C extends Comparable> +public class Range { - private final C lower; - private final C upper; + private final long lower; + private final long upper; - public Range(C lower, C upper) + public Range(long lower, long upper) { this.lower = lower; this.upper = upper; } - public C getLower() + public long getLower() { return lower; } - public C getUpper() + public long getUpper() { return upper; } + public boolean includes(long value) + { + return lower <= value && value <= upper; + } + + public boolean includes(Range range) + { + return includes(range.lower) && includes(range.upper); + } + + public boolean intersects(Range range) + { + return (includes(range.lower) || includes(range.upper) || + range.includes(lower) || range.includes(upper)); + } + + public boolean touches(Range range) + { + return (includes(range.upper + 1) || includes(range.lower - 1) || + range.includes(upper + 1) || range.includes(lower - 1)); + } + + public Range span(Range range) + { + return new Range(min(lower, range.lower), max(upper, range.upper)); + } + + public String toString() + { + return "[" + lower + ", " + upper + "]"; + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/RangeSet.java new file mode 100644 index 0000000000..297df7c0c9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/RangeSet.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.util.Collection; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.LinkedList; + + +/** + * RangeSet + * + * @author Rafael H. Schloming + */ + +public class RangeSet implements Iterable<Range> +{ + + private LinkedList<Range> ranges = new LinkedList<Range>(); + + public int size() + { + return ranges.size(); + } + + public Iterator<Range> iterator() + { + return ranges.iterator(); + } + + public void add(Range range) + { + ListIterator<Range> it = ranges.listIterator(); + + while (it.hasNext()) + { + Range next = it.next(); + if (range.touches(next)) + { + it.remove(); + range = range.span(next); + } + else if (range.getUpper() < next.getLower()) + { + it.previous(); + it.add(range); + return; + } + } + + it.add(range); + } + + public void add(long lower, long upper) + { + add(new Range(lower, upper)); + } + + public void add(long value) + { + add(value, value); + } + + public void clear() + { + ranges.clear(); + } + + public String toString() + { + return ranges.toString(); + } + + public static final void main(String[] args) + { + RangeSet ranges = new RangeSet(); + ranges.add(5, 10); + System.out.println(ranges); + ranges.add(15, 20); + System.out.println(ranges); + ranges.add(23, 25); + System.out.println(ranges); + ranges.add(12, 14); + System.out.println(ranges); + ranges.add(0, 1); + System.out.println(ranges); + ranges.add(3, 11); + System.out.println(ranges); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index b3116570e8..0c94c6df4e 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -37,11 +37,15 @@ public class Session extends Invoker // channel may be null Channel channel; - // outgoing command count - private long commandsOut = 0; + // XXX: incoming command count not used // incoming command count private long commandsIn = 0; + // completed incoming commands + private final RangeSet processed = new RangeSet(); + + // outgoing command count + private long commandsOut = 0; private Map<Long,Method> commands = new HashMap<Long,Method>(); private long mark = 0; @@ -55,6 +59,31 @@ public class Session extends Invoker return commandsIn; } + public RangeSet getProcessed() + { + return processed; + } + + public void processed(long command) + { + processed.add(command); + } + + public void processed(long lower, long upper) + { + processed.add(lower, upper); + } + + public void processed(Range range) + { + processed.add(range); + } + + public void processed(Struct command) + { + processed(command.getId()); + } + public void attach(Channel channel) { this.channel = channel; @@ -63,15 +92,24 @@ public class Session extends Invoker public Method getCommand(long id) { - System.out.println(id + " " + commands); - return commands.get(id); + synchronized (commands) + { + return commands.get(id); + } } void complete(long lower, long upper) { - for (long id = lower; id <= upper; id++) + synchronized (commands) { - commands.put(id, null); + for (long id = lower; id <= upper; id++) + { + commands.remove(id); + } + if (commands.isEmpty()) + { + commands.notifyAll(); + } } } @@ -85,8 +123,10 @@ public class Session extends Invoker { if (m.getEncodedTrack() == Frame.L4) { - long cmd = commandsOut++; - commands.put(cmd, m); + synchronized (commands) + { + commands.put(commandsOut++, m); + } } channel.method(m); } @@ -116,6 +156,28 @@ public class Session extends Invoker channel.end(); } + public void sync() + { + synchronized (commands) + { + if (!commands.isEmpty()) + { + executionSync(); + } + + while (!commands.isEmpty()) + { + try { + commands.wait(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + } + protected void invoke(Method m, Handler<Struct> handler) { throw new UnsupportedOperationException(); diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index c2fbe6ba00..1275378b7e 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -36,10 +36,10 @@ public abstract class SessionDelegate extends Delegate<Session> @Override public void executionComplete(Session ssn, ExecutionComplete excmp) { - Range<Long>[] ranges = excmp.getRangedExecutionSet(); + RangeSet ranges = excmp.getRangedExecutionSet(); if (ranges != null) { - for (Range<Long> range : ranges) + for (Range range : ranges) { ssn.complete(range.getLower(), range.getUpper()); } @@ -47,4 +47,9 @@ public abstract class SessionDelegate extends Delegate<Session> ssn.complete(excmp.getCumulativeExecutionMark()); } + @Override public void executionSync(Session ssn, ExecutionSync sync) + { + ssn.executionComplete(0, ssn.getProcessed()); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 534f075dbd..8c8c2c890d 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -58,6 +58,7 @@ class ToyBroker extends SessionDelegate { queues.put(qd.getQueue(), new LinkedList()); System.out.println("declared queue: " + qd.getQueue()); + ssn.processed(qd); } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) @@ -120,6 +121,7 @@ class ToyBroker extends SessionDelegate queue.offer(m); System.out.println("queued " + m); } + ssn.processed(xfr); xfr = null; frames = null; } @@ -133,8 +135,8 @@ class ToyBroker extends SessionDelegate } else { - long id = xfr.getId(); - Range[] ranges = {new Range<Long>(id, id)}; + RangeSet ranges = new RangeSet(); + ranges.add(xfr.getId()); ssn.messageReject(ranges, 0, "no such destination"); } } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index 1e57de3265..db03d30605 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -32,7 +32,7 @@ class ToyClient extends SessionDelegate @Override public void messageReject(Session ssn, MessageReject reject) { - for (Range<Long> range : reject.getTransfers()) + for (Range range : reject.getTransfers()) { for (long l = range.getLower(); l <= range.getUpper(); l++) { @@ -40,6 +40,7 @@ class ToyClient extends SessionDelegate ssn.getCommand((int) l)); } } + ssn.processed(reject); } public void headers(Session ssn, Struct ... headers) @@ -73,6 +74,7 @@ class ToyClient extends SessionDelegate ssn.sessionOpen(1234); ssn.queueDeclare("asdf", null, null); + ssn.sync(); ssn.messageTransfer("asdf", (short) 0, (short) 1); ssn.headers(new DeliveryProperties(), diff --git a/specs/amqp.0-10-preview.xml b/specs/amqp.0-10-preview.xml index 4e2d417c47..8b3c4e49ea 100644 --- a/specs/amqp.0-10-preview.xml +++ b/specs/amqp.0-10-preview.xml @@ -1272,6 +1272,22 @@ </doc> </domain> + <domain name="execution-header"> + <doc> + The execution header appears on commands after the class and method id, but prior to method + arguments. + </doc> + <struct size="octet" pack="octet"> + <field name="sync" domain="bit" + label="request notification of completion for a specific command"> + <doc> + Indicates that an execution.complete should be sent immediately after processing the + command. + </doc> + </field> + </struct> + </domain> + <!-- Elementary domains --> <domain name="bit" type="bit" label="single bit" /> <domain name="octet" type="octet" label="single octet" /> @@ -7063,6 +7079,19 @@ <field name="data" domain="long-struct"/> </method> + <!-- - Method: execution.sync - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="sync" index="50" label="request notification of completion for issued commands"> + <doc> + Requests notification (via execution.complete) when all commands issued prior to the sync + control have been processed. If the recipient of this control has already notified the + sender that said commands are complete, it may safely ignore the control. + </doc> + + <chassis name="server" implement="MUST"/> + <chassis name="client" implement="MUST"/> + </method> + </class> </amqp> |