summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-08-05 11:10:28 +0000
committerAidan Skinner <aidan@apache.org>2008-08-05 11:10:28 +0000
commit5a245b5b3b7e787a2163bd782d67e94d255a9957 (patch)
tree0daba5307f5bcfb51ca5153afbd43a84de9d9c34
parent6f0c723706e09a9333288c139045276537aa5dc6 (diff)
downloadqpid-python-5a245b5b3b7e787a2163bd782d67e94d255a9957.tar.gz
QPID-1206: Fix failover and failover tests
AMQConnection: remove dead and confusingly misnamed method AMQSession: rename failedOver to failedOverDirty to convey actual usage, only set it if we failed over while dirty. Ewww! BasicMessageConsumer: if we're in client ack mode, mark as dirty when we receive a message PingPongProducer: calculate expected replies properly if we fail after a send or before a commit FailoverTest: test transacted case git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682672 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java17
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java34
5 files changed, 39 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 90dce8eb60..27294562e5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1322,24 +1322,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_sessions.remove(channelId);
}
- /**
- * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
- * The caller must hold the failover mutex before calling this method.
- */
- public void resubscribeSesssions() throws JMSException, AMQException, FailoverException
- {
- ArrayList sessions = new ArrayList(_sessions.values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
- for (Iterator it = sessions.iterator(); it.hasNext();)
- {
- AMQSession s = (AMQSession) it.next();
- // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
- reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
- s.resubscribe();
- s.setFlowControl(true);
- }
- }
-
public String toString()
{
StringBuffer buf = new StringBuffer("AMQConnection:\n");
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 801cf53d83..c253b8b641 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -368,8 +368,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
private boolean _dirty;
- /** Has failover occured on this session */
- private boolean _failedOver;
+ /** Has failover occured on this session with outstanding actions to commit? */
+ private boolean _failedOverDirty;
private static final class FlowControlIndicator
{
@@ -740,6 +740,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
// Commits outstanding messages and acknowledgments
sendCommit();
+ markClean();
}
catch (AMQException e)
{
@@ -1796,7 +1797,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*/
void resubscribe() throws AMQException
{
- _failedOver = true;
+ if (_dirty)
+ {
+ _failedOverDirty = true;
+ }
resubscribeProducers();
resubscribeConsumers();
}
@@ -2586,7 +2590,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public void markClean()
{
_dirty = false;
- _failedOver = false;
+ _failedOverDirty = false;
}
/**
@@ -2596,7 +2600,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*/
public boolean hasFailedOver()
{
- return _failedOver;
+ return _failedOverDirty;
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 9acc69ec55..aaf56e8493 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -730,18 +730,9 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
{
if (isMessageListenerSet())
{
- // we do not need a lock around the test above, and the dispatch below as it is invalid
- // for an application to alter an installed listener while the session is started
- // synchronized (_closed)
- {
- // if (!_closed.get())
- {
-
- preApplicationProcessing(jmsMessage);
- getMessageListener().onMessage(jmsMessage);
- postDeliver(jmsMessage);
- }
- }
+ preApplicationProcessing(jmsMessage);
+ getMessageListener().onMessage(jmsMessage);
+ postDeliver(jmsMessage);
}
else
{
@@ -802,7 +793,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
-
+ _session.markDirty();
break;
case Session.DUPS_OK_ACKNOWLEDGE:
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 4d8a736ec8..f994cd138e 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -1673,11 +1673,9 @@ public class PingPongProducer implements Runnable, ExceptionListener
*/
public int getExpectedNumPings(int numpings)
{
- // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
-
- // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
-
- return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
+ // Wow, I'm freaking sorry about this return here...
+ return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) *
+ (_isPubSub ? getConsumersPerDestination() : 1);
}
/**
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index cab6a3a736..bf87e8e84f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -98,7 +98,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
super.tearDown();
}
- private void consumeMessages(int toConsume) throws JMSException
+ private void consumeMessages(int toConsume, boolean transacted) throws JMSException
{
Message msg;
for (int i = 0; i < toConsume; i++)
@@ -107,31 +107,43 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
assertNotNull("Message " + i + " was null!", msg);
assertEquals("message " + i, ((TextMessage) msg).getText());
}
+ if (transacted) {
+ consumerSession.commit();
+ }
}
- private void sendMessages(int totalMessages) throws JMSException
+ private void sendMessages(int totalMessages, boolean transacted) throws JMSException
{
for (int i = 0; i < totalMessages; i++)
{
producer.send(producerSession.createTextMessage("message " + i));
}
+ if (transacted)
+ {
+ producerSession.commit();
+ }
}
public void testP2PFailover() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true);
+ testP2PFailover(NUM_MESSAGES, true, false);
}
public void testP2PFailoverWithMessagesLeft() throws Exception
{
- testP2PFailover(NUM_MESSAGES, false);
+ testP2PFailover(NUM_MESSAGES, false, false);
+ }
+
+ public void testP2PFailoverTransacted() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, true, false);
}
- private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException, NamingException
+ private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException
{
Message msg = null;
- init(false, Session.AUTO_ACKNOWLEDGE);
- sendMessages(totalMessages);
+ init(transacted, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(totalMessages, transacted);
// Consume some messages
int toConsume = totalMessages;
@@ -140,7 +152,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
toConsume = totalMessages / 2;
}
- consumeMessages(toConsume);
+ consumeMessages(toConsume, transacted);
_logger.info("Failing over");
@@ -150,8 +162,8 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
assertNull("Should not have received message from new broker!", msg);
// Check that messages still sent / received
- sendMessages(totalMessages);
- consumeMessages(totalMessages);
+ sendMessages(totalMessages, transacted);
+ consumeMessages(totalMessages, transacted);
}
private void causeFailure(long delay)
@@ -173,7 +185,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
public void testClientAckFailover() throws Exception
{
init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessages(1);
+ sendMessages(1, false);
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);