summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-27 06:19:08 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-27 06:19:08 +0000
commit825327492ededcf62f7307a96eb29f5e7df88351 (patch)
tree05ce4cad575ce2c0379c5440f5ec197629c97c16 /java/common
parent55dae2c49ba8c283583f3688784f2b763e772020 (diff)
downloadqpid-python-825327492ededcf62f7307a96eb29f5e7df88351.tar.gz
QPID-1339:
- Modified QpidTestCase to start/stop multiple brokers for failover testing. - Modified QpidTestCase to substitute port variables into broker start/stop commands. - Modified test profiles to use the new port variables. - Modified QpidTestCase to permit multiple exclude files. - Modified test profiles to make use of a common exclude list: ExcludeList - Added ConnectionTest.testResumeEmptyReplayBuffer. - Made default exception handling for Connection and Session log the exception. - Added SenderExcetion to specifically signal problems with transmitting connection data. - Modified Session to catch and deal with connection send failures for sessions with positive expiry. - Modified FailoverBaseCase to work for non VM brokers. - Made FailoverTest fail if failover times out. - Modified JMS implementation to make use of the recently added low level session resume. - Unexcluded failover tests from 0-10 test profiles. - Excluded MultipleJCAProviderRegistrationTest due to its testing strategy resulting in spurious failure when running as part of the larger test suite. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SenderException.java52
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java72
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java13
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java111
5 files changed, 215 insertions, 55 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 7a66c2c238..cf9b9145a9 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -62,7 +62,7 @@ public class Connection extends ConnectionInvoker
public void opened(Connection conn) {}
public void exception(Connection conn, ConnectionException exception)
{
- throw exception;
+ log.error(exception, "connection exception");
}
public void closed(Connection conn) {}
}
@@ -155,7 +155,12 @@ public class Connection extends ConnectionInvoker
return saslClient;
}
- public void connect(String host, int port, String vhost, String username, String password,boolean ssl)
+ public void connect(String host, int port, String vhost, String username, String password)
+ {
+ connect(host, port, vhost, username, password, false);
+ }
+
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl)
{
synchronized (lock)
{
@@ -163,7 +168,7 @@ public class Connection extends ConnectionInvoker
delegate = new ClientDelegate(vhost, username, password);
- IoTransport.connect(host, port, ConnectionBinding.get(this),ssl);
+ IoTransport.connect(host, port, ConnectionBinding.get(this), ssl);
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -371,12 +376,11 @@ public class Connection extends ConnectionInvoker
case CLOSING:
error = e;
lock.notifyAll();
- break;
- default:
- listener.exception(this, e);
- break;
+ return;
}
}
+
+ listener.exception(this, e);
}
public void exception(Throwable t)
@@ -402,13 +406,13 @@ public class Connection extends ConnectionInvoker
public void closed()
{
- log.debug("connection closed: %s", this);
-
if (state == OPEN)
{
exception(new ConnectionException("connection aborted"));
}
+ log.debug("connection closed: %s", this);
+
synchronized (lock)
{
List<Session> values = new ArrayList<Session>(channels.values());
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SenderException.java b/java/common/src/main/java/org/apache/qpid/transport/SenderException.java
new file mode 100644
index 0000000000..a96079dc27
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/SenderException.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * SenderException
+ *
+ */
+
+public class SenderException extends TransportException
+{
+
+ public SenderException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public SenderException(String message)
+ {
+ super(message);
+ }
+
+ public SenderException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ public void rethrow()
+ {
+ throw new SenderException(getMessage(), this);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index bab4bb35ee..8877b7b683 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -65,7 +65,7 @@ public class Session extends SessionInvoker
public void exception(Session ssn, SessionException exc)
{
- throw exc;
+ log.error(exc, "session exception");
}
public void closed(Session ssn) {}
@@ -195,6 +195,9 @@ public class Session extends SessionInvoker
send(m);
}
}
+
+ sessionCommandPoint(commandsOut, 0);
+ sessionFlush(COMPLETED);
}
}
@@ -219,6 +222,7 @@ public class Session extends SessionInvoker
this.commandsIn = id;
if (!incomingInit)
{
+ incomingInit = true;
maxProcessed = commandsIn - 1;
syncPoint = maxProcessed;
}
@@ -242,6 +246,11 @@ public class Session extends SessionInvoker
final void identify(Method cmd)
{
+ if (!incomingInit)
+ {
+ throw new IllegalStateException();
+ }
+
int id = nextCommandId();
cmd.setId(id);
@@ -417,8 +426,8 @@ public class Session extends SessionInvoker
default:
throw new SessionException
(String.format
- ("timed out waiting for session to become open %s",
- state));
+ ("timed out waiting for session to become open " +
+ "(state=%s)", state));
}
int next = commandsOut++;
@@ -429,7 +438,26 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && isFull(next))
{
- sessionFlush(COMPLETED);
+ if (state == OPEN)
+ {
+ try
+ {
+ sessionFlush(COMPLETED);
+ }
+ catch (SenderException e)
+ {
+ if (expiry > 0)
+ {
+ // if expiry is > 0 then this will
+ // happen again on resume
+ log.error(e, "error sending flush (full replay buffer)");
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
+ }
w.await();
}
}
@@ -452,7 +480,23 @@ public class Session extends SessionInvoker
m.setSync(true);
}
needSync = !m.isSync();
- send(m);
+ try
+ {
+ send(m);
+ }
+ catch (SenderException e)
+ {
+ if (expiry > 0)
+ {
+ // if expiry is > 0 then this will happen
+ // again on resume
+ log.error(e, "error sending command");
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
if (autoSync)
{
sync();
@@ -462,7 +506,23 @@ public class Session extends SessionInvoker
// wraparound
if ((next % 65536) == 0)
{
- sessionFlush(COMPLETED);
+ try
+ {
+ sessionFlush(COMPLETED);
+ }
+ catch (SenderException e)
+ {
+ if (expiry > 0)
+ {
+ // if expiry is > 0 then this will happen
+ // again on resume
+ log.error(e, "error sending flush (periodic)");
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 73ff039be5..36ea14856a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -92,7 +93,7 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
{
if (closed.get())
{
- throw new TransportException("sender is closed", exception);
+ throw new SenderException("sender is closed", exception);
}
final int size = buffer.length;
@@ -125,12 +126,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
if (closed.get())
{
- throw new TransportException("sender is closed", exception);
+ throw new SenderException("sender is closed", exception);
}
if (head - tail >= size)
{
- throw new TransportException(String.format("write timed out: %s, %s", head, tail));
+ throw new SenderException(String.format("write timed out: %s, %s", head, tail));
}
}
continue;
@@ -192,19 +193,19 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
join(timeout);
if (isAlive())
{
- throw new TransportException("join timed out");
+ throw new SenderException("join timed out");
}
}
transport.getReceiver().close(false);
}
catch (InterruptedException e)
{
- throw new TransportException(e);
+ throw new SenderException(e);
}
if (reportException && exception != null)
{
- throw new TransportException(exception);
+ throw new SenderException(exception);
}
}
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index b1fe08bfb9..1da56654f0 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -48,6 +48,7 @@ public class ConnectionTest extends TestCase implements SessionListener
private int port;
private volatile boolean queue = false;
private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
+ private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
protected void setUp() throws Exception
{
@@ -71,7 +72,7 @@ public class ConnectionTest extends TestCase implements SessionListener
public void opened(Session ssn) {}
- public void message(Session ssn, MessageTransfer xfr)
+ public void message(final Session ssn, MessageTransfer xfr)
{
if (queue)
{
@@ -86,6 +87,25 @@ public class ConnectionTest extends TestCase implements SessionListener
{
ssn.getConnection().close();
}
+ else if (body.startsWith("DELAYED_CLOSE"))
+ {
+ ssn.processed(xfr);
+ new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ ssn.getConnection().close();
+ }
+ }.start();
+ }
else if (body.startsWith("ECHO"))
{
int id = xfr.getId();
@@ -139,7 +159,7 @@ public class ConnectionTest extends TestCase implements SessionListener
}
}
});
- conn.connect("localhost", port, null, "guest", "guest",false);
+ conn.connect("localhost", port, null, "guest", "guest", false);
return conn;
}
@@ -148,7 +168,7 @@ public class ConnectionTest extends TestCase implements SessionListener
Condition closed = new Condition();
Connection conn = connect(closed);
- Session ssn = conn.createSession();
+ Session ssn = conn.createSession(1);
send(ssn, "CLOSE");
if (!closed.get(3000))
@@ -167,44 +187,47 @@ public class ConnectionTest extends TestCase implements SessionListener
}
}
- public void testResume() throws Exception
+ class FailoverConnectionListener implements ConnectionListener
{
- Connection conn = new Connection();
- conn.connect("localhost", port, null, "guest", "guest",false);
+ public void opened(Connection conn) {}
- conn.setConnectionListener(new ConnectionListener()
+ public void exception(Connection conn, ConnectionException e)
{
- public void opened(Connection conn) {}
- public void exception(Connection conn, ConnectionException e)
- {
- throw e;
- }
- public void closed(Connection conn)
- {
- queue = true;
- conn.connect("localhost", port, null, "guest", "guest",false);
- conn.resume();
- }
- });
+ throw e;
+ }
- Session ssn = conn.createSession(1);
- final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
- ssn.setSessionListener(new SessionListener()
+ public void closed(Connection conn)
{
- public void opened(Session s) {}
- public void exception(Session s, SessionException e) {}
- public void message(Session s, MessageTransfer xfr)
- {
- synchronized (incoming)
- {
- incoming.add(xfr);
- incoming.notifyAll();
- }
+ queue = true;
+ conn.connect("localhost", port, null, "guest", "guest");
+ conn.resume();
+ }
+ }
- s.processed(xfr);
+ class TestSessionListener implements SessionListener
+ {
+ public void opened(Session s) {}
+ public void exception(Session s, SessionException e) {}
+ public void message(Session s, MessageTransfer xfr)
+ {
+ synchronized (incoming)
+ {
+ incoming.add(xfr);
+ incoming.notifyAll();
}
- public void closed(Session s) {}
- });
+
+ s.processed(xfr);
+ }
+ public void closed(Session s) {}
+ }
+
+ public void testResumeNonemptyReplayBuffer() throws Exception
+ {
+ Connection conn = new Connection();
+ conn.setConnectionListener(new FailoverConnectionListener());
+ conn.connect("localhost", port, null, "guest", "guest");
+ Session ssn = conn.createSession(1);
+ ssn.setSessionListener(new TestSessionListener());
send(ssn, "SINK 0");
send(ssn, "ECHO 1");
@@ -251,4 +274,24 @@ public class ConnectionTest extends TestCase implements SessionListener
}
}
+ public void testResumeEmptyReplayBuffer() throws InterruptedException
+ {
+ Connection conn = new Connection();
+ conn.setConnectionListener(new FailoverConnectionListener());
+ conn.connect("localhost", port, null, "guest", "guest");
+ Session ssn = conn.createSession(1);
+ ssn.setSessionListener(new TestSessionListener());
+
+ send(ssn, "SINK 0");
+ send(ssn, "SINK 1");
+ send(ssn, "DELAYED_CLOSE 2");
+ ssn.sync();
+ Thread.sleep(6000);
+ send(ssn, "SINK 3");
+ ssn.sync();
+ System.out.println(messages);
+ assertEquals(1, messages.size());
+ assertEquals("SINK 3", messages.get(0).getBodyString());
+ }
+
}