diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 23:14:31 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 23:14:31 +0000 |
commit | 3cf863f0367964e05b75ce790488a237381fcddf (patch) | |
tree | be417ca117b638209e257f98f850f10428346813 | |
parent | 31edffa7eebca19569716e4d08857109ea6a3a02 (diff) | |
download | qpid-python-3cf863f0367964e05b75ce790488a237381fcddf.tar.gz |
Updated to fix sync issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@796061 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 15 insertions, 5 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 3f88578084..a178cba78b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -50,11 +50,23 @@ public class ServerSessionDelegate extends SessionDelegate } @Override + public void command(Session session, Method method) + { + super.command(session, method); + if (method.isSync()) + { + session.flushProcessed(); + } + } + + @Override public void messageAccept(Session session, MessageAccept method) { super.messageAccept(session, method); } + + @Override public void messageReject(Session session, MessageReject method) { @@ -131,9 +143,6 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession) ssn).enqueue(message, queues); - - System.out.println(queues); - ssn.processed(xfr); } catch (AMQException e) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index 611c742fb1..e1e1f846cb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -76,7 +76,7 @@ public abstract class Method extends Struct implements ProtocolEvent return sync; } - final void setSync(boolean value) + public final void setSync(boolean value) { this.sync = value; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 33d552b91e..357caa26e1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -186,8 +186,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate case COMMAND: int commandType = dec.readUint16(); // read in the session header, right now we don't use it - dec.readUint16(); + int hdr = dec.readUint16(); command = Method.create(commandType); + command.setSync((0x0001 & hdr) != 0); command.read(dec); if (command.hasPayload()) { |