diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
commit | 939ccc98d866a00c3246880a6206d61e38debf1b (patch) | |
tree | ee2a1c4467ec6eea714e27e1131c70b795d7ef05 | |
parent | 9a420ca9f7d542927503d044abefaa5004576596 (diff) | |
download | qpid-python-939ccc98d866a00c3246880a6206d61e38debf1b.tar.gz |
QPID-1339:
- Modified QpidTestCase to start/stop multiple brokers for failover
testing.
- Modified QpidTestCase to substitute port variables into broker
start/stop commands.
- Modified test profiles to use the new port variables.
- Modified QpidTestCase to permit multiple exclude files.
- Modified test profiles to make use of a common exclude list:
ExcludeList
- Added ConnectionTest.testResumeEmptyReplayBuffer.
- Made default exception handling for Connection and Session log the
exception.
- Added SenderExcetion to specifically signal problems with
transmitting connection data.
- Modified Session to catch and deal with connection send failures
for sessions with positive expiry.
- Modified FailoverBaseCase to work for non VM brokers.
- Made FailoverTest fail if failover times out.
- Modified JMS implementation to make use of the recently added low
level session resume.
- Unexcluded failover tests from 0-10 test profiles.
- Excluded MultipleJCAProviderRegistrationTest due to its testing
strategy resulting in spurious failure when running as part of the
larger test suite.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@708093 13f79535-47bb-0310-9956-ffa450edef68
24 files changed, 473 insertions, 224 deletions
diff --git a/qpid/java/010ExcludeList b/qpid/java/010ExcludeList index 7810d42211..7512329220 100644 --- a/qpid/java/010ExcludeList +++ b/qpid/java/010ExcludeList @@ -19,21 +19,7 @@ org.apache.qpid.server.security.acl.SimpleACLTest#* org.apache.qpid.server.plugins.PluginTest#* // This test is not finished org.apache.qpid.test.testcases.TTLTest#* -// Those tests require failover support -org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser -org.apache.qpid.test.testcases.FailoverTest#* -org.apache.qpid.test.client.failover.FailoverTest#* +org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover // Those tests are testing 0.8 specific semantics org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P diff --git a/qpid/java/ExcludeList b/qpid/java/ExcludeList new file mode 100644 index 0000000000..5b5b73e8fa --- /dev/null +++ b/qpid/java/ExcludeList @@ -0,0 +1 @@ +org.apache.qpid.client.MultipleJCAProviderRegistrationTest#test diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0999a09ca5..4e8fdc2370 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; @@ -628,6 +629,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _delegate.makeBrokerConnection(brokerDetail); } + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + return _delegate.executeRetrySupport(operation); + } + /** * Get the details of the currently active broker * diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 60b827a426..b64147fe8f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -27,20 +27,24 @@ import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; public interface AMQConnectionDelegate { - public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; + ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; - public Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetchHigh, final int prefetchLow) throws JMSException; + Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetchHigh, final int prefetchLow) throws JMSException; - public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; - public void resubscribeSessions() throws JMSException, AMQException, FailoverException; + void resubscribeSessions() throws JMSException, AMQException, FailoverException; + + void closeConnection(long timeout) throws JMSException, AMQException; + + <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; - public void closeConnection(long timeout) throws JMSException, AMQException; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 6480a0da76..8a9abcc398 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -23,6 +23,9 @@ package org.apache.qpid.client; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; @@ -31,6 +34,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; @@ -61,11 +65,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec * The QpidConeection instance that is mapped with thie JMS connection. */ org.apache.qpid.transport.Connection _qpidConnection; + private ConnectionException exception = null; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) { _conn = conn; + _qpidConnection = new Connection(); + _qpidConnection.setConnectionListener(this); } /** @@ -129,16 +136,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec */ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _qpidConnection = new Connection(); try { if (_logger.isDebugEnabled()) { - _logger.debug("creating connection with broker " + " host: " + brokerDetail - .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn - .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword()); + _logger.debug("connecting to host: " + brokerDetail.getHost() + + " port: " + brokerDetail.getPort() + + " vhost: " + _conn.getVirtualHost() + + " username: " + _conn.getUsername() + + " password: " + _conn.getPassword()); } - _qpidConnection.setConnectionListener(this); _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL()); _conn._connected = true; @@ -160,8 +167,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec */ public void resubscribeSessions() throws JMSException, AMQException, FailoverException { - //NOT implemented as railover is handled at a lower level - throw new FailoverException("failing to reconnect during failover, operation not supported."); + List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); + _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + for (AMQSession s : sessions) + { + ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; + s.resubscribe(); + } } @@ -181,6 +193,43 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void exception(Connection conn, ConnectionException exc) { + if (exception != null) + { + _logger.error("previous exception", exception); + } + + exception = exc; + } + + public void closed(Connection conn) + { + ConnectionException exc = exception; + exception = null; + + ConnectionClose close = exc.getClose(); + if (close == null) + { + try + { + if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + { + _qpidConnection.resume(); + + if (_conn.firePreResubscribe()) + { + _conn.resubscribeSessions(); + } + + _conn.fireFailoverComplete(); + return; + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + } + ExceptionListener listener = _conn._exceptionListener; if (listener == null) { @@ -188,19 +237,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } else { - ConnectionClose close = exc.getClose(); String code = null; if (close != null) { code = close.getReplyCode().toString(); } + JMSException ex = new JMSException(exc.getMessage(), code); ex.initCause(exc); - - _conn._exceptionListener.onException(ex); + listener.onException(ex); } } - public void closed(Connection conn) {} + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + throw new RuntimeException(e); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 8d42a2f201..035e3830ca 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -247,4 +247,41 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); } } + + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + while (true) + { + try + { + _conn.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + _logger.debug("Interrupted: " + e, e); + + return null; + } + + synchronized (_conn.getFailoverMutex()) + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + _logger.debug("Failover exception caught during operation: " + e, e); + } + catch (IllegalStateException e) + { + if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) + { + throw e; + } + } + } + } + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index cbdefd0548..b5d12d9520 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -261,6 +261,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + private final AtomicLong _rollbackMark = new AtomicLong(-1); /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); @@ -1809,6 +1810,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _failedOverDirty = true; } + + _rollbackMark.set(-1); resubscribeProducers(); resubscribeConsumers(); } @@ -2601,7 +2604,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Used for debugging in the dispatcher. */ private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread @@ -2611,7 +2613,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final AtomicBoolean _closed = new AtomicBoolean(false); private final Object _lock = new Object(); - private final AtomicLong _rollbackMark = new AtomicLong(-1); private String dispatcherID = "" + System.identityHashCode(this); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7829966315..ab983aa842 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -113,11 +113,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); _qpidConnection = qpidConnection; - // create the qpid session with an expiry <= 0 so that the session does not expire - _qpidSession = qpidConnection.createSession(0); - // set the exception listnere for this session + _qpidSession = _qpidConnection.createSession(1); _qpidSession.setSessionListener(this); - // set transacted if required if (_transacted) { _qpidSession.txSelect(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index cf7e978c03..e9e52cc97c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -99,37 +99,6 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup */ public T execute() throws E { - while (true) - { - try - { - connection.blockUntilNotFailingOver(); - } - catch (InterruptedException e) - { - _log.debug("Interrupted: " + e, e); - - return null; - } - - synchronized (connection.getFailoverMutex()) - { - try - { - return operation.execute(); - } - catch (FailoverException e) - { - _log.debug("Failover exception caught during operation: " + e, e); - } - catch (IllegalStateException e) - { - if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) - { - throw e; - } - } - } - } + return connection.executeRetrySupport(operation); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 7a66c2c238..cf9b9145a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -62,7 +62,7 @@ public class Connection extends ConnectionInvoker public void opened(Connection conn) {} public void exception(Connection conn, ConnectionException exception) { - throw exception; + log.error(exception, "connection exception"); } public void closed(Connection conn) {} } @@ -155,7 +155,12 @@ public class Connection extends ConnectionInvoker return saslClient; } - public void connect(String host, int port, String vhost, String username, String password,boolean ssl) + public void connect(String host, int port, String vhost, String username, String password) + { + connect(host, port, vhost, username, password, false); + } + + public void connect(String host, int port, String vhost, String username, String password, boolean ssl) { synchronized (lock) { @@ -163,7 +168,7 @@ public class Connection extends ConnectionInvoker delegate = new ClientDelegate(vhost, username, password); - IoTransport.connect(host, port, ConnectionBinding.get(this),ssl); + IoTransport.connect(host, port, ConnectionBinding.get(this), ssl); send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -371,12 +376,11 @@ public class Connection extends ConnectionInvoker case CLOSING: error = e; lock.notifyAll(); - break; - default: - listener.exception(this, e); - break; + return; } } + + listener.exception(this, e); } public void exception(Throwable t) @@ -402,13 +406,13 @@ public class Connection extends ConnectionInvoker public void closed() { - log.debug("connection closed: %s", this); - if (state == OPEN) { exception(new ConnectionException("connection aborted")); } + log.debug("connection closed: %s", this); + synchronized (lock) { List<Session> values = new ArrayList<Session>(channels.values()); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderException.java new file mode 100644 index 0000000000..a96079dc27 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SenderException.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * SenderException + * + */ + +public class SenderException extends TransportException +{ + + public SenderException(String message, Throwable cause) + { + super(message, cause); + } + + public SenderException(String message) + { + super(message); + } + + public SenderException(Throwable cause) + { + super(cause); + } + + public void rethrow() + { + throw new SenderException(getMessage(), this); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index bab4bb35ee..8877b7b683 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -65,7 +65,7 @@ public class Session extends SessionInvoker public void exception(Session ssn, SessionException exc) { - throw exc; + log.error(exc, "session exception"); } public void closed(Session ssn) {} @@ -195,6 +195,9 @@ public class Session extends SessionInvoker send(m); } } + + sessionCommandPoint(commandsOut, 0); + sessionFlush(COMPLETED); } } @@ -219,6 +222,7 @@ public class Session extends SessionInvoker this.commandsIn = id; if (!incomingInit) { + incomingInit = true; maxProcessed = commandsIn - 1; syncPoint = maxProcessed; } @@ -242,6 +246,11 @@ public class Session extends SessionInvoker final void identify(Method cmd) { + if (!incomingInit) + { + throw new IllegalStateException(); + } + int id = nextCommandId(); cmd.setId(id); @@ -417,8 +426,8 @@ public class Session extends SessionInvoker default: throw new SessionException (String.format - ("timed out waiting for session to become open %s", - state)); + ("timed out waiting for session to become open " + + "(state=%s)", state)); } int next = commandsOut++; @@ -429,7 +438,26 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && isFull(next)) { - sessionFlush(COMPLETED); + if (state == OPEN) + { + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will + // happen again on resume + log.error(e, "error sending flush (full replay buffer)"); + } + else + { + e.rethrow(); + } + } + } w.await(); } } @@ -452,7 +480,23 @@ public class Session extends SessionInvoker m.setSync(true); } needSync = !m.isSync(); - send(m); + try + { + send(m); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will happen + // again on resume + log.error(e, "error sending command"); + } + else + { + e.rethrow(); + } + } if (autoSync) { sync(); @@ -462,7 +506,23 @@ public class Session extends SessionInvoker // wraparound if ((next % 65536) == 0) { - sessionFlush(COMPLETED); + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will happen + // again on resume + log.error(e, "error sending flush (periodic)"); + } + else + { + e.rethrow(); + } + } } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 73ff039be5..36ea14856a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -92,7 +93,7 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> { if (closed.get()) { - throw new TransportException("sender is closed", exception); + throw new SenderException("sender is closed", exception); } final int size = buffer.length; @@ -125,12 +126,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> if (closed.get()) { - throw new TransportException("sender is closed", exception); + throw new SenderException("sender is closed", exception); } if (head - tail >= size) { - throw new TransportException(String.format("write timed out: %s, %s", head, tail)); + throw new SenderException(String.format("write timed out: %s, %s", head, tail)); } } continue; @@ -192,19 +193,19 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> join(timeout); if (isAlive()) { - throw new TransportException("join timed out"); + throw new SenderException("join timed out"); } } transport.getReceiver().close(false); } catch (InterruptedException e) { - throw new TransportException(e); + throw new SenderException(e); } if (reportException && exception != null) { - throw new TransportException(exception); + throw new SenderException(exception); } } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index b1fe08bfb9..1da56654f0 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -48,6 +48,7 @@ public class ConnectionTest extends TestCase implements SessionListener private int port; private volatile boolean queue = false; private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); + private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); protected void setUp() throws Exception { @@ -71,7 +72,7 @@ public class ConnectionTest extends TestCase implements SessionListener public void opened(Session ssn) {} - public void message(Session ssn, MessageTransfer xfr) + public void message(final Session ssn, MessageTransfer xfr) { if (queue) { @@ -86,6 +87,25 @@ public class ConnectionTest extends TestCase implements SessionListener { ssn.getConnection().close(); } + else if (body.startsWith("DELAYED_CLOSE")) + { + ssn.processed(xfr); + new Thread() + { + public void run() + { + try + { + sleep(3000); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + ssn.getConnection().close(); + } + }.start(); + } else if (body.startsWith("ECHO")) { int id = xfr.getId(); @@ -139,7 +159,7 @@ public class ConnectionTest extends TestCase implements SessionListener } } }); - conn.connect("localhost", port, null, "guest", "guest",false); + conn.connect("localhost", port, null, "guest", "guest", false); return conn; } @@ -148,7 +168,7 @@ public class ConnectionTest extends TestCase implements SessionListener Condition closed = new Condition(); Connection conn = connect(closed); - Session ssn = conn.createSession(); + Session ssn = conn.createSession(1); send(ssn, "CLOSE"); if (!closed.get(3000)) @@ -167,44 +187,47 @@ public class ConnectionTest extends TestCase implements SessionListener } } - public void testResume() throws Exception + class FailoverConnectionListener implements ConnectionListener { - Connection conn = new Connection(); - conn.connect("localhost", port, null, "guest", "guest",false); + public void opened(Connection conn) {} - conn.setConnectionListener(new ConnectionListener() + public void exception(Connection conn, ConnectionException e) { - public void opened(Connection conn) {} - public void exception(Connection conn, ConnectionException e) - { - throw e; - } - public void closed(Connection conn) - { - queue = true; - conn.connect("localhost", port, null, "guest", "guest",false); - conn.resume(); - } - }); + throw e; + } - Session ssn = conn.createSession(1); - final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); - ssn.setSessionListener(new SessionListener() + public void closed(Connection conn) { - public void opened(Session s) {} - public void exception(Session s, SessionException e) {} - public void message(Session s, MessageTransfer xfr) - { - synchronized (incoming) - { - incoming.add(xfr); - incoming.notifyAll(); - } + queue = true; + conn.connect("localhost", port, null, "guest", "guest"); + conn.resume(); + } + } - s.processed(xfr); + class TestSessionListener implements SessionListener + { + public void opened(Session s) {} + public void exception(Session s, SessionException e) {} + public void message(Session s, MessageTransfer xfr) + { + synchronized (incoming) + { + incoming.add(xfr); + incoming.notifyAll(); } - public void closed(Session s) {} - }); + + s.processed(xfr); + } + public void closed(Session s) {} + } + + public void testResumeNonemptyReplayBuffer() throws Exception + { + Connection conn = new Connection(); + conn.setConnectionListener(new FailoverConnectionListener()); + conn.connect("localhost", port, null, "guest", "guest"); + Session ssn = conn.createSession(1); + ssn.setSessionListener(new TestSessionListener()); send(ssn, "SINK 0"); send(ssn, "ECHO 1"); @@ -251,4 +274,24 @@ public class ConnectionTest extends TestCase implements SessionListener } } + public void testResumeEmptyReplayBuffer() throws InterruptedException + { + Connection conn = new Connection(); + conn.setConnectionListener(new FailoverConnectionListener()); + conn.connect("localhost", port, null, "guest", "guest"); + Session ssn = conn.createSession(1); + ssn.setSessionListener(new TestSessionListener()); + + send(ssn, "SINK 0"); + send(ssn, "SINK 1"); + send(ssn, "DELAYED_CLOSE 2"); + ssn.sync(); + Thread.sleep(6000); + send(ssn, "SINK 3"); + ssn.sync(); + System.out.println(messages); + assertEquals(1, messages.size()); + assertEquals("SINK 3", messages.get(0).getBodyString()); + } + } diff --git a/qpid/java/cpp.async.testprofile b/qpid/java/cpp.async.testprofile index c7554165f0..746ef2c0ef 100644 --- a/qpid/java/cpp.async.testprofile +++ b/qpid/java/cpp.async.testprofile @@ -1,3 +1,3 @@ broker.version=0-10 -broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --auth no -test.excludesfile=${project.root}/010ExcludeList-store +broker=${project.root}/../cpp/src/qpidd -p @PORT --data-dir ${build.data}/@PORT -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/msgstore.so --auth no +test.excludesfile=${project.root}/ExcludeList ${project.root}/010ExcludeList-store diff --git a/qpid/java/cpp.noprefetch.testprofile b/qpid/java/cpp.noprefetch.testprofile index 760f0d147f..e502f3c950 100644 --- a/qpid/java/cpp.noprefetch.testprofile +++ b/qpid/java/cpp.noprefetch.testprofile @@ -1,4 +1,4 @@ broker.version=0-10 -broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --auth no -test.excludesfile=${project.root}/010ExcludeList-noPrefetch +broker=${project.root}/../cpp/src/qpidd -p @PORT --data-dir ${build.data}/@PORT -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/msgstore.so --auth no +test.excludesfile=${project.root}/ExcludeList ${project.root}/010ExcludeList-noPrefetch max_prefetch=0 diff --git a/qpid/java/cpp.testprofile b/qpid/java/cpp.testprofile index 68ac8b8be8..af2d0f58e4 100644 --- a/qpid/java/cpp.testprofile +++ b/qpid/java/cpp.testprofile @@ -1,3 +1,3 @@ broker.version=0-10 -broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --auth no -test.excludesfile=${project.root}/010ExcludeList +broker=${project.root}/../cpp/src/qpidd -p @PORT --data-dir ${build.data}/@PORT -t --auth no +test.excludesfile=${project.root}/ExcludeList ${project.root}/010ExcludeList diff --git a/qpid/java/default.testprofile b/qpid/java/default.testprofile index 7354cbda48..46c3f7746a 100644 --- a/qpid/java/default.testprofile +++ b/qpid/java/default.testprofile @@ -14,7 +14,7 @@ log4j.configuration=file:///${project.root}/log4j-test.xml log4j.debug=false test.excludes=true -test.excludesfile=${project.root}/08ExcludeList +test.excludesfile=${project.root}/ExcludeList ${project.root}/08ExcludeList test.fork=no test.mem=512M test=*Test diff --git a/qpid/java/java.testprofile b/qpid/java/java.testprofile index cef0a10661..ab98e47e97 100644 --- a/qpid/java/java.testprofile +++ b/qpid/java/java.testprofile @@ -1,5 +1,5 @@ -broker=${project.root}/build/bin/qpid-server +broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT broker.clean=${project.root}/clean-dir ${build.data} broker.ready=Qpid Broker Ready -test.excludesfile=${project.root}/08ExcludeList-nonvm +test.excludesfile=${project.root}/ExcludeList ${project.root}/08ExcludeList-nonvm diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java index 31299ff9ff..ea0bae7a56 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java @@ -37,12 +37,15 @@ import java.util.LinkedList; */ public class MultipleJCAProviderRegistrationTest extends QpidTestCase { + public void setUp() throws Exception { super.setUp(); stopBroker(); + _broker = VM; + final String QpidHome = System.getProperty("QPID_HOME"); assertNotNull("QPID_HOME not set",QpidHome); @@ -60,9 +63,7 @@ public class MultipleJCAProviderRegistrationTest extends QpidTestCase // This is a bit evil it should be updated with QPID-1103 config.getConfiguration().setProperty("management.enabled", "false"); - ApplicationRegistry.initialise(config, 1); - - TransportConnection.createVMBroker(1); + startBroker(); } public void test() throws Exception @@ -98,4 +99,5 @@ public class MultipleJCAProviderRegistrationTest extends QpidTestCase assertTrue("Client did not register any providers", additions.size() > 0); } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index bf87e8e84f..3f610aa15d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -174,7 +174,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener _logger.info("Awaiting Failover completion"); try { - failoverComplete.await(delay, TimeUnit.MILLISECONDS); + if (!failoverComplete.await(delay, TimeUnit.MILLISECONDS)) + { + fail("failover did not complete"); + } } catch (InterruptedException e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 2fa6f4f417..2a44c444e0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -27,41 +27,40 @@ import javax.jms.Connection; public class FailoverBaseCase extends QpidTestCase { - private boolean failedOver = true; - - protected void setUp() throws java.lang.Exception - { - super.setUp(); + protected int FAILING_VM_PORT = 2; + protected int FAILING_PORT = 5673; - try + private boolean failedOver = false; + + private int getFailingPort() + { + if (_broker.equals(VM)) { - TransportConnection.createVMBroker(2); + return FAILING_VM_PORT; } - catch (Exception e) + else { - fail("Unable to create broker: " + e); + return FAILING_PORT; } + } + protected void setUp() throws java.lang.Exception + { + super.setUp(); + startBroker(getFailingPort()); } /** - * We are using failover factories, Note that 0.10 code path does not yet support failover. + * We are using failover factories * * @return a connection * @throws Exception */ public Connection getConnection() throws Exception { - Connection conn; - if( _broker.equals(VM) ) - { - conn = getConnectionFactory("vmfailover").createConnection("guest", "guest"); - } - else - { - conn = getConnectionFactory("failover").createConnection("guest", "guest"); - } + Connection conn = + getConnectionFactory("failover").createConnection("guest", "guest"); _connections.add(conn); return conn; } @@ -70,8 +69,7 @@ public class FailoverBaseCase extends QpidTestCase { if (!failedOver) { - TransportConnection.killVMBroker(2); - ApplicationRegistry.remove(2); + stopBroker(getFailingPort()); } super.tearDown(); } @@ -83,7 +81,13 @@ public class FailoverBaseCase extends QpidTestCase public void failBroker() { failedOver = true; - TransportConnection.killVMBroker(2); - ApplicationRegistry.remove(2); + try + { + stopBroker(getFailingPort()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 0cbef4c520..4be67c9590 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -70,43 +70,43 @@ public class QpidTestCase extends TestCase if (Boolean.getBoolean("test.excludes")) { _logger.info("Some tests should be excluded, building the exclude list"); - String exclusionListURI = System.getProperties().getProperty("test.excludesfile", ""); + String exclusionListURIs = System.getProperties().getProperty("test.excludesfile", ""); String exclusionListString = System.getProperties().getProperty("test.excludeslist", ""); - File file = new File(exclusionListURI); List<String> exclusionList = new ArrayList<String>(); - if (file.exists()) + + for (String uri : exclusionListURIs.split("\\s+")) { - _logger.info("Using exclude file: " + exclusionListURI); - try + File file = new File(uri); + if (file.exists()) { - BufferedReader in = new BufferedReader(new FileReader(file)); - String excludedTest = in.readLine(); - do + _logger.info("Using exclude file: " + uri); + try { - exclusionList.add(excludedTest); - excludedTest = in.readLine(); + BufferedReader in = new BufferedReader(new FileReader(file)); + String excludedTest = in.readLine(); + do + { + exclusionList.add(excludedTest); + excludedTest = in.readLine(); + } + while (excludedTest != null); + } + catch (IOException e) + { + _logger.warn("Exception when reading exclusion list", e); } - while (excludedTest != null); - } - catch (IOException e) - { - _logger.warn("Exception when reading exclusion list", e); } } - else if (!exclusionListString.equals("")) + + if (!exclusionListString.equals("")) { _logger.info("Using excludeslist: " + exclusionListString); - // the exclusion list may be specified as a string - StringTokenizer t = new StringTokenizer(exclusionListString, " "); - while (t.hasMoreTokens()) + for (String test : exclusionListString.split("\\s+")) { - exclusionList.add(t.nextToken()); + exclusionList.add(test); } } - else - { - throw new RuntimeException("Aborting test: Cannot find excludes file nor excludes list"); - } + _exclusionList = exclusionList; } @@ -136,17 +136,17 @@ public class QpidTestCase extends TestCase private static final String QPID_HOME = "QPID_HOME"; protected int DEFAULT_VM_PORT = 1; + protected int DEFAULT_PORT = 5672; protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); private String _output = System.getProperty(TEST_OUTPUT); - private Process _brokerProcess; + private Map<Integer,Process> _brokers = new HashMap<Integer,Process>(); private InitialContext _initialContext; private AMQConnectionFactory _connectionFactory; - private boolean _brokerStarted; // the connections created for a given test protected List<Connection> _connections = new ArrayList<Connection>(); @@ -299,20 +299,44 @@ public class QpidTestCase extends TestCase startBroker(0); } - public void startBroker(int port) throws Exception + private int getPort(int port) { if (_broker.equals(VM)) { - //If we are starting on port 0 use the default VM_PORT - port = port == 0 ? DEFAULT_VM_PORT : port; + return port == 0 ? DEFAULT_VM_PORT : port; + } + else if (!_broker.equals(EXTERNAL)) + { + return port == 0 ? DEFAULT_PORT : port; + } + else + { + return port; + } + } + private String getBrokerCommand(int port) + { + return _broker + .replace("@PORT", "" + port) + .replace("@MPORT", "" + (port + (8999 - DEFAULT_PORT))); + } + + public void startBroker(int port) throws Exception + { + port = getPort(port); + + Process process = null; + if (_broker.equals(VM)) + { // create an in_VM broker TransportConnection.createVMBroker(port); } else if (!_broker.equals(EXTERNAL)) { - _logger.info("starting broker: " + _broker); - ProcessBuilder pb = new ProcessBuilder(_broker.split("\\s+")); + String cmd = getBrokerCommand(port); + _logger.info("starting broker: " + cmd); + ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); pb.redirectErrorStream(true); Map<String, String> env = pb.environment(); @@ -323,9 +347,9 @@ public class QpidTestCase extends TestCase //Augment Path with bin directory in QPID_HOME. env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin")); - _brokerProcess = pb.start(); + process = pb.start(); - Piper p = new Piper(_brokerProcess.getInputStream(), + Piper p = new Piper(process.getInputStream(), System.getProperty(BROKER_READY)); p.start(); @@ -339,7 +363,7 @@ public class QpidTestCase extends TestCase try { - int exit = _brokerProcess.exitValue(); + int exit = process.exitValue(); _logger.info("broker aborted: " + exit); cleanBroker(); throw new RuntimeException("broker aborted: " + exit); @@ -349,7 +373,8 @@ public class QpidTestCase extends TestCase // this is expect if the broker started succesfully } } - _brokerStarted = true; + + _brokers.put(port, process); } public void cleanBroker() @@ -387,22 +412,21 @@ public class QpidTestCase extends TestCase public void stopBroker(int port) throws Exception { - _logger.info("stopping broker: " + _broker); - if (_brokerProcess != null) + port = getPort(port); + + _logger.info("stopping broker: " + getBrokerCommand(port)); + Process process = _brokers.remove(port); + if (process != null) { - _brokerProcess.destroy(); - _brokerProcess.waitFor(); - _logger.info("broker exited: " + _brokerProcess.exitValue()); - _brokerProcess = null; + process.destroy(); + process.waitFor(); + _logger.info("broker exited: " + process.exitValue()); } else if (_broker.equals(VM)) { - port = port == 0 ? DEFAULT_VM_PORT : port; - TransportConnection.killVMBroker(port); ApplicationRegistry.remove(port); } - _brokerStarted = false; } protected void setSystemProperty(String property, String value) @@ -489,14 +513,7 @@ public class QpidTestCase extends TestCase _logger.info("get ConnectionFactory"); if (_connectionFactory == null) { - if (_broker.equals(VM)) - { - _connectionFactory = getConnectionFactory("vm"); - } - else - { - _connectionFactory = getConnectionFactory("local"); - } + _connectionFactory = getConnectionFactory("default"); } return _connectionFactory; } @@ -512,6 +529,11 @@ public class QpidTestCase extends TestCase */ public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException { + if (_broker.equals(VM)) + { + factoryName += ".vm"; + } + return (AMQConnectionFactory) getInitialContext().lookup(factoryName); } @@ -559,12 +581,9 @@ public class QpidTestCase extends TestCase protected void tearDown() throws java.lang.Exception { // close all the connections used by this test. - if (_brokerStarted) + for (Connection c : _connections) { - for (Connection c : _connections) - { - c.close(); - } + c.close(); } revertSystemProperties(); diff --git a/qpid/java/test-provider.properties b/qpid/java/test-provider.properties index 351fa59edb..7c9622630c 100644 --- a/qpid/java/test-provider.properties +++ b/qpid/java/test-provider.properties @@ -19,13 +19,15 @@ # # -connectionfactory.local = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' -connectionfactory.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' +connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' +connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' -connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' -connectionfactory.vmfailover = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1' -connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='vm://:1' -connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='vm://:2' +connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673;tcp://localhost:5672' +connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1' +connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' +connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673' +connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' +connectionfactory.connection2.vm = amqp://username:password@clientid/test?brokerlist='vm://:2' queue.MyQueue = example.MyQueue |