summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-15 15:23:55 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-15 15:23:55 +0000
commit9ffd924daedfc7d1d3c2e072befaf6645aef671e (patch)
tree7321379b681f2025cffe9e8e5c0497c993125b89
parenta22f3f594d6eee7d610fb4f140e18cddd7c880f6 (diff)
downloadqpid-python-9ffd924daedfc7d1d3c2e072befaf6645aef671e.tar.gz
Fixes to get TransactedTest back, there are still unresolved issues with rollback(), however.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507960 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java85
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java4
3 files changed, 52 insertions, 58 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 245e762fa4..00243f865b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -639,50 +639,47 @@ public class AMQChannel
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
- {
- throw new Error("XXX");
-// final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
-//
-// _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-// {
-// public boolean callback(UnacknowledgedMessage message) throws AMQException
-// {
-// long deliveryTag = message.deliveryTag;
-// AMQShortString consumerTag = message.consumerTag;
-// AMQMessage msg = message.message;
-// msg.setRedelivered(true);
-// // working
-// // deliver(msg, consumerTag, deliveryTag);
-// // trunk
-// if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
-// {
-// msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
-// }
-// else
-// {
-// // Message has no consumer tag, so was "delivered" to a GET
-// // or consumer no longer registered
-// // cannot resend, so re-queue.
-// if (message.queue != null && (consumerTag == null || requeue))
-// {
-// msgToRequeue.add(message);
-// }
-// }
-// // false means continue processing
-// return false;
-// }
-//
-// public void visitComplete()
-// {
-// }
-// });
-//
-// for(UnacknowledgedMessage message : msgToRequeue)
-// {
-// _txnContext.deliver(message.message, message.queue);
-// _unacknowledgedMessageMap.remove(message.deliveryTag);
-// }
+ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+ {
+ final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ long deliveryTag = message.deliveryTag;
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ deliver(msg, consumerTag, deliveryTag);
+ //msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null && (consumerTag == null || requeue))
+ {
+ msgToRequeue.add(message);
+ }
+ }
+ // false means continue processing
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+
+ for(UnacknowledgedMessage message : msgToRequeue)
+ {
+ _txnContext.deliver(message.message, message.queue);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ }
}
/**
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index ce6df83baf..456763bba0 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -59,7 +59,7 @@ public class TransactedTest extends TestCase
super.setUp();
TransportConnection.createVMBroker(1);
queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
- queue2 = new AMQQueue("Q2", false);
+ queue2 = new AMQQueue("Q2x", false);
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
session = con.createSession(true, 0);
@@ -74,15 +74,6 @@ public class TransactedTest extends TestCase
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
prepProducer1 = prepSession.createProducer(queue1);
prepCon.start();
-
-// //add some messages
-// prepProducer1.send(prepSession.createTextMessage("A"));
-// prepProducer1.send(prepSession.createTextMessage("B"));
-// prepProducer1.send(prepSession.createTextMessage("C"));
-//
-// testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
-// testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
-// testConsumer2 = testSession.createConsumer(queue2);
}
protected void tearDown() throws Exception
@@ -110,8 +101,9 @@ public class TransactedTest extends TestCase
//commit
session.commit();
- testCon.start();
+
//ensure sent messages can be received and received messages are gone
+
testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
@@ -156,10 +148,10 @@ public class TransactedTest extends TestCase
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
expect("C", consumer1.receive(1000));
- testCon.start();
- testConsumer1 = testSession.createConsumer(queue1);
+
//commit
- session.commit();
+ //session.commit();
+
testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -175,6 +167,7 @@ public class TransactedTest extends TestCase
// messages left over from the last test (which can affect later tests)...
public void testEmpty2() throws Exception
{
+//System.out.println("=== DEBUG === testEmpty2(): assertTrue(null == consumer1.receive(1000));");
assertTrue(null == consumer1.receive(1000));
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 61bc9090a1..26d13b403c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -832,10 +832,14 @@ public class FieldTable
if(_encodedForm != null)
{
+ // FIXME: This is a quick fix for a problem where the ByteBuffer _encodedForm
+ // becomes consumed if debug messages are printed which involve a FieldTable,
+ // and for some tests. This is a rather ugly quick-fix...
if (_encodedForm.remaining() == 0)
{
_encodedForm.rewind();
}
+// _encodedForm.flip();
buffer.put(_encodedForm);
}
else if(_properties != null)