summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-27 06:19:08 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-27 06:19:08 +0000
commit939ccc98d866a00c3246880a6206d61e38debf1b (patch)
treeee2a1c4467ec6eea714e27e1131c70b795d7ef05
parent9a420ca9f7d542927503d044abefaa5004576596 (diff)
downloadqpid-python-939ccc98d866a00c3246880a6206d61e38debf1b.tar.gz
QPID-1339:
- Modified QpidTestCase to start/stop multiple brokers for failover testing. - Modified QpidTestCase to substitute port variables into broker start/stop commands. - Modified test profiles to use the new port variables. - Modified QpidTestCase to permit multiple exclude files. - Modified test profiles to make use of a common exclude list: ExcludeList - Added ConnectionTest.testResumeEmptyReplayBuffer. - Made default exception handling for Connection and Session log the exception. - Added SenderExcetion to specifically signal problems with transmitting connection data. - Modified Session to catch and deal with connection send failures for sessions with positive expiry. - Modified FailoverBaseCase to work for non VM brokers. - Made FailoverTest fail if failover times out. - Modified JMS implementation to make use of the recently added low level session resume. - Unexcluded failover tests from 0-10 test profiles. - Excluded MultipleJCAProviderRegistrationTest due to its testing strategy resulting in spurious failure when running as part of the larger test suite. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@708093 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/010ExcludeList16
-rw-r--r--qpid/java/ExcludeList1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java80
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java37
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java33
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java22
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SenderException.java52
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java72
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java13
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java111
-rw-r--r--qpid/java/cpp.async.testprofile4
-rw-r--r--qpid/java/cpp.noprefetch.testprofile4
-rw-r--r--qpid/java/cpp.testprofile4
-rw-r--r--qpid/java/default.testprofile2
-rw-r--r--qpid/java/java.testprofile4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java50
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java133
-rw-r--r--qpid/java/test-provider.properties14
24 files changed, 473 insertions, 224 deletions
diff --git a/qpid/java/010ExcludeList b/qpid/java/010ExcludeList
index 7810d42211..7512329220 100644
--- a/qpid/java/010ExcludeList
+++ b/qpid/java/010ExcludeList
@@ -19,21 +19,7 @@ org.apache.qpid.server.security.acl.SimpleACLTest#*
org.apache.qpid.server.plugins.PluginTest#*
// This test is not finished
org.apache.qpid.test.testcases.TTLTest#*
-// Those tests require failover support
-org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverAsQueueBrowserCreated
-org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverWithQueueBrowser
-org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverAsQueueBrowserCreated
-org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverWithQueueBrowser
-org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverAsQueueBrowserCreated
-org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverWithQueueBrowser
-org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverAsQueueBrowserCreated
-org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverWithQueueBrowser
-org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverAsQueueBrowserCreated
-org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverWithQueueBrowser
-org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated
-org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser
-org.apache.qpid.test.testcases.FailoverTest#*
-org.apache.qpid.test.client.failover.FailoverTest#*
+org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover
// Those tests are testing 0.8 specific semantics
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P
diff --git a/qpid/java/ExcludeList b/qpid/java/ExcludeList
new file mode 100644
index 0000000000..5b5b73e8fa
--- /dev/null
+++ b/qpid/java/ExcludeList
@@ -0,0 +1 @@
+org.apache.qpid.client.MultipleJCAProviderRegistrationTest#test
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0999a09ca5..4e8fdc2370 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -628,6 +629,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _delegate.makeBrokerConnection(brokerDetail);
}
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ return _delegate.executeRetrySupport(operation);
+ }
+
/**
* Get the details of the currently active broker
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 60b827a426..b64147fe8f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -27,20 +27,24 @@ import javax.jms.XASession;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
public interface AMQConnectionDelegate
{
- public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
+ ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
- public Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetchHigh, final int prefetchLow) throws JMSException;
+ Session createSession(final boolean transacted, final int acknowledgeMode,
+ final int prefetchHigh, final int prefetchLow) throws JMSException;
- public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
+ XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
- public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
+ void resubscribeSessions() throws JMSException, AMQException, FailoverException;
+
+ void closeConnection(long timeout) throws JMSException, AMQException;
+
+ <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
- public void closeConnection(long timeout) throws JMSException, AMQException;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 6480a0da76..8a9abcc398 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -23,6 +23,9 @@ package org.apache.qpid.client;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
@@ -31,6 +34,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
@@ -61,11 +65,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
* The QpidConeection instance that is mapped with thie JMS connection.
*/
org.apache.qpid.transport.Connection _qpidConnection;
+ private ConnectionException exception = null;
//--- constructor
public AMQConnectionDelegate_0_10(AMQConnection conn)
{
_conn = conn;
+ _qpidConnection = new Connection();
+ _qpidConnection.setConnectionListener(this);
}
/**
@@ -129,16 +136,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
*/
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _qpidConnection = new Connection();
try
{
if (_logger.isDebugEnabled())
{
- _logger.debug("creating connection with broker " + " host: " + brokerDetail
- .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn
- .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword());
+ _logger.debug("connecting to host: " + brokerDetail.getHost() +
+ " port: " + brokerDetail.getPort() +
+ " vhost: " + _conn.getVirtualHost() +
+ " username: " + _conn.getUsername() +
+ " password: " + _conn.getPassword());
}
- _qpidConnection.setConnectionListener(this);
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL());
_conn._connected = true;
@@ -160,8 +167,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
*/
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
- //NOT implemented as railover is handled at a lower level
- throw new FailoverException("failing to reconnect during failover, operation not supported.");
+ List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
+ _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ for (AMQSession s : sessions)
+ {
+ ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
+ s.resubscribe();
+ }
}
@@ -181,6 +193,43 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void exception(Connection conn, ConnectionException exc)
{
+ if (exception != null)
+ {
+ _logger.error("previous exception", exception);
+ }
+
+ exception = exc;
+ }
+
+ public void closed(Connection conn)
+ {
+ ConnectionException exc = exception;
+ exception = null;
+
+ ConnectionClose close = exc.getClose();
+ if (close == null)
+ {
+ try
+ {
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _qpidConnection.resume();
+
+ if (_conn.firePreResubscribe())
+ {
+ _conn.resubscribeSessions();
+ }
+
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ }
+
ExceptionListener listener = _conn._exceptionListener;
if (listener == null)
{
@@ -188,19 +237,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
else
{
- ConnectionClose close = exc.getClose();
String code = null;
if (close != null)
{
code = close.getReplyCode().toString();
}
+
JMSException ex = new JMSException(exc.getMessage(), code);
ex.initCause(exc);
-
- _conn._exceptionListener.onException(ex);
+ listener.onException(ex);
}
}
- public void closed(Connection conn) {}
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 8d42a2f201..035e3830ca 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -247,4 +247,41 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
}
}
+
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ while (true)
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("Interrupted: " + e, e);
+
+ return null;
+ }
+
+ synchronized (_conn.getFailoverMutex())
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ _logger.debug("Failover exception caught during operation: " + e, e);
+ }
+ catch (IllegalStateException e)
+ {
+ if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
+ {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index cbdefd0548..b5d12d9520 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -261,6 +261,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
@@ -1809,6 +1810,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_failedOverDirty = true;
}
+
+ _rollbackMark.set(-1);
resubscribeProducers();
resubscribeConsumers();
}
@@ -2601,7 +2604,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Used for debugging in the dispatcher. */
private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
class Dispatcher extends Thread
@@ -2611,7 +2613,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
- private final AtomicLong _rollbackMark = new AtomicLong(-1);
private String dispatcherID = "" + System.identityHashCode(this);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 7829966315..ab983aa842 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -113,11 +113,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- // create the qpid session with an expiry <= 0 so that the session does not expire
- _qpidSession = qpidConnection.createSession(0);
- // set the exception listnere for this session
+ _qpidSession = _qpidConnection.createSession(1);
_qpidSession.setSessionListener(this);
- // set transacted if required
if (_transacted)
{
_qpidSession.txSelect();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index cf7e978c03..e9e52cc97c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -99,37 +99,6 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup
*/
public T execute() throws E
{
- while (true)
- {
- try
- {
- connection.blockUntilNotFailingOver();
- }
- catch (InterruptedException e)
- {
- _log.debug("Interrupted: " + e, e);
-
- return null;
- }
-
- synchronized (connection.getFailoverMutex())
- {
- try
- {
- return operation.execute();
- }
- catch (FailoverException e)
- {
- _log.debug("Failover exception caught during operation: " + e, e);
- }
- catch (IllegalStateException e)
- {
- if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
- {
- throw e;
- }
- }
- }
- }
+ return connection.executeRetrySupport(operation);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 7a66c2c238..cf9b9145a9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -62,7 +62,7 @@ public class Connection extends ConnectionInvoker
public void opened(Connection conn) {}
public void exception(Connection conn, ConnectionException exception)
{
- throw exception;
+ log.error(exception, "connection exception");
}
public void closed(Connection conn) {}
}
@@ -155,7 +155,12 @@ public class Connection extends ConnectionInvoker
return saslClient;
}
- public void connect(String host, int port, String vhost, String username, String password,boolean ssl)
+ public void connect(String host, int port, String vhost, String username, String password)
+ {
+ connect(host, port, vhost, username, password, false);
+ }
+
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl)
{
synchronized (lock)
{
@@ -163,7 +168,7 @@ public class Connection extends ConnectionInvoker
delegate = new ClientDelegate(vhost, username, password);
- IoTransport.connect(host, port, ConnectionBinding.get(this),ssl);
+ IoTransport.connect(host, port, ConnectionBinding.get(this), ssl);
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -371,12 +376,11 @@ public class Connection extends ConnectionInvoker
case CLOSING:
error = e;
lock.notifyAll();
- break;
- default:
- listener.exception(this, e);
- break;
+ return;
}
}
+
+ listener.exception(this, e);
}
public void exception(Throwable t)
@@ -402,13 +406,13 @@ public class Connection extends ConnectionInvoker
public void closed()
{
- log.debug("connection closed: %s", this);
-
if (state == OPEN)
{
exception(new ConnectionException("connection aborted"));
}
+ log.debug("connection closed: %s", this);
+
synchronized (lock)
{
List<Session> values = new ArrayList<Session>(channels.values());
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderException.java
new file mode 100644
index 0000000000..a96079dc27
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderException.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * SenderException
+ *
+ */
+
+public class SenderException extends TransportException
+{
+
+ public SenderException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public SenderException(String message)
+ {
+ super(message);
+ }
+
+ public SenderException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ public void rethrow()
+ {
+ throw new SenderException(getMessage(), this);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index bab4bb35ee..8877b7b683 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -65,7 +65,7 @@ public class Session extends SessionInvoker
public void exception(Session ssn, SessionException exc)
{
- throw exc;
+ log.error(exc, "session exception");
}
public void closed(Session ssn) {}
@@ -195,6 +195,9 @@ public class Session extends SessionInvoker
send(m);
}
}
+
+ sessionCommandPoint(commandsOut, 0);
+ sessionFlush(COMPLETED);
}
}
@@ -219,6 +222,7 @@ public class Session extends SessionInvoker
this.commandsIn = id;
if (!incomingInit)
{
+ incomingInit = true;
maxProcessed = commandsIn - 1;
syncPoint = maxProcessed;
}
@@ -242,6 +246,11 @@ public class Session extends SessionInvoker
final void identify(Method cmd)
{
+ if (!incomingInit)
+ {
+ throw new IllegalStateException();
+ }
+
int id = nextCommandId();
cmd.setId(id);
@@ -417,8 +426,8 @@ public class Session extends SessionInvoker
default:
throw new SessionException
(String.format
- ("timed out waiting for session to become open %s",
- state));
+ ("timed out waiting for session to become open " +
+ "(state=%s)", state));
}
int next = commandsOut++;
@@ -429,7 +438,26 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && isFull(next))
{
- sessionFlush(COMPLETED);
+ if (state == OPEN)
+ {
+ try
+ {
+ sessionFlush(COMPLETED);
+ }
+ catch (SenderException e)
+ {
+ if (expiry > 0)
+ {
+ // if expiry is > 0 then this will
+ // happen again on resume
+ log.error(e, "error sending flush (full replay buffer)");
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
+ }
w.await();
}
}
@@ -452,7 +480,23 @@ public class Session extends SessionInvoker
m.setSync(true);
}
needSync = !m.isSync();
- send(m);
+ try
+ {
+ send(m);
+ }
+ catch (SenderException e)
+ {
+ if (expiry > 0)
+ {
+ // if expiry is > 0 then this will happen
+ // again on resume
+ log.error(e, "error sending command");
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
if (autoSync)
{
sync();
@@ -462,7 +506,23 @@ public class Session extends SessionInvoker
// wraparound
if ((next % 65536) == 0)
{
- sessionFlush(COMPLETED);
+ try
+ {
+ sessionFlush(COMPLETED);
+ }
+ catch (SenderException e)
+ {
+ if (expiry > 0)
+ {
+ // if expiry is > 0 then this will happen
+ // again on resume
+ log.error(e, "error sending flush (periodic)");
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
}
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 73ff039be5..36ea14856a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -92,7 +93,7 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
{
if (closed.get())
{
- throw new TransportException("sender is closed", exception);
+ throw new SenderException("sender is closed", exception);
}
final int size = buffer.length;
@@ -125,12 +126,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
if (closed.get())
{
- throw new TransportException("sender is closed", exception);
+ throw new SenderException("sender is closed", exception);
}
if (head - tail >= size)
{
- throw new TransportException(String.format("write timed out: %s, %s", head, tail));
+ throw new SenderException(String.format("write timed out: %s, %s", head, tail));
}
}
continue;
@@ -192,19 +193,19 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
join(timeout);
if (isAlive())
{
- throw new TransportException("join timed out");
+ throw new SenderException("join timed out");
}
}
transport.getReceiver().close(false);
}
catch (InterruptedException e)
{
- throw new TransportException(e);
+ throw new SenderException(e);
}
if (reportException && exception != null)
{
- throw new TransportException(exception);
+ throw new SenderException(exception);
}
}
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index b1fe08bfb9..1da56654f0 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -48,6 +48,7 @@ public class ConnectionTest extends TestCase implements SessionListener
private int port;
private volatile boolean queue = false;
private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
+ private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
protected void setUp() throws Exception
{
@@ -71,7 +72,7 @@ public class ConnectionTest extends TestCase implements SessionListener
public void opened(Session ssn) {}
- public void message(Session ssn, MessageTransfer xfr)
+ public void message(final Session ssn, MessageTransfer xfr)
{
if (queue)
{
@@ -86,6 +87,25 @@ public class ConnectionTest extends TestCase implements SessionListener
{
ssn.getConnection().close();
}
+ else if (body.startsWith("DELAYED_CLOSE"))
+ {
+ ssn.processed(xfr);
+ new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ ssn.getConnection().close();
+ }
+ }.start();
+ }
else if (body.startsWith("ECHO"))
{
int id = xfr.getId();
@@ -139,7 +159,7 @@ public class ConnectionTest extends TestCase implements SessionListener
}
}
});
- conn.connect("localhost", port, null, "guest", "guest",false);
+ conn.connect("localhost", port, null, "guest", "guest", false);
return conn;
}
@@ -148,7 +168,7 @@ public class ConnectionTest extends TestCase implements SessionListener
Condition closed = new Condition();
Connection conn = connect(closed);
- Session ssn = conn.createSession();
+ Session ssn = conn.createSession(1);
send(ssn, "CLOSE");
if (!closed.get(3000))
@@ -167,44 +187,47 @@ public class ConnectionTest extends TestCase implements SessionListener
}
}
- public void testResume() throws Exception
+ class FailoverConnectionListener implements ConnectionListener
{
- Connection conn = new Connection();
- conn.connect("localhost", port, null, "guest", "guest",false);
+ public void opened(Connection conn) {}
- conn.setConnectionListener(new ConnectionListener()
+ public void exception(Connection conn, ConnectionException e)
{
- public void opened(Connection conn) {}
- public void exception(Connection conn, ConnectionException e)
- {
- throw e;
- }
- public void closed(Connection conn)
- {
- queue = true;
- conn.connect("localhost", port, null, "guest", "guest",false);
- conn.resume();
- }
- });
+ throw e;
+ }
- Session ssn = conn.createSession(1);
- final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
- ssn.setSessionListener(new SessionListener()
+ public void closed(Connection conn)
{
- public void opened(Session s) {}
- public void exception(Session s, SessionException e) {}
- public void message(Session s, MessageTransfer xfr)
- {
- synchronized (incoming)
- {
- incoming.add(xfr);
- incoming.notifyAll();
- }
+ queue = true;
+ conn.connect("localhost", port, null, "guest", "guest");
+ conn.resume();
+ }
+ }
- s.processed(xfr);
+ class TestSessionListener implements SessionListener
+ {
+ public void opened(Session s) {}
+ public void exception(Session s, SessionException e) {}
+ public void message(Session s, MessageTransfer xfr)
+ {
+ synchronized (incoming)
+ {
+ incoming.add(xfr);
+ incoming.notifyAll();
}
- public void closed(Session s) {}
- });
+
+ s.processed(xfr);
+ }
+ public void closed(Session s) {}
+ }
+
+ public void testResumeNonemptyReplayBuffer() throws Exception
+ {
+ Connection conn = new Connection();
+ conn.setConnectionListener(new FailoverConnectionListener());
+ conn.connect("localhost", port, null, "guest", "guest");
+ Session ssn = conn.createSession(1);
+ ssn.setSessionListener(new TestSessionListener());
send(ssn, "SINK 0");
send(ssn, "ECHO 1");
@@ -251,4 +274,24 @@ public class ConnectionTest extends TestCase implements SessionListener
}
}
+ public void testResumeEmptyReplayBuffer() throws InterruptedException
+ {
+ Connection conn = new Connection();
+ conn.setConnectionListener(new FailoverConnectionListener());
+ conn.connect("localhost", port, null, "guest", "guest");
+ Session ssn = conn.createSession(1);
+ ssn.setSessionListener(new TestSessionListener());
+
+ send(ssn, "SINK 0");
+ send(ssn, "SINK 1");
+ send(ssn, "DELAYED_CLOSE 2");
+ ssn.sync();
+ Thread.sleep(6000);
+ send(ssn, "SINK 3");
+ ssn.sync();
+ System.out.println(messages);
+ assertEquals(1, messages.size());
+ assertEquals("SINK 3", messages.get(0).getBodyString());
+ }
+
}
diff --git a/qpid/java/cpp.async.testprofile b/qpid/java/cpp.async.testprofile
index c7554165f0..746ef2c0ef 100644
--- a/qpid/java/cpp.async.testprofile
+++ b/qpid/java/cpp.async.testprofile
@@ -1,3 +1,3 @@
broker.version=0-10
-broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --auth no
-test.excludesfile=${project.root}/010ExcludeList-store
+broker=${project.root}/../cpp/src/qpidd -p @PORT --data-dir ${build.data}/@PORT -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/msgstore.so --auth no
+test.excludesfile=${project.root}/ExcludeList ${project.root}/010ExcludeList-store
diff --git a/qpid/java/cpp.noprefetch.testprofile b/qpid/java/cpp.noprefetch.testprofile
index 760f0d147f..e502f3c950 100644
--- a/qpid/java/cpp.noprefetch.testprofile
+++ b/qpid/java/cpp.noprefetch.testprofile
@@ -1,4 +1,4 @@
broker.version=0-10
-broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --auth no
-test.excludesfile=${project.root}/010ExcludeList-noPrefetch
+broker=${project.root}/../cpp/src/qpidd -p @PORT --data-dir ${build.data}/@PORT -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/msgstore.so --auth no
+test.excludesfile=${project.root}/ExcludeList ${project.root}/010ExcludeList-noPrefetch
max_prefetch=0
diff --git a/qpid/java/cpp.testprofile b/qpid/java/cpp.testprofile
index 68ac8b8be8..af2d0f58e4 100644
--- a/qpid/java/cpp.testprofile
+++ b/qpid/java/cpp.testprofile
@@ -1,3 +1,3 @@
broker.version=0-10
-broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --auth no
-test.excludesfile=${project.root}/010ExcludeList
+broker=${project.root}/../cpp/src/qpidd -p @PORT --data-dir ${build.data}/@PORT -t --auth no
+test.excludesfile=${project.root}/ExcludeList ${project.root}/010ExcludeList
diff --git a/qpid/java/default.testprofile b/qpid/java/default.testprofile
index 7354cbda48..46c3f7746a 100644
--- a/qpid/java/default.testprofile
+++ b/qpid/java/default.testprofile
@@ -14,7 +14,7 @@ log4j.configuration=file:///${project.root}/log4j-test.xml
log4j.debug=false
test.excludes=true
-test.excludesfile=${project.root}/08ExcludeList
+test.excludesfile=${project.root}/ExcludeList ${project.root}/08ExcludeList
test.fork=no
test.mem=512M
test=*Test
diff --git a/qpid/java/java.testprofile b/qpid/java/java.testprofile
index cef0a10661..ab98e47e97 100644
--- a/qpid/java/java.testprofile
+++ b/qpid/java/java.testprofile
@@ -1,5 +1,5 @@
-broker=${project.root}/build/bin/qpid-server
+broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT
broker.clean=${project.root}/clean-dir ${build.data}
broker.ready=Qpid Broker Ready
-test.excludesfile=${project.root}/08ExcludeList-nonvm
+test.excludesfile=${project.root}/ExcludeList ${project.root}/08ExcludeList-nonvm
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
index 31299ff9ff..ea0bae7a56 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
@@ -37,12 +37,15 @@ import java.util.LinkedList;
*/
public class MultipleJCAProviderRegistrationTest extends QpidTestCase
{
+
public void setUp() throws Exception
{
super.setUp();
stopBroker();
+ _broker = VM;
+
final String QpidHome = System.getProperty("QPID_HOME");
assertNotNull("QPID_HOME not set",QpidHome);
@@ -60,9 +63,7 @@ public class MultipleJCAProviderRegistrationTest extends QpidTestCase
// This is a bit evil it should be updated with QPID-1103
config.getConfiguration().setProperty("management.enabled", "false");
- ApplicationRegistry.initialise(config, 1);
-
- TransportConnection.createVMBroker(1);
+ startBroker();
}
public void test() throws Exception
@@ -98,4 +99,5 @@ public class MultipleJCAProviderRegistrationTest extends QpidTestCase
assertTrue("Client did not register any providers", additions.size() > 0);
}
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index bf87e8e84f..3f610aa15d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -174,7 +174,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.info("Awaiting Failover completion");
try
{
- failoverComplete.await(delay, TimeUnit.MILLISECONDS);
+ if (!failoverComplete.await(delay, TimeUnit.MILLISECONDS))
+ {
+ fail("failover did not complete");
+ }
}
catch (InterruptedException e)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 2fa6f4f417..2a44c444e0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -27,41 +27,40 @@ import javax.jms.Connection;
public class FailoverBaseCase extends QpidTestCase
{
- private boolean failedOver = true;
-
- protected void setUp() throws java.lang.Exception
- {
- super.setUp();
+ protected int FAILING_VM_PORT = 2;
+ protected int FAILING_PORT = 5673;
- try
+ private boolean failedOver = false;
+
+ private int getFailingPort()
+ {
+ if (_broker.equals(VM))
{
- TransportConnection.createVMBroker(2);
+ return FAILING_VM_PORT;
}
- catch (Exception e)
+ else
{
- fail("Unable to create broker: " + e);
+ return FAILING_PORT;
}
+ }
+ protected void setUp() throws java.lang.Exception
+ {
+ super.setUp();
+ startBroker(getFailingPort());
}
/**
- * We are using failover factories, Note that 0.10 code path does not yet support failover.
+ * We are using failover factories
*
* @return a connection
* @throws Exception
*/
public Connection getConnection() throws Exception
{
- Connection conn;
- if( _broker.equals(VM) )
- {
- conn = getConnectionFactory("vmfailover").createConnection("guest", "guest");
- }
- else
- {
- conn = getConnectionFactory("failover").createConnection("guest", "guest");
- }
+ Connection conn =
+ getConnectionFactory("failover").createConnection("guest", "guest");
_connections.add(conn);
return conn;
}
@@ -70,8 +69,7 @@ public class FailoverBaseCase extends QpidTestCase
{
if (!failedOver)
{
- TransportConnection.killVMBroker(2);
- ApplicationRegistry.remove(2);
+ stopBroker(getFailingPort());
}
super.tearDown();
}
@@ -83,7 +81,13 @@ public class FailoverBaseCase extends QpidTestCase
public void failBroker()
{
failedOver = true;
- TransportConnection.killVMBroker(2);
- ApplicationRegistry.remove(2);
+ try
+ {
+ stopBroker(getFailingPort());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 0cbef4c520..4be67c9590 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -70,43 +70,43 @@ public class QpidTestCase extends TestCase
if (Boolean.getBoolean("test.excludes"))
{
_logger.info("Some tests should be excluded, building the exclude list");
- String exclusionListURI = System.getProperties().getProperty("test.excludesfile", "");
+ String exclusionListURIs = System.getProperties().getProperty("test.excludesfile", "");
String exclusionListString = System.getProperties().getProperty("test.excludeslist", "");
- File file = new File(exclusionListURI);
List<String> exclusionList = new ArrayList<String>();
- if (file.exists())
+
+ for (String uri : exclusionListURIs.split("\\s+"))
{
- _logger.info("Using exclude file: " + exclusionListURI);
- try
+ File file = new File(uri);
+ if (file.exists())
{
- BufferedReader in = new BufferedReader(new FileReader(file));
- String excludedTest = in.readLine();
- do
+ _logger.info("Using exclude file: " + uri);
+ try
{
- exclusionList.add(excludedTest);
- excludedTest = in.readLine();
+ BufferedReader in = new BufferedReader(new FileReader(file));
+ String excludedTest = in.readLine();
+ do
+ {
+ exclusionList.add(excludedTest);
+ excludedTest = in.readLine();
+ }
+ while (excludedTest != null);
+ }
+ catch (IOException e)
+ {
+ _logger.warn("Exception when reading exclusion list", e);
}
- while (excludedTest != null);
- }
- catch (IOException e)
- {
- _logger.warn("Exception when reading exclusion list", e);
}
}
- else if (!exclusionListString.equals(""))
+
+ if (!exclusionListString.equals(""))
{
_logger.info("Using excludeslist: " + exclusionListString);
- // the exclusion list may be specified as a string
- StringTokenizer t = new StringTokenizer(exclusionListString, " ");
- while (t.hasMoreTokens())
+ for (String test : exclusionListString.split("\\s+"))
{
- exclusionList.add(t.nextToken());
+ exclusionList.add(test);
}
}
- else
- {
- throw new RuntimeException("Aborting test: Cannot find excludes file nor excludes list");
- }
+
_exclusionList = exclusionList;
}
@@ -136,17 +136,17 @@ public class QpidTestCase extends TestCase
private static final String QPID_HOME = "QPID_HOME";
protected int DEFAULT_VM_PORT = 1;
+ protected int DEFAULT_PORT = 5672;
protected String _broker = System.getProperty(BROKER, VM);
private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private String _output = System.getProperty(TEST_OUTPUT);
- private Process _brokerProcess;
+ private Map<Integer,Process> _brokers = new HashMap<Integer,Process>();
private InitialContext _initialContext;
private AMQConnectionFactory _connectionFactory;
- private boolean _brokerStarted;
// the connections created for a given test
protected List<Connection> _connections = new ArrayList<Connection>();
@@ -299,20 +299,44 @@ public class QpidTestCase extends TestCase
startBroker(0);
}
- public void startBroker(int port) throws Exception
+ private int getPort(int port)
{
if (_broker.equals(VM))
{
- //If we are starting on port 0 use the default VM_PORT
- port = port == 0 ? DEFAULT_VM_PORT : port;
+ return port == 0 ? DEFAULT_VM_PORT : port;
+ }
+ else if (!_broker.equals(EXTERNAL))
+ {
+ return port == 0 ? DEFAULT_PORT : port;
+ }
+ else
+ {
+ return port;
+ }
+ }
+ private String getBrokerCommand(int port)
+ {
+ return _broker
+ .replace("@PORT", "" + port)
+ .replace("@MPORT", "" + (port + (8999 - DEFAULT_PORT)));
+ }
+
+ public void startBroker(int port) throws Exception
+ {
+ port = getPort(port);
+
+ Process process = null;
+ if (_broker.equals(VM))
+ {
// create an in_VM broker
TransportConnection.createVMBroker(port);
}
else if (!_broker.equals(EXTERNAL))
{
- _logger.info("starting broker: " + _broker);
- ProcessBuilder pb = new ProcessBuilder(_broker.split("\\s+"));
+ String cmd = getBrokerCommand(port);
+ _logger.info("starting broker: " + cmd);
+ ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+"));
pb.redirectErrorStream(true);
Map<String, String> env = pb.environment();
@@ -323,9 +347,9 @@ public class QpidTestCase extends TestCase
//Augment Path with bin directory in QPID_HOME.
env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
- _brokerProcess = pb.start();
+ process = pb.start();
- Piper p = new Piper(_brokerProcess.getInputStream(),
+ Piper p = new Piper(process.getInputStream(),
System.getProperty(BROKER_READY));
p.start();
@@ -339,7 +363,7 @@ public class QpidTestCase extends TestCase
try
{
- int exit = _brokerProcess.exitValue();
+ int exit = process.exitValue();
_logger.info("broker aborted: " + exit);
cleanBroker();
throw new RuntimeException("broker aborted: " + exit);
@@ -349,7 +373,8 @@ public class QpidTestCase extends TestCase
// this is expect if the broker started succesfully
}
}
- _brokerStarted = true;
+
+ _brokers.put(port, process);
}
public void cleanBroker()
@@ -387,22 +412,21 @@ public class QpidTestCase extends TestCase
public void stopBroker(int port) throws Exception
{
- _logger.info("stopping broker: " + _broker);
- if (_brokerProcess != null)
+ port = getPort(port);
+
+ _logger.info("stopping broker: " + getBrokerCommand(port));
+ Process process = _brokers.remove(port);
+ if (process != null)
{
- _brokerProcess.destroy();
- _brokerProcess.waitFor();
- _logger.info("broker exited: " + _brokerProcess.exitValue());
- _brokerProcess = null;
+ process.destroy();
+ process.waitFor();
+ _logger.info("broker exited: " + process.exitValue());
}
else if (_broker.equals(VM))
{
- port = port == 0 ? DEFAULT_VM_PORT : port;
-
TransportConnection.killVMBroker(port);
ApplicationRegistry.remove(port);
}
- _brokerStarted = false;
}
protected void setSystemProperty(String property, String value)
@@ -489,14 +513,7 @@ public class QpidTestCase extends TestCase
_logger.info("get ConnectionFactory");
if (_connectionFactory == null)
{
- if (_broker.equals(VM))
- {
- _connectionFactory = getConnectionFactory("vm");
- }
- else
- {
- _connectionFactory = getConnectionFactory("local");
- }
+ _connectionFactory = getConnectionFactory("default");
}
return _connectionFactory;
}
@@ -512,6 +529,11 @@ public class QpidTestCase extends TestCase
*/
public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException
{
+ if (_broker.equals(VM))
+ {
+ factoryName += ".vm";
+ }
+
return (AMQConnectionFactory) getInitialContext().lookup(factoryName);
}
@@ -559,12 +581,9 @@ public class QpidTestCase extends TestCase
protected void tearDown() throws java.lang.Exception
{
// close all the connections used by this test.
- if (_brokerStarted)
+ for (Connection c : _connections)
{
- for (Connection c : _connections)
- {
- c.close();
- }
+ c.close();
}
revertSystemProperties();
diff --git a/qpid/java/test-provider.properties b/qpid/java/test-provider.properties
index 351fa59edb..7c9622630c 100644
--- a/qpid/java/test-provider.properties
+++ b/qpid/java/test-provider.properties
@@ -19,13 +19,15 @@
#
#
-connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
-connectionfactory.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
+connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
+connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
-connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
-connectionfactory.vmfailover = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
-connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='vm://:1'
-connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='vm://:2'
+connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673;tcp://localhost:5672'
+connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
+connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
+connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673'
+connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
+connectionfactory.connection2.vm = amqp://username:password@clientid/test?brokerlist='vm://:2'
queue.MyQueue = example.MyQueue