summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-23 01:21:22 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-23 01:21:22 +0000
commit673cb2f2eb5b8e9e6b8677fc4374ea49f34f194b (patch)
tree76b0127cf82c7e1b1b4b152af6f27d095ebd29d2 /java/common
parent8bcd43077274666703aa2a254cdbcaf8229148fa (diff)
downloadqpid-python-673cb2f2eb5b8e9e6b8677fc4374ea49f34f194b.tar.gz
QPID-1339: support for low level session resume
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707241 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/Composite.tpl23
-rw-r--r--java/common/build.xml3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java36
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Echo.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java265
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/Functions.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/Strings.java12
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java154
12 files changed, 385 insertions, 143 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl
index 450b1dea0e..eaaabdc041 100644
--- a/java/common/Composite.tpl
+++ b/java/common/Composite.tpl
@@ -13,6 +13,9 @@ import org.apache.qpid.transport.codec.Encoder;
import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.util.Strings;
+
+
${
from genutil import *
@@ -227,6 +230,26 @@ if segments:
setBody(body);
return this;
}
+
+ public final byte[] getBodyBytes() {
+ ByteBuffer buf = getBody();
+ byte[] bytes = new byte[buf.remaining()];
+ buf.get(bytes);
+ return bytes;
+ }
+
+ public final void setBody(byte[] body)
+ {
+ setBody(ByteBuffer.wrap(body));
+ }
+
+ public final String getBodyString() {
+ return Strings.fromUTF8(getBodyBytes());
+ }
+
+ public final void setBody(String body) {
+ setBody(Strings.toUTF8(body));
+ }
""")
}
diff --git a/java/common/build.xml b/java/common/build.xml
index 24e1a22edc..6172a680ec 100644
--- a/java/common/build.xml
+++ b/java/common/build.xml
@@ -19,9 +19,6 @@
-
-->
<project name="AMQ Common" default="build">
-
- <!-- Disabled ConnectionTest due to QPID-1359 -->
- <property name="module.test.excludes" value="**/ConnectionTest.java"/>
<import file="../module.xml"/>
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 71027e3256..f4dc4408be 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -53,10 +53,10 @@ public class Connection extends ConnectionInvoker
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
- enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
-
private static final Logger log = Logger.get(Connection.class);
+ enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+
class DefaultConnectionListener implements ConnectionListener
{
public void opened(Connection conn) {}
@@ -202,9 +202,9 @@ public class Connection extends ConnectionInvoker
return createSession(0);
}
- public Session createSession(long timeout)
+ public Session createSession(long expiry)
{
- return createSession(UUID.randomUUID().toString(), timeout);
+ return createSession(UUID.randomUUID().toString(), expiry);
}
public Session createSession(String name)
@@ -212,25 +212,24 @@ public class Connection extends ConnectionInvoker
return createSession(name, 0);
}
- public Session createSession(String name, long timeout)
+ public Session createSession(String name, long expiry)
{
- return createSession(Strings.toUTF8(name), timeout);
+ return createSession(Strings.toUTF8(name), expiry);
}
- public Session createSession(byte[] name, long timeout)
+ public Session createSession(byte[] name, long expiry)
{
- return createSession(new Binary(name), timeout);
+ return createSession(new Binary(name), expiry);
}
- public Session createSession(Binary name, long timeout)
+ public Session createSession(Binary name, long expiry)
{
synchronized (lock)
{
- Session ssn = new Session(this, name);
+ Session ssn = new Session(this, name, expiry);
sessions.put(name, ssn);
map(ssn);
- ssn.sessionAttach(name.getBytes());
- ssn.sessionRequestTimeout(timeout);
+ ssn.attach();
return ssn;
}
}
@@ -349,6 +348,19 @@ public class Connection extends ConnectionInvoker
}
}
+ public void resume()
+ {
+ synchronized (lock)
+ {
+ for (Session ssn : sessions.values())
+ {
+ map(ssn);
+ ssn.attach();
+ ssn.resume();
+ }
+ }
+ }
+
public void exception(ConnectionException e)
{
synchronized (lock)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index dcf05d9f72..c1031c9a1c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -39,8 +39,9 @@ public class Echo implements SessionListener
public void message(Session ssn, MessageTransfer xfr)
{
+ int id = xfr.getId();
ssn.invoke(xfr);
- ssn.processed(xfr);
+ ssn.processed(id);
}
public void exception(Session ssn, SessionException exc)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index be1ea54c93..e4b8ade285 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -140,7 +140,7 @@ public class ServerDelegate extends ConnectionDelegate
public Session getSession(Connection conn, SessionAttach atc)
{
- return new Session(conn, new Binary(atc.getName()));
+ return new Session(conn, new Binary(atc.getName()), 0);
}
@Override public void sessionAttach(Connection conn, SessionAttach atc)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 125f000543..e96aaf1b99 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -24,6 +24,7 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.network.Frame;
import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.qpid.transport.Option.*;
+import static org.apache.qpid.transport.Session.State.*;
import static org.apache.qpid.transport.util.Functions.*;
import static org.apache.qpid.util.Serial.*;
import static org.apache.qpid.util.Strings.*;
@@ -49,6 +51,8 @@ public class Session extends SessionInvoker
private static final Logger log = Logger.get(Session.class);
+ enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
+
class DefaultSessionListener implements SessionListener
{
@@ -69,49 +73,43 @@ public class Session extends SessionInvoker
public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
- private static boolean ENABLE_REPLAY = false;
-
- static
- {
- String enableReplay = "enable_command_replay";
- try
- {
- ENABLE_REPLAY = new Boolean(System.getProperties().getProperty(enableReplay, "false"));
- }
- catch (Exception e)
- {
- ENABLE_REPLAY = false;
- }
- }
-
private Connection connection;
private Binary name;
+ private long expiry;
private int channel;
private SessionDelegate delegate = new SessionDelegate();
private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
+ private boolean incomingInit;
// incoming command count
- int commandsIn = 0;
+ private int commandsIn;
// completed incoming commands
private final Object processedLock = new Object();
- private RangeSet processed = new RangeSet();
- private int maxProcessed = commandsIn - 1;
- private int syncPoint = maxProcessed;
+ private RangeSet processed;
+ private int maxProcessed;
+ private int syncPoint;
// outgoing command count
private int commandsOut = 0;
- private Map<Integer,Method> commands = new HashMap<Integer,Method>();
+ private Method[] commands = new Method[64*1024];
private int maxComplete = commandsOut - 1;
private boolean needSync = false;
- private AtomicBoolean closed = new AtomicBoolean(false);
+ private State state = NEW;
- Session(Connection connection, Binary name)
+ Session(Connection connection, Binary name, long expiry)
{
this.connection = connection;
this.name = name;
+ this.expiry = expiry;
+ initReceiver();
+ }
+
+ public Connection getConnection()
+ {
+ return connection;
}
public Binary getName()
@@ -119,6 +117,11 @@ public class Session extends SessionInvoker
return name;
}
+ void setExpiry(long expiry)
+ {
+ this.expiry = expiry;
+ }
+
int getChannel()
{
return channel;
@@ -154,9 +157,63 @@ public class Session extends SessionInvoker
}
}
- public Map<Integer,Method> getOutstandingCommands()
+ private void initReceiver()
+ {
+ synchronized (processedLock)
+ {
+ incomingInit = false;
+ processed = new RangeSet();
+ }
+ }
+
+ void attach()
{
- return commands;
+ initReceiver();
+ sessionAttach(name.getBytes());
+ sessionRequestTimeout(expiry);
+ }
+
+ void resume()
+ {
+ synchronized (commands)
+ {
+ for (int i = maxComplete + 1; lt(i, commandsOut); i++)
+ {
+ Method m = commands[mod(i, commands.length)];
+ if (m != null)
+ {
+ sessionCommandPoint(m.getId(), 0);
+ send(m);
+ }
+ }
+ }
+ }
+
+ void dump()
+ {
+ synchronized (commands)
+ {
+ for (Method m : commands)
+ {
+ if (m != null)
+ {
+ System.out.println(m);
+ }
+ }
+ }
+ }
+
+ final void commandPoint(int id)
+ {
+ synchronized (processedLock)
+ {
+ this.commandsIn = id;
+ if (!incomingInit)
+ {
+ maxProcessed = commandsIn - 1;
+ syncPoint = maxProcessed;
+ }
+ }
}
public int getCommandsOut()
@@ -209,11 +266,12 @@ public class Session extends SessionInvoker
public void processed(Range range)
{
- log.debug("%s processed(%s)", this, range);
+ log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed);
boolean flush;
synchronized (processedLock)
{
+ log.debug("%s", processed);
processed.add(range);
Range first = processed.getFirst();
int lower = first.getLower();
@@ -281,14 +339,6 @@ public class Session extends SessionInvoker
}
}
- public Method getCommand(int id)
- {
- synchronized (commands)
- {
- return commands.get(id);
- }
- }
-
boolean complete(int lower, int upper)
{
//avoid autoboxing
@@ -301,13 +351,13 @@ public class Session extends SessionInvoker
int old = maxComplete;
for (int id = max(maxComplete, lower); le(id, upper); id++)
{
- commands.remove(id);
+ commands[mod(id, commands.length)] = null;
}
if (le(lower, maxComplete + 1))
{
maxComplete = max(maxComplete, upper);
}
- log.debug("%s commands remaining: %s", this, commands);
+ log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
commands.notifyAll();
return gt(maxComplete, old);
}
@@ -329,38 +379,47 @@ public class Session extends SessionInvoker
}
}
- public void invoke(Method m)
+ final private boolean isFull(int id)
{
- if (closed.get())
- {
- ExecutionException exc = getException();
- if (exc != null)
- {
- throw new SessionException(exc);
- }
- else if (close != null)
- {
- throw new ConnectionException(close);
- }
- else
- {
- throw new SessionClosedException();
- }
- }
+ return id - maxComplete >= commands.length;
+ }
+ public void invoke(Method m)
+ {
if (m.getEncodedTrack() == Frame.L4)
{
synchronized (commands)
{
+ if (state == CLOSED)
+ {
+ throw new SessionClosedException();
+ }
+
int next = commandsOut++;
m.setId(next);
+
+ if (isFull(next))
+ {
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && isFull(next))
+ {
+ sessionFlush(COMPLETED);
+ w.await();
+ }
+ }
+
+ if (isFull(next))
+ {
+ throw new SessionException("timed out waiting for completion");
+ }
+
if (next == 0)
{
sessionCommandPoint(0, 0);
}
- if (ENABLE_REPLAY)
+ if (expiry > 0)
{
- commands.put(next, m);
+ commands[mod(next, commands.length)] = m;
}
if (autoSync)
{
@@ -404,31 +463,23 @@ public class Session extends SessionInvoker
executionSync(SYNC);
}
- long start = System.currentTimeMillis();
- long elapsed = 0;
- while (!closed.get() && elapsed < timeout && lt(maxComplete, point))
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
- try {
- log.debug("%s waiting for[%d]: %d, %s", this, point,
- maxComplete, commands);
- commands.wait(timeout - elapsed);
- elapsed = System.currentTimeMillis() - start;
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ log.debug("%s waiting for[%d]: %d, %s", this, point,
+ maxComplete, commands);
+ w.await();
}
if (lt(maxComplete, point))
{
- if (closed.get())
+ if (state == CLOSED)
{
throw new SessionException(getException());
}
else
{
- throw new RuntimeException
+ throw new SessionException
(String.format
("timed out waiting for sync: complete = %s, point = %s", maxComplete, point));
}
@@ -518,20 +569,11 @@ public class Session extends SessionInvoker
{
synchronized (this)
{
- long start = System.currentTimeMillis();
- long elapsed = 0;
- while (!closed.get() && timeout - elapsed > 0 && !isDone())
+ Waiter w = new Waiter(this, timeout);
+ while (w.hasTime() && state != CLOSED && !isDone())
{
- try
- {
- log.debug("%s waiting for result: %s", Session.this, this);
- wait(timeout - elapsed);
- elapsed = System.currentTimeMillis() - start;
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ log.debug("%s waiting for result: %s", Session.this, this);
+ w.await();
}
}
@@ -539,13 +581,15 @@ public class Session extends SessionInvoker
{
return result;
}
- else if (closed.get())
+ else if (state == CLOSED)
{
throw new SessionException(getException());
}
else
{
- return null;
+ throw new SessionException
+ (String.format("%s timed out waiting for result: %s",
+ Session.this, this));
}
}
@@ -588,32 +632,24 @@ public class Session extends SessionInvoker
public void close()
{
- sessionRequestTimeout(0);
- sessionDetach(name.getBytes());
synchronized (commands)
{
- long start = System.currentTimeMillis();
- long elapsed = 0;
- try
+ state = CLOSING;
+ sessionRequestTimeout(0);
+ sessionDetach(name.getBytes());
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && state != CLOSED)
{
- while (!closed.get() && elapsed < timeout)
- {
- commands.wait(timeout - elapsed);
- elapsed = System.currentTimeMillis() - start;
- }
-
- if (!closed.get())
- {
- throw new SessionException("close() timed out");
- }
+ w.await();
}
- catch (InterruptedException e)
+
+ if (state != CLOSED)
{
- throw new RuntimeException(e);
+ throw new SessionException("close() timed out");
}
- }
- connection.removeSession(this);
+ connection.removeSession(this);
+ }
}
public void exception(Throwable t)
@@ -623,18 +659,27 @@ public class Session extends SessionInvoker
public void closed()
{
- closed.set(true);
synchronized (commands)
{
+ if (expiry == 0)
+ {
+ state = CLOSED;
+ }
+ else
+ {
+ state = DETACHED;
+ }
+
commands.notifyAll();
- }
- synchronized (results)
- {
- for (ResultFuture<?> result : results.values())
+
+ synchronized (results)
{
- synchronized(result)
+ for (ResultFuture<?> result : results.values())
{
- result.notifyAll();
+ synchronized(result)
+ {
+ result.notifyAll();
+ }
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index f6a1735b68..e2b6980dd4 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -57,7 +57,10 @@ public class SessionDelegate
log.warn("UNHANDLED: [%s] %s", ssn, method);
}
- @Override public void sessionTimeout(Session ssn, SessionTimeout t) {}
+ @Override public void sessionTimeout(Session ssn, SessionTimeout t)
+ {
+ ssn.setExpiry(t.getTimeout());
+ }
@Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
{
@@ -113,7 +116,7 @@ public class SessionDelegate
@Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
{
- ssn.commandsIn = scp.getCommandId();
+ ssn.commandPoint(scp.getCommandId());
}
@Override public void executionSync(Session ssn, ExecutionSync sync)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index bb7d2506e3..7908700cbe 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -208,11 +208,14 @@ public final class Disassembler implements Sender<ProtocolEvent>,
if (payload)
{
final Header hdr = method.getHeader();
- final Struct[] structs = hdr.getStructs();
-
- for (Struct st : structs)
+ if (hdr != null)
{
- enc.writeStruct32(st);
+ final Struct[] structs = hdr.getStructs();
+
+ for (Struct st : structs)
+ {
+ enc.writeStruct32(st);
+ }
}
headerSeg = enc.segment();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index f25b16d71a..73ff039be5 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -88,12 +88,6 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
return result;
}
- private static final int mod(int n, int m)
- {
- int r = n % m;
- return r < 0 ? m + r : r;
- }
-
public void send(ByteBuffer buf)
{
if (closed.get())
diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
index 2c6984e302..c220694b50 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
@@ -34,6 +34,12 @@ import static java.lang.Math.*;
public class Functions
{
+ public static final int mod(int n, int m)
+ {
+ int r = n % m;
+ return r < 0 ? m + r : r;
+ }
+
public static final byte lsb(int i)
{
return (byte) (0xFF & i);
diff --git a/java/common/src/main/java/org/apache/qpid/util/Strings.java b/java/common/src/main/java/org/apache/qpid/util/Strings.java
index 4b199bafe6..a0bbbb22de 100644
--- a/java/common/src/main/java/org/apache/qpid/util/Strings.java
+++ b/java/common/src/main/java/org/apache/qpid/util/Strings.java
@@ -79,4 +79,16 @@ public final class Strings
}
}
+ public static final String fromUTF8(byte[] bytes)
+ {
+ try
+ {
+ return new String(bytes, "UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index 03fae56250..19e1c2537f 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -28,21 +28,26 @@ import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.io.IoAcceptor;
import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
import junit.framework.TestCase;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
/**
* ConnectionTest
*/
-public class ConnectionTest extends TestCase
+public class ConnectionTest extends TestCase implements SessionListener
{
private static final Logger log = Logger.get(ConnectionTest.class);
private int port;
+ private volatile boolean queue = false;
+ private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
protected void setUp() throws Exception
{
@@ -51,10 +56,11 @@ public class ConnectionTest extends TestCase
port = AvailablePortFinder.getNextAvailable(12000);
ConnectionDelegate server = new ServerDelegate() {
- @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+ @Override public Session getSession(Connection conn, SessionAttach atc)
{
- super.connectionOpen(conn, open);
- conn.close();
+ Session ssn = super.getSession(conn, atc);
+ ssn.setSessionListener(ConnectionTest.this);
+ return ssn;
}
};
@@ -63,6 +69,58 @@ public class ConnectionTest extends TestCase
ioa.start();
}
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ if (queue)
+ {
+ messages.add(xfr);
+ ssn.processed(xfr);
+ return;
+ }
+
+ String body = xfr.getBodyString();
+
+ if (body.startsWith("CLOSE"))
+ {
+ ssn.getConnection().close();
+ }
+ else if (body.startsWith("ECHO"))
+ {
+ int id = xfr.getId();
+ ssn.invoke(xfr);
+ ssn.processed(id);
+ }
+ else if (body.startsWith("SINK"))
+ {
+ ssn.processed(xfr);
+ }
+ else if (body.startsWith("DROP"))
+ {
+ // do nothing
+ }
+ else
+ {
+ throw new IllegalArgumentException
+ ("unrecognized message: " + body);
+ }
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ throw exc;
+ }
+
+ public void closed(Session ssn) {}
+
+ private void send(Session ssn, String msg)
+ {
+ ssn.messageTransfer
+ ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, msg);
+ }
+
private Connection connect(final Condition closed)
{
Connection conn = new Connection();
@@ -89,6 +147,10 @@ public class ConnectionTest extends TestCase
{
Condition closed = new Condition();
Connection conn = connect(closed);
+
+ Session ssn = conn.createSession();
+ send(ssn, "CLOSE");
+
if (!closed.get(3000))
{
fail("never got notified of connection close");
@@ -105,4 +167,88 @@ public class ConnectionTest extends TestCase
}
}
+ public void testResume() throws Exception
+ {
+ Connection conn = new Connection();
+ conn.connect("localhost", port, null, "guest", "guest");
+
+ conn.setConnectionListener(new ConnectionListener()
+ {
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException e)
+ {
+ throw e;
+ }
+ public void closed(Connection conn)
+ {
+ queue = true;
+ conn.connect("localhost", port, null, "guest", "guest");
+ conn.resume();
+ }
+ });
+
+ Session ssn = conn.createSession(1);
+ final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
+ ssn.setSessionListener(new SessionListener()
+ {
+ public void opened(Session s) {}
+ public void exception(Session s, SessionException e) {}
+ public void message(Session s, MessageTransfer xfr)
+ {
+ synchronized (incoming)
+ {
+ incoming.add(xfr);
+ incoming.notifyAll();
+ }
+
+ s.processed(xfr);
+ }
+ public void closed(Session s) {}
+ });
+
+ send(ssn, "SINK 0");
+ send(ssn, "ECHO 1");
+ send(ssn, "ECHO 2");
+
+ ssn.sync();
+
+ String[] msgs = { "DROP 3", "DROP 4", "DROP 5", "CLOSE 6", "SINK 7" };
+ for (String m : msgs)
+ {
+ send(ssn, m);
+ }
+
+ ssn.sync();
+
+ assertEquals(msgs.length, messages.size());
+ for (int i = 0; i < msgs.length; i++)
+ {
+ assertEquals(msgs[i], messages.get(i).getBodyString());
+ }
+
+ queue = false;
+
+ send(ssn, "ECHO 8");
+ send(ssn, "ECHO 9");
+
+ synchronized (incoming)
+ {
+ Waiter w = new Waiter(incoming, 30000);
+ while (w.hasTime() && incoming.size() < 4)
+ {
+ w.await();
+ }
+
+ assertEquals(4, incoming.size());
+ assertEquals("ECHO 1", incoming.get(0).getBodyString());
+ assertEquals(0, incoming.get(0).getId());
+ assertEquals("ECHO 2", incoming.get(1).getBodyString());
+ assertEquals(1, incoming.get(1).getId());
+ assertEquals("ECHO 8", incoming.get(2).getBodyString());
+ assertEquals(0, incoming.get(0).getId());
+ assertEquals("ECHO 9", incoming.get(3).getBodyString());
+ assertEquals(1, incoming.get(1).getId());
+ }
+ }
+
}