summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-07-20 23:14:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-07-20 23:14:31 +0000
commit3cf863f0367964e05b75ce790488a237381fcddf (patch)
treebe417ca117b638209e257f98f850f10428346813
parent31edffa7eebca19569716e4d08857109ea6a3a02 (diff)
downloadqpid-python-3cf863f0367964e05b75ce790488a237381fcddf.tar.gz
Updated to fix sync issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@796061 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java3
3 files changed, 15 insertions, 5 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 3f88578084..a178cba78b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -50,11 +50,23 @@ public class ServerSessionDelegate extends SessionDelegate
}
@Override
+ public void command(Session session, Method method)
+ {
+ super.command(session, method);
+ if (method.isSync())
+ {
+ session.flushProcessed();
+ }
+ }
+
+ @Override
public void messageAccept(Session session, MessageAccept method)
{
super.messageAccept(session, method);
}
+
+
@Override
public void messageReject(Session session, MessageReject method)
{
@@ -131,9 +143,6 @@ public class ServerSessionDelegate extends SessionDelegate
((ServerSession) ssn).enqueue(message, queues);
-
- System.out.println(queues);
-
ssn.processed(xfr);
}
catch (AMQException e)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 611c742fb1..e1e1f846cb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -76,7 +76,7 @@ public abstract class Method extends Struct implements ProtocolEvent
return sync;
}
- final void setSync(boolean value)
+ public final void setSync(boolean value)
{
this.sync = value;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 33d552b91e..357caa26e1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -186,8 +186,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
case COMMAND:
int commandType = dec.readUint16();
// read in the session header, right now we don't use it
- dec.readUint16();
+ int hdr = dec.readUint16();
command = Method.create(commandType);
+ command.setSync((0x0001 & hdr) != 0);
command.read(dec);
if (command.hasPayload())
{