summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java21
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java16
-rwxr-xr-xjava/common/generate2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Decoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Encoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Range.java46
-rw-r--r--java/common/src/main/java/org/apache/qpidity/RangeSet.java110
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java78
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionDelegate.java9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java4
-rw-r--r--specs/amqp.0-10-preview.xml29
16 files changed, 303 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 4903991d7d..33b5586409 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -24,7 +24,7 @@ import org.apache.qpidity.api.Message;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
/**
* <p>A session is associated with a connection.
@@ -290,7 +290,7 @@ public interface Session
* @param range Range of acknowledged messages.
* @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void messageAcknowledge(Range<Long>... range) throws QpidException;
+ public void messageAcknowledge(RangeSet ranges) throws QpidException;
/**
* Reject ranges of acquired messages.
@@ -300,7 +300,7 @@ public interface Session
* @param range Range of rejected messages.
* @throws QpidException If those messages cannot be rejected dus to some error
*/
- public void messageReject(Range<Long>... range) throws QpidException;
+ public void messageReject(RangeSet ranges) throws QpidException;
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -314,7 +314,7 @@ public interface Session
* @return Ranges of explicitly acquired messages.
* @throws QpidException If this message cannot be acquired dus to some error
*/
- public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException;
+ public RangeSet messageAcquire(RangeSet range) throws QpidException;
/**
* Give up responsibility for processing ranges of messages.
@@ -323,7 +323,7 @@ public interface Session
* @param range Ranges of messages to be released.
* @throws QpidException If this message cannot be released dus to some error.
*/
- public void messageRelease(Range<Long>... range) throws QpidException;
+ public void messageRelease(RangeSet range) throws QpidException;
// -----------------------------------------------
// Local transaction methods
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
index bc02a7c18d..f6d0cfaefd 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
@@ -113,25 +113,25 @@ public class ClientSession implements org.apache.qpidity.client.Session
}
- public void messageAcknowledge(Range<Long>... range) throws QpidException
+ public void messageAcknowledge(RangeSet ranges) throws QpidException
{
// TODO
}
- public void messageReject(Range<Long>... range) throws QpidException
+ public void messageReject(RangeSet ranges) throws QpidException
{
// TODO
}
- public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException
+ public RangeSet messageAcquire(RangeSet ranges) throws QpidException
{
// TODO
return null;
}
- public void messageRelease(Range<Long>... range) throws QpidException
+ public void messageRelease(RangeSet ranges) throws QpidException
{
// TODO
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index d88a177001..7fab5adfea 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -19,7 +19,7 @@ package org.apache.qpidity.jms;
import org.apache.qpidity.jms.message.QpidMessage;
import org.apache.qpidity.impl.MessagePartListenerAdapter;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
import org.apache.qpidity.filter.MessageFilter;
@@ -568,8 +568,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
if (_preAcquire)
{
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
- getSession().getQpidSession().messageRelease(range);
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
+ getSession().getQpidSession().messageRelease(ranges);
}
}
@@ -585,12 +586,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
boolean result = false;
if (!_preAcquire)
{
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
- Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range);
- if (rangeResult.length > 0)
+ RangeSet acquired = getSession().getQpidSession().messageAcquire(ranges);
+ if (acquired.size() > 0)
{
- result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0;
+ result = acquired.iterator().next().getLower() == message.getMessageID();
}
}
return result;
@@ -606,8 +608,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
if (!_preAcquire)
{
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
- getSession().getQpidSession().messageAcknowledge(range);
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
+ getSession().getQpidSession().messageAcknowledge(ranges);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 5ab8482635..8b224500e9 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpidity.jms.message.*;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -443,10 +444,11 @@ public class SessionImpl implements Session
for (QpidMessage message : _unacknowledgedMessages)
{
// release this message
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
try
{
- getQpidSession().messageRelease(range);
+ getQpidSession().messageRelease(ranges);
}
catch (QpidException e)
{
@@ -982,10 +984,11 @@ public class SessionImpl implements Session
else
{
// acknowledge this message
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
try
{
- getQpidSession().messageAcknowledge(range);
+ getQpidSession().messageAcknowledge(ranges);
}
catch (QpidException e)
{
@@ -1016,10 +1019,11 @@ public class SessionImpl implements Session
for (QpidMessage message : _unacknowledgedMessages)
{
// acknowledge this message
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
try
{
- getQpidSession().messageAcknowledge(range);
+ getQpidSession().messageAcknowledge(ranges);
}
catch (QpidException e)
{
diff --git a/java/common/generate b/java/common/generate
index ea805b62f2..f0a91e9d50 100755
--- a/java/common/generate
+++ b/java/common/generate
@@ -51,7 +51,7 @@ TYPES = {
"timestamp": "long",
"content": "String",
"uuid": "UUID",
- "rfc1982-long-set": "Range<Long>[]",
+ "rfc1982-long-set": "RangeSet",
"long-struct": "Struct"
}
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
index 9db419537c..ca728812bd 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
@@ -143,7 +143,7 @@ abstract class AbstractDecoder implements Decoder
return null;
}
- public Range<Long>[] readRfc1982LongSet()
+ public RangeSet readRfc1982LongSet()
{
int count = readShort()/8;
if (count == 0)
@@ -152,10 +152,10 @@ abstract class AbstractDecoder implements Decoder
}
else
{
- Range<Long>[] ranges = new Range[count];
+ RangeSet ranges = new RangeSet();
for (int i = 0; i < count; i++)
{
- ranges[i] = new Range<Long>(readLong(), readLong());
+ ranges.add(readLong(), readLong());
}
return ranges;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
index f898d759d5..ecd274615b 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
@@ -150,7 +150,7 @@ abstract class AbstractEncoder implements Encoder
//throw new Error("TODO");
}
- public void writeRfc1982LongSet(Range<Long>[] ranges)
+ public void writeRfc1982LongSet(RangeSet ranges)
{
if (ranges == null)
{
@@ -158,8 +158,8 @@ abstract class AbstractEncoder implements Encoder
}
else
{
- writeShort(ranges.length * 8);
- for (Range<Long> range : ranges)
+ writeShort(ranges.size() * 8);
+ for (Range range : ranges)
{
writeLong(range.getLower());
writeLong(range.getUpper());
diff --git a/java/common/src/main/java/org/apache/qpidity/Decoder.java b/java/common/src/main/java/org/apache/qpidity/Decoder.java
index 0a869baab7..13c3d0b7b8 100644
--- a/java/common/src/main/java/org/apache/qpidity/Decoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/Decoder.java
@@ -44,7 +44,7 @@ public interface Decoder
String readLongstr();
Map<String,?> readTable();
- Range<Long>[] readRfc1982LongSet();
+ RangeSet readRfc1982LongSet();
UUID readUuid();
String readContent();
diff --git a/java/common/src/main/java/org/apache/qpidity/Encoder.java b/java/common/src/main/java/org/apache/qpidity/Encoder.java
index c25b96e462..990cef7081 100644
--- a/java/common/src/main/java/org/apache/qpidity/Encoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/Encoder.java
@@ -44,7 +44,7 @@ public interface Encoder
void writeLongstr(String s);
void writeTable(Map<String,?> table);
- void writeRfc1982LongSet(Range<Long>[] ranges);
+ void writeRfc1982LongSet(RangeSet ranges);
void writeUuid(UUID uuid);
void writeContent(String c);
diff --git a/java/common/src/main/java/org/apache/qpidity/Range.java b/java/common/src/main/java/org/apache/qpidity/Range.java
index 4766ecb471..9da7112a6d 100644
--- a/java/common/src/main/java/org/apache/qpidity/Range.java
+++ b/java/common/src/main/java/org/apache/qpidity/Range.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpidity;
+import static java.lang.Math.*;
+
/**
* Range
@@ -27,25 +29,57 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-public class Range<C extends Comparable>
+public class Range
{
- private final C lower;
- private final C upper;
+ private final long lower;
+ private final long upper;
- public Range(C lower, C upper)
+ public Range(long lower, long upper)
{
this.lower = lower;
this.upper = upper;
}
- public C getLower()
+ public long getLower()
{
return lower;
}
- public C getUpper()
+ public long getUpper()
{
return upper;
}
+ public boolean includes(long value)
+ {
+ return lower <= value && value <= upper;
+ }
+
+ public boolean includes(Range range)
+ {
+ return includes(range.lower) && includes(range.upper);
+ }
+
+ public boolean intersects(Range range)
+ {
+ return (includes(range.lower) || includes(range.upper) ||
+ range.includes(lower) || range.includes(upper));
+ }
+
+ public boolean touches(Range range)
+ {
+ return (includes(range.upper + 1) || includes(range.lower - 1) ||
+ range.includes(upper + 1) || range.includes(lower - 1));
+ }
+
+ public Range span(Range range)
+ {
+ return new Range(min(lower, range.lower), max(upper, range.upper));
+ }
+
+ public String toString()
+ {
+ return "[" + lower + ", " + upper + "]";
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/RangeSet.java
new file mode 100644
index 0000000000..297df7c0c9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/RangeSet.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.LinkedList;
+
+
+/**
+ * RangeSet
+ *
+ * @author Rafael H. Schloming
+ */
+
+public class RangeSet implements Iterable<Range>
+{
+
+ private LinkedList<Range> ranges = new LinkedList<Range>();
+
+ public int size()
+ {
+ return ranges.size();
+ }
+
+ public Iterator<Range> iterator()
+ {
+ return ranges.iterator();
+ }
+
+ public void add(Range range)
+ {
+ ListIterator<Range> it = ranges.listIterator();
+
+ while (it.hasNext())
+ {
+ Range next = it.next();
+ if (range.touches(next))
+ {
+ it.remove();
+ range = range.span(next);
+ }
+ else if (range.getUpper() < next.getLower())
+ {
+ it.previous();
+ it.add(range);
+ return;
+ }
+ }
+
+ it.add(range);
+ }
+
+ public void add(long lower, long upper)
+ {
+ add(new Range(lower, upper));
+ }
+
+ public void add(long value)
+ {
+ add(value, value);
+ }
+
+ public void clear()
+ {
+ ranges.clear();
+ }
+
+ public String toString()
+ {
+ return ranges.toString();
+ }
+
+ public static final void main(String[] args)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(5, 10);
+ System.out.println(ranges);
+ ranges.add(15, 20);
+ System.out.println(ranges);
+ ranges.add(23, 25);
+ System.out.println(ranges);
+ ranges.add(12, 14);
+ System.out.println(ranges);
+ ranges.add(0, 1);
+ System.out.println(ranges);
+ ranges.add(3, 11);
+ System.out.println(ranges);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index b3116570e8..0c94c6df4e 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -37,11 +37,15 @@ public class Session extends Invoker
// channel may be null
Channel channel;
- // outgoing command count
- private long commandsOut = 0;
+
// XXX: incoming command count not used
// incoming command count
private long commandsIn = 0;
+ // completed incoming commands
+ private final RangeSet processed = new RangeSet();
+
+ // outgoing command count
+ private long commandsOut = 0;
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
@@ -55,6 +59,31 @@ public class Session extends Invoker
return commandsIn;
}
+ public RangeSet getProcessed()
+ {
+ return processed;
+ }
+
+ public void processed(long command)
+ {
+ processed.add(command);
+ }
+
+ public void processed(long lower, long upper)
+ {
+ processed.add(lower, upper);
+ }
+
+ public void processed(Range range)
+ {
+ processed.add(range);
+ }
+
+ public void processed(Struct command)
+ {
+ processed(command.getId());
+ }
+
public void attach(Channel channel)
{
this.channel = channel;
@@ -63,15 +92,24 @@ public class Session extends Invoker
public Method getCommand(long id)
{
- System.out.println(id + " " + commands);
- return commands.get(id);
+ synchronized (commands)
+ {
+ return commands.get(id);
+ }
}
void complete(long lower, long upper)
{
- for (long id = lower; id <= upper; id++)
+ synchronized (commands)
{
- commands.put(id, null);
+ for (long id = lower; id <= upper; id++)
+ {
+ commands.remove(id);
+ }
+ if (commands.isEmpty())
+ {
+ commands.notifyAll();
+ }
}
}
@@ -85,8 +123,10 @@ public class Session extends Invoker
{
if (m.getEncodedTrack() == Frame.L4)
{
- long cmd = commandsOut++;
- commands.put(cmd, m);
+ synchronized (commands)
+ {
+ commands.put(commandsOut++, m);
+ }
}
channel.method(m);
}
@@ -116,6 +156,28 @@ public class Session extends Invoker
channel.end();
}
+ public void sync()
+ {
+ synchronized (commands)
+ {
+ if (!commands.isEmpty())
+ {
+ executionSync();
+ }
+
+ while (!commands.isEmpty())
+ {
+ try {
+ commands.wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
protected void invoke(Method m, Handler<Struct> handler)
{
throw new UnsupportedOperationException();
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
index c2fbe6ba00..1275378b7e 100644
--- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
@@ -36,10 +36,10 @@ public abstract class SessionDelegate extends Delegate<Session>
@Override public void executionComplete(Session ssn, ExecutionComplete excmp)
{
- Range<Long>[] ranges = excmp.getRangedExecutionSet();
+ RangeSet ranges = excmp.getRangedExecutionSet();
if (ranges != null)
{
- for (Range<Long> range : ranges)
+ for (Range range : ranges)
{
ssn.complete(range.getLower(), range.getUpper());
}
@@ -47,4 +47,9 @@ public abstract class SessionDelegate extends Delegate<Session>
ssn.complete(excmp.getCumulativeExecutionMark());
}
+ @Override public void executionSync(Session ssn, ExecutionSync sync)
+ {
+ ssn.executionComplete(0, ssn.getProcessed());
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 534f075dbd..8c8c2c890d 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -58,6 +58,7 @@ class ToyBroker extends SessionDelegate
{
queues.put(qd.getQueue(), new LinkedList());
System.out.println("declared queue: " + qd.getQueue());
+ ssn.processed(qd);
}
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
@@ -120,6 +121,7 @@ class ToyBroker extends SessionDelegate
queue.offer(m);
System.out.println("queued " + m);
}
+ ssn.processed(xfr);
xfr = null;
frames = null;
}
@@ -133,8 +135,8 @@ class ToyBroker extends SessionDelegate
}
else
{
- long id = xfr.getId();
- Range[] ranges = {new Range<Long>(id, id)};
+ RangeSet ranges = new RangeSet();
+ ranges.add(xfr.getId());
ssn.messageReject(ranges, 0, "no such destination");
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index 1e57de3265..db03d30605 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -32,7 +32,7 @@ class ToyClient extends SessionDelegate
@Override public void messageReject(Session ssn, MessageReject reject)
{
- for (Range<Long> range : reject.getTransfers())
+ for (Range range : reject.getTransfers())
{
for (long l = range.getLower(); l <= range.getUpper(); l++)
{
@@ -40,6 +40,7 @@ class ToyClient extends SessionDelegate
ssn.getCommand((int) l));
}
}
+ ssn.processed(reject);
}
public void headers(Session ssn, Struct ... headers)
@@ -73,6 +74,7 @@ class ToyClient extends SessionDelegate
ssn.sessionOpen(1234);
ssn.queueDeclare("asdf", null, null);
+ ssn.sync();
ssn.messageTransfer("asdf", (short) 0, (short) 1);
ssn.headers(new DeliveryProperties(),
diff --git a/specs/amqp.0-10-preview.xml b/specs/amqp.0-10-preview.xml
index 4e2d417c47..8b3c4e49ea 100644
--- a/specs/amqp.0-10-preview.xml
+++ b/specs/amqp.0-10-preview.xml
@@ -1272,6 +1272,22 @@
</doc>
</domain>
+ <domain name="execution-header">
+ <doc>
+ The execution header appears on commands after the class and method id, but prior to method
+ arguments.
+ </doc>
+ <struct size="octet" pack="octet">
+ <field name="sync" domain="bit"
+ label="request notification of completion for a specific command">
+ <doc>
+ Indicates that an execution.complete should be sent immediately after processing the
+ command.
+ </doc>
+ </field>
+ </struct>
+ </domain>
+
<!-- Elementary domains -->
<domain name="bit" type="bit" label="single bit" />
<domain name="octet" type="octet" label="single octet" />
@@ -7063,6 +7079,19 @@
<field name="data" domain="long-struct"/>
</method>
+ <!-- - Method: execution.sync - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="sync" index="50" label="request notification of completion for issued commands">
+ <doc>
+ Requests notification (via execution.complete) when all commands issued prior to the sync
+ control have been processed. If the recipient of this control has already notified the
+ sender that said commands are complete, it may safely ignore the control.
+ </doc>
+
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ </method>
+
</class>
</amqp>