diff options
author | Kim van der Riet <kpvdr@apache.org> | 2007-02-15 15:23:55 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-15 15:23:55 +0000 |
commit | 9ffd924daedfc7d1d3c2e072befaf6645aef671e (patch) | |
tree | 7321379b681f2025cffe9e8e5c0497c993125b89 | |
parent | a22f3f594d6eee7d610fb4f140e18cddd7c880f6 (diff) | |
download | qpid-python-9ffd924daedfc7d1d3c2e072befaf6645aef671e.tar.gz |
Fixes to get TransactedTest back, there are still unresolved issues with rollback(), however.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507960 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 52 insertions, 58 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 245e762fa4..00243f865b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -639,50 +639,47 @@ public class AMQChannel /** * Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException - { - throw new Error("XXX"); -// final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); -// -// _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() -// { -// public boolean callback(UnacknowledgedMessage message) throws AMQException -// { -// long deliveryTag = message.deliveryTag; -// AMQShortString consumerTag = message.consumerTag; -// AMQMessage msg = message.message; -// msg.setRedelivered(true); -// // working -// // deliver(msg, consumerTag, deliveryTag); -// // trunk -// if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) -// { -// msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); -// } -// else -// { -// // Message has no consumer tag, so was "delivered" to a GET -// // or consumer no longer registered -// // cannot resend, so re-queue. -// if (message.queue != null && (consumerTag == null || requeue)) -// { -// msgToRequeue.add(message); -// } -// } -// // false means continue processing -// return false; -// } -// -// public void visitComplete() -// { -// } -// }); -// -// for(UnacknowledgedMessage message : msgToRequeue) -// { -// _txnContext.deliver(message.message, message.queue); -// _unacknowledgedMessageMap.remove(message.deliveryTag); -// } + public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException + { + final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); + + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + long deliveryTag = message.deliveryTag; + AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.message; + msg.setRedelivered(true); + if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) + { + deliver(msg, consumerTag, deliveryTag); + //msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + } + else + { + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null && (consumerTag == null || requeue)) + { + msgToRequeue.add(message); + } + } + // false means continue processing + return false; + } + + public void visitComplete() + { + } + }); + + for(UnacknowledgedMessage message : msgToRequeue) + { + _txnContext.deliver(message.message, message.queue); + _unacknowledgedMessageMap.remove(message.deliveryTag); + } } /** diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index ce6df83baf..456763bba0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -59,7 +59,7 @@ public class TransactedTest extends TestCase super.setUp(); TransportConnection.createVMBroker(1); queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); - queue2 = new AMQQueue("Q2", false); + queue2 = new AMQQueue("Q2x", false); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); session = con.createSession(true, 0); @@ -74,15 +74,6 @@ public class TransactedTest extends TestCase prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); prepProducer1 = prepSession.createProducer(queue1); prepCon.start(); - -// //add some messages -// prepProducer1.send(prepSession.createTextMessage("A")); -// prepProducer1.send(prepSession.createTextMessage("B")); -// prepProducer1.send(prepSession.createTextMessage("C")); -// -// testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test"); -// testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); -// testConsumer2 = testSession.createConsumer(queue2); } protected void tearDown() throws Exception @@ -110,8 +101,9 @@ public class TransactedTest extends TestCase //commit session.commit(); - testCon.start(); + //ensure sent messages can be received and received messages are gone + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); testConsumer1 = testSession.createConsumer(queue1); @@ -156,10 +148,10 @@ public class TransactedTest extends TestCase expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); expect("C", consumer1.receive(1000)); - testCon.start(); - testConsumer1 = testSession.createConsumer(queue1); + //commit - session.commit(); + //session.commit(); + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -175,6 +167,7 @@ public class TransactedTest extends TestCase // messages left over from the last test (which can affect later tests)... public void testEmpty2() throws Exception { +//System.out.println("=== DEBUG === testEmpty2(): assertTrue(null == consumer1.receive(1000));"); assertTrue(null == consumer1.receive(1000)); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 61bc9090a1..26d13b403c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -832,10 +832,14 @@ public class FieldTable if(_encodedForm != null) { + // FIXME: This is a quick fix for a problem where the ByteBuffer _encodedForm + // becomes consumed if debug messages are printed which involve a FieldTable, + // and for some tests. This is a rather ugly quick-fix... if (_encodedForm.remaining() == 0) { _encodedForm.rewind(); } +// _encodedForm.flip(); buffer.put(_encodedForm); } else if(_properties != null) |