diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-02-17 14:50:28 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-02-17 14:50:28 +0000 |
| commit | ba8c1f7353e632a3b6ee392736c77b2553fe6b86 (patch) | |
| tree | bad09cca62a9a9653084e2de8a471e542344752e | |
| parent | a6346f88fda91f1df09417e0306d0f5370dbc9bf (diff) | |
| download | qpid-python-ba8c1f7353e632a3b6ee392736c77b2553fe6b86.tar.gz | |
QPID-3008: Fix failover behaviour in 0-10 for QueueBrowserAutoAckTest
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1071631 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 12 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 4b4417b6ef..b0bd8f8e97 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void resubscribeSessions() throws JMSException, AMQException, FailoverException { List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); - _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size())); for (AMQSession s : sessions) { - ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; s.resubscribe(); } } - public void closeConnection(long timeout) throws JMSException, AMQException { try @@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec ConnectionClose close = exc.getClose(); if (close == null) { + _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); + try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { _conn.failoverPrep(); - _qpidConnection.resume(); + _conn.resubscribeSessions(); _conn.fireFailoverComplete(); return; } @@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _logger.error("error during failover", e); } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); + } } ExceptionListener listener = _conn._exceptionListener; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 6fa22b7971..517a7a5ce8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void resumed(Session ssn) { _qpidConnection = ssn.getConnection(); - try - { - resubscribe(); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } } public void message(Session ssn, MessageTransfer xfr) diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index f54b022c09..97d825177c 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.test.client; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.FailoverBaseCase; @@ -41,8 +41,6 @@ import java.util.Random; public class QueueBrowserAutoAckTest extends FailoverBaseCase { - private static final Logger _logger = Logger.getLogger(QueueBrowserAutoAckTest.class); - protected Connection _clientConnection; protected Session _clientSession; protected Queue _queue; @@ -53,10 +51,8 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase { super.setUp(); - //Create Client _clientConnection = getConnection(); - _clientConnection.start(); setupSession(); @@ -395,7 +391,6 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase closeBrowserBeforeAfterGetNext(10); validate(messages); - } /** @@ -454,19 +449,15 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase { int messages = 5; - sendMessages("connection1", messages); if (!CLUSTERED) { sendMessages("connection2", messages); } - checkQueueDepth(messages); - _logger.info("Creating Queue Browser"); - QueueBrowser queueBrowser = _clientSession.createBrowser(_queue); long queueDepth = 0; @@ -477,19 +468,17 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase } catch (AMQException e) { + fail("Caught exception getting queue depth: " + e.getMessage()); } assertEquals("Session reports Queue depth not as expected", messages, queueDepth); - int msgCount = 0; - int failPoint = 0; failPoint = new Random().nextInt(messages) + 1; Enumeration msgs = queueBrowser.getEnumeration(); - while (msgs.hasMoreElements()) { msgs.nextElement(); @@ -536,5 +525,4 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase //Validate all messages still on Broker 1 validate(messages); } - } |
