diff options
author | Aidan Skinner <aidan@apache.org> | 2008-08-05 11:10:28 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-08-05 11:10:28 +0000 |
commit | 5a245b5b3b7e787a2163bd782d67e94d255a9957 (patch) | |
tree | 0daba5307f5bcfb51ca5153afbd43a84de9d9c34 | |
parent | 6f0c723706e09a9333288c139045276537aa5dc6 (diff) | |
download | qpid-python-5a245b5b3b7e787a2163bd782d67e94d255a9957.tar.gz |
QPID-1206: Fix failover and failover tests
AMQConnection: remove dead and confusingly misnamed method
AMQSession: rename failedOver to failedOverDirty to convey actual usage, only set it if we failed over while dirty. Ewww!
BasicMessageConsumer: if we're in client ack mode, mark as dirty when we receive a message
PingPongProducer: calculate expected replies properly if we fail after a send or before a commit
FailoverTest: test transacted case
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682672 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 39 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 90dce8eb60..27294562e5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1322,24 +1322,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _sessions.remove(channelId); } - /** - * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. - * The caller must hold the failover mutex before calling this method. - */ - public void resubscribeSesssions() throws JMSException, AMQException, FailoverException - { - ArrayList sessions = new ArrayList(_sessions.values()); - _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? - for (Iterator it = sessions.iterator(); it.hasNext();) - { - AMQSession s = (AMQSession) it.next(); - // _protocolHandler.addSessionByChannel(s.getChannelId(), s); - reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); - s.resubscribe(); - s.setFlowControl(true); - } - } - public String toString() { StringBuffer buf = new StringBuffer("AMQConnection:\n"); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 801cf53d83..c253b8b641 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -368,8 +368,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ private boolean _dirty; - /** Has failover occured on this session */ - private boolean _failedOver; + /** Has failover occured on this session with outstanding actions to commit? */ + private boolean _failedOverDirty; private static final class FlowControlIndicator { @@ -740,6 +740,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } // Commits outstanding messages and acknowledgments sendCommit(); + markClean(); } catch (AMQException e) { @@ -1796,7 +1797,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ void resubscribe() throws AMQException { - _failedOver = true; + if (_dirty) + { + _failedOverDirty = true; + } resubscribeProducers(); resubscribeConsumers(); } @@ -2586,7 +2590,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public void markClean() { _dirty = false; - _failedOver = false; + _failedOverDirty = false; } /** @@ -2596,7 +2600,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ public boolean hasFailedOver() { - return _failedOver; + return _failedOverDirty; } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9acc69ec55..aaf56e8493 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -730,18 +730,9 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me { if (isMessageListenerSet()) { - // we do not need a lock around the test above, and the dispatch below as it is invalid - // for an application to alter an installed listener while the session is started - // synchronized (_closed) - { - // if (!_closed.get()) - { - - preApplicationProcessing(jmsMessage); - getMessageListener().onMessage(jmsMessage); - postDeliver(jmsMessage); - } - } + preApplicationProcessing(jmsMessage); + getMessageListener().onMessage(jmsMessage); + postDeliver(jmsMessage); } else { @@ -802,7 +793,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me { _session.acknowledgeMessage(msg.getDeliveryTag(), false); } - + _session.markDirty(); break; case Session.DUPS_OK_ACKNOWLEDGE: diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 4d8a736ec8..f994cd138e 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -1673,11 +1673,9 @@ public class PingPongProducer implements Runnable, ExceptionListener */ public int getExpectedNumPings(int numpings) { - // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); - - // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); - - return numpings * (_isPubSub ? getConsumersPerDestination() : 1); + // Wow, I'm freaking sorry about this return here... + return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) * + (_isPubSub ? getConsumersPerDestination() : 1); } /** diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index cab6a3a736..bf87e8e84f 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -98,7 +98,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener super.tearDown(); } - private void consumeMessages(int toConsume) throws JMSException + private void consumeMessages(int toConsume, boolean transacted) throws JMSException { Message msg; for (int i = 0; i < toConsume; i++) @@ -107,31 +107,43 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener assertNotNull("Message " + i + " was null!", msg); assertEquals("message " + i, ((TextMessage) msg).getText()); } + if (transacted) { + consumerSession.commit(); + } } - private void sendMessages(int totalMessages) throws JMSException + private void sendMessages(int totalMessages, boolean transacted) throws JMSException { for (int i = 0; i < totalMessages; i++) { producer.send(producerSession.createTextMessage("message " + i)); } + if (transacted) + { + producerSession.commit(); + } } public void testP2PFailover() throws Exception { - testP2PFailover(NUM_MESSAGES, true); + testP2PFailover(NUM_MESSAGES, true, false); } public void testP2PFailoverWithMessagesLeft() throws Exception { - testP2PFailover(NUM_MESSAGES, false); + testP2PFailover(NUM_MESSAGES, false, false); + } + + public void testP2PFailoverTransacted() throws Exception + { + testP2PFailover(NUM_MESSAGES, true, false); } - private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException, NamingException + private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException { Message msg = null; - init(false, Session.AUTO_ACKNOWLEDGE); - sendMessages(totalMessages); + init(transacted, Session.AUTO_ACKNOWLEDGE); + sendMessages(totalMessages, transacted); // Consume some messages int toConsume = totalMessages; @@ -140,7 +152,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener toConsume = totalMessages / 2; } - consumeMessages(toConsume); + consumeMessages(toConsume, transacted); _logger.info("Failing over"); @@ -150,8 +162,8 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener assertNull("Should not have received message from new broker!", msg); // Check that messages still sent / received - sendMessages(totalMessages); - consumeMessages(totalMessages); + sendMessages(totalMessages, transacted); + consumeMessages(totalMessages, transacted); } private void causeFailure(long delay) @@ -173,7 +185,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener public void testClientAckFailover() throws Exception { init(false, Session.CLIENT_ACKNOWLEDGE); - sendMessages(1); + sendMessages(1, false); Message msg = consumer.receive(); assertNotNull("Expected msgs not received", msg); |