diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
commit | 825327492ededcf62f7307a96eb29f5e7df88351 (patch) | |
tree | 05ce4cad575ce2c0379c5440f5ec197629c97c16 /java/common | |
parent | 55dae2c49ba8c283583f3688784f2b763e772020 (diff) | |
download | qpid-python-825327492ededcf62f7307a96eb29f5e7df88351.tar.gz |
QPID-1339:
- Modified QpidTestCase to start/stop multiple brokers for failover
testing.
- Modified QpidTestCase to substitute port variables into broker
start/stop commands.
- Modified test profiles to use the new port variables.
- Modified QpidTestCase to permit multiple exclude files.
- Modified test profiles to make use of a common exclude list:
ExcludeList
- Added ConnectionTest.testResumeEmptyReplayBuffer.
- Made default exception handling for Connection and Session log the
exception.
- Added SenderExcetion to specifically signal problems with
transmitting connection data.
- Modified Session to catch and deal with connection send failures
for sessions with positive expiry.
- Modified FailoverBaseCase to work for non VM brokers.
- Made FailoverTest fail if failover times out.
- Modified JMS implementation to make use of the recently added low
level session resume.
- Unexcluded failover tests from 0-10 test profiles.
- Excluded MultipleJCAProviderRegistrationTest due to its testing
strategy resulting in spurious failure when running as part of the
larger test suite.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
5 files changed, 215 insertions, 55 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 7a66c2c238..cf9b9145a9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -62,7 +62,7 @@ public class Connection extends ConnectionInvoker public void opened(Connection conn) {} public void exception(Connection conn, ConnectionException exception) { - throw exception; + log.error(exception, "connection exception"); } public void closed(Connection conn) {} } @@ -155,7 +155,12 @@ public class Connection extends ConnectionInvoker return saslClient; } - public void connect(String host, int port, String vhost, String username, String password,boolean ssl) + public void connect(String host, int port, String vhost, String username, String password) + { + connect(host, port, vhost, username, password, false); + } + + public void connect(String host, int port, String vhost, String username, String password, boolean ssl) { synchronized (lock) { @@ -163,7 +168,7 @@ public class Connection extends ConnectionInvoker delegate = new ClientDelegate(vhost, username, password); - IoTransport.connect(host, port, ConnectionBinding.get(this),ssl); + IoTransport.connect(host, port, ConnectionBinding.get(this), ssl); send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -371,12 +376,11 @@ public class Connection extends ConnectionInvoker case CLOSING: error = e; lock.notifyAll(); - break; - default: - listener.exception(this, e); - break; + return; } } + + listener.exception(this, e); } public void exception(Throwable t) @@ -402,13 +406,13 @@ public class Connection extends ConnectionInvoker public void closed() { - log.debug("connection closed: %s", this); - if (state == OPEN) { exception(new ConnectionException("connection aborted")); } + log.debug("connection closed: %s", this); + synchronized (lock) { List<Session> values = new ArrayList<Session>(channels.values()); diff --git a/java/common/src/main/java/org/apache/qpid/transport/SenderException.java b/java/common/src/main/java/org/apache/qpid/transport/SenderException.java new file mode 100644 index 0000000000..a96079dc27 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SenderException.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * SenderException + * + */ + +public class SenderException extends TransportException +{ + + public SenderException(String message, Throwable cause) + { + super(message, cause); + } + + public SenderException(String message) + { + super(message); + } + + public SenderException(Throwable cause) + { + super(cause); + } + + public void rethrow() + { + throw new SenderException(getMessage(), this); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index bab4bb35ee..8877b7b683 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -65,7 +65,7 @@ public class Session extends SessionInvoker public void exception(Session ssn, SessionException exc) { - throw exc; + log.error(exc, "session exception"); } public void closed(Session ssn) {} @@ -195,6 +195,9 @@ public class Session extends SessionInvoker send(m); } } + + sessionCommandPoint(commandsOut, 0); + sessionFlush(COMPLETED); } } @@ -219,6 +222,7 @@ public class Session extends SessionInvoker this.commandsIn = id; if (!incomingInit) { + incomingInit = true; maxProcessed = commandsIn - 1; syncPoint = maxProcessed; } @@ -242,6 +246,11 @@ public class Session extends SessionInvoker final void identify(Method cmd) { + if (!incomingInit) + { + throw new IllegalStateException(); + } + int id = nextCommandId(); cmd.setId(id); @@ -417,8 +426,8 @@ public class Session extends SessionInvoker default: throw new SessionException (String.format - ("timed out waiting for session to become open %s", - state)); + ("timed out waiting for session to become open " + + "(state=%s)", state)); } int next = commandsOut++; @@ -429,7 +438,26 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && isFull(next)) { - sessionFlush(COMPLETED); + if (state == OPEN) + { + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will + // happen again on resume + log.error(e, "error sending flush (full replay buffer)"); + } + else + { + e.rethrow(); + } + } + } w.await(); } } @@ -452,7 +480,23 @@ public class Session extends SessionInvoker m.setSync(true); } needSync = !m.isSync(); - send(m); + try + { + send(m); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will happen + // again on resume + log.error(e, "error sending command"); + } + else + { + e.rethrow(); + } + } if (autoSync) { sync(); @@ -462,7 +506,23 @@ public class Session extends SessionInvoker // wraparound if ((next % 65536) == 0) { - sessionFlush(COMPLETED); + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (expiry > 0) + { + // if expiry is > 0 then this will happen + // again on resume + log.error(e, "error sending flush (periodic)"); + } + else + { + e.rethrow(); + } + } } } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 73ff039be5..36ea14856a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -92,7 +93,7 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> { if (closed.get()) { - throw new TransportException("sender is closed", exception); + throw new SenderException("sender is closed", exception); } final int size = buffer.length; @@ -125,12 +126,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> if (closed.get()) { - throw new TransportException("sender is closed", exception); + throw new SenderException("sender is closed", exception); } if (head - tail >= size) { - throw new TransportException(String.format("write timed out: %s, %s", head, tail)); + throw new SenderException(String.format("write timed out: %s, %s", head, tail)); } } continue; @@ -192,19 +193,19 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> join(timeout); if (isAlive()) { - throw new TransportException("join timed out"); + throw new SenderException("join timed out"); } } transport.getReceiver().close(false); } catch (InterruptedException e) { - throw new TransportException(e); + throw new SenderException(e); } if (reportException && exception != null) { - throw new TransportException(exception); + throw new SenderException(exception); } } } diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index b1fe08bfb9..1da56654f0 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -48,6 +48,7 @@ public class ConnectionTest extends TestCase implements SessionListener private int port; private volatile boolean queue = false; private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); + private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); protected void setUp() throws Exception { @@ -71,7 +72,7 @@ public class ConnectionTest extends TestCase implements SessionListener public void opened(Session ssn) {} - public void message(Session ssn, MessageTransfer xfr) + public void message(final Session ssn, MessageTransfer xfr) { if (queue) { @@ -86,6 +87,25 @@ public class ConnectionTest extends TestCase implements SessionListener { ssn.getConnection().close(); } + else if (body.startsWith("DELAYED_CLOSE")) + { + ssn.processed(xfr); + new Thread() + { + public void run() + { + try + { + sleep(3000); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + ssn.getConnection().close(); + } + }.start(); + } else if (body.startsWith("ECHO")) { int id = xfr.getId(); @@ -139,7 +159,7 @@ public class ConnectionTest extends TestCase implements SessionListener } } }); - conn.connect("localhost", port, null, "guest", "guest",false); + conn.connect("localhost", port, null, "guest", "guest", false); return conn; } @@ -148,7 +168,7 @@ public class ConnectionTest extends TestCase implements SessionListener Condition closed = new Condition(); Connection conn = connect(closed); - Session ssn = conn.createSession(); + Session ssn = conn.createSession(1); send(ssn, "CLOSE"); if (!closed.get(3000)) @@ -167,44 +187,47 @@ public class ConnectionTest extends TestCase implements SessionListener } } - public void testResume() throws Exception + class FailoverConnectionListener implements ConnectionListener { - Connection conn = new Connection(); - conn.connect("localhost", port, null, "guest", "guest",false); + public void opened(Connection conn) {} - conn.setConnectionListener(new ConnectionListener() + public void exception(Connection conn, ConnectionException e) { - public void opened(Connection conn) {} - public void exception(Connection conn, ConnectionException e) - { - throw e; - } - public void closed(Connection conn) - { - queue = true; - conn.connect("localhost", port, null, "guest", "guest",false); - conn.resume(); - } - }); + throw e; + } - Session ssn = conn.createSession(1); - final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); - ssn.setSessionListener(new SessionListener() + public void closed(Connection conn) { - public void opened(Session s) {} - public void exception(Session s, SessionException e) {} - public void message(Session s, MessageTransfer xfr) - { - synchronized (incoming) - { - incoming.add(xfr); - incoming.notifyAll(); - } + queue = true; + conn.connect("localhost", port, null, "guest", "guest"); + conn.resume(); + } + } - s.processed(xfr); + class TestSessionListener implements SessionListener + { + public void opened(Session s) {} + public void exception(Session s, SessionException e) {} + public void message(Session s, MessageTransfer xfr) + { + synchronized (incoming) + { + incoming.add(xfr); + incoming.notifyAll(); } - public void closed(Session s) {} - }); + + s.processed(xfr); + } + public void closed(Session s) {} + } + + public void testResumeNonemptyReplayBuffer() throws Exception + { + Connection conn = new Connection(); + conn.setConnectionListener(new FailoverConnectionListener()); + conn.connect("localhost", port, null, "guest", "guest"); + Session ssn = conn.createSession(1); + ssn.setSessionListener(new TestSessionListener()); send(ssn, "SINK 0"); send(ssn, "ECHO 1"); @@ -251,4 +274,24 @@ public class ConnectionTest extends TestCase implements SessionListener } } + public void testResumeEmptyReplayBuffer() throws InterruptedException + { + Connection conn = new Connection(); + conn.setConnectionListener(new FailoverConnectionListener()); + conn.connect("localhost", port, null, "guest", "guest"); + Session ssn = conn.createSession(1); + ssn.setSessionListener(new TestSessionListener()); + + send(ssn, "SINK 0"); + send(ssn, "SINK 1"); + send(ssn, "DELAYED_CLOSE 2"); + ssn.sync(); + Thread.sleep(6000); + send(ssn, "SINK 3"); + ssn.sync(); + System.out.println(messages); + assertEquals(1, messages.size()); + assertEquals("SINK 3", messages.get(0).getBodyString()); + } + } |