summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2011-02-17 14:50:28 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2011-02-17 14:50:28 +0000
commitba8c1f7353e632a3b6ee392736c77b2553fe6b86 (patch)
treebad09cca62a9a9653084e2de8a471e542344752e
parenta6346f88fda91f1df09417e0306d0f5370dbc9bf (diff)
downloadqpid-python-ba8c1f7353e632a3b6ee392736c77b2553fe6b86.tar.gz
QPID-3008: Fix failover behaviour in 0-10 for QueueBrowserAutoAckTest
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1071631 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java16
3 files changed, 12 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 4b4417b6ef..b0bd8f8e97 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
- _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
for (AMQSession s : sessions)
{
- ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
s.resubscribe();
}
}
-
public void closeConnection(long timeout) throws JMSException, AMQException
{
try
@@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
ConnectionClose close = exc.getClose();
if (close == null)
{
+ _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
+
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
{
_conn.failoverPrep();
- _qpidConnection.resume();
+ _conn.resubscribeSessions();
_conn.fireFailoverComplete();
return;
}
@@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_logger.error("error during failover", e);
}
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
+ }
}
ExceptionListener listener = _conn._exceptionListener;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 6fa22b7971..517a7a5ce8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void resumed(Session ssn)
{
_qpidConnection = ssn.getConnection();
- try
- {
- resubscribe();
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
}
public void message(Session ssn, MessageTransfer xfr)
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index f54b022c09..97d825177c 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.test.client;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.FailoverBaseCase;
@@ -41,8 +41,6 @@ import java.util.Random;
public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
- private static final Logger _logger = Logger.getLogger(QueueBrowserAutoAckTest.class);
-
protected Connection _clientConnection;
protected Session _clientSession;
protected Queue _queue;
@@ -53,10 +51,8 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
super.setUp();
-
//Create Client
_clientConnection = getConnection();
-
_clientConnection.start();
setupSession();
@@ -395,7 +391,6 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
closeBrowserBeforeAfterGetNext(10);
validate(messages);
-
}
/**
@@ -454,19 +449,15 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
int messages = 5;
-
sendMessages("connection1", messages);
if (!CLUSTERED)
{
sendMessages("connection2", messages);
}
-
checkQueueDepth(messages);
-
_logger.info("Creating Queue Browser");
-
QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
long queueDepth = 0;
@@ -477,19 +468,17 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
}
catch (AMQException e)
{
+ fail("Caught exception getting queue depth: " + e.getMessage());
}
assertEquals("Session reports Queue depth not as expected", messages, queueDepth);
-
int msgCount = 0;
-
int failPoint = 0;
failPoint = new Random().nextInt(messages) + 1;
Enumeration msgs = queueBrowser.getEnumeration();
-
while (msgs.hasMoreElements())
{
msgs.nextElement();
@@ -536,5 +525,4 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
//Validate all messages still on Broker 1
validate(messages);
}
-
}