diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-10-23 01:21:22 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-23 01:21:22 +0000 |
commit | 673cb2f2eb5b8e9e6b8677fc4374ea49f34f194b (patch) | |
tree | 76b0127cf82c7e1b1b4b152af6f27d095ebd29d2 /java/common | |
parent | 8bcd43077274666703aa2a254cdbcaf8229148fa (diff) | |
download | qpid-python-673cb2f2eb5b8e9e6b8677fc4374ea49f34f194b.tar.gz |
QPID-1339: support for low level session resume
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707241 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
12 files changed, 385 insertions, 143 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl index 450b1dea0e..eaaabdc041 100644 --- a/java/common/Composite.tpl +++ b/java/common/Composite.tpl @@ -13,6 +13,9 @@ import org.apache.qpid.transport.codec.Encoder; import org.apache.qpid.transport.network.Frame; +import org.apache.qpid.util.Strings; + + ${ from genutil import * @@ -227,6 +230,26 @@ if segments: setBody(body); return this; } + + public final byte[] getBodyBytes() { + ByteBuffer buf = getBody(); + byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + return bytes; + } + + public final void setBody(byte[] body) + { + setBody(ByteBuffer.wrap(body)); + } + + public final String getBodyString() { + return Strings.fromUTF8(getBodyBytes()); + } + + public final void setBody(String body) { + setBody(Strings.toUTF8(body)); + } """) } diff --git a/java/common/build.xml b/java/common/build.xml index 24e1a22edc..6172a680ec 100644 --- a/java/common/build.xml +++ b/java/common/build.xml @@ -19,9 +19,6 @@ - --> <project name="AMQ Common" default="build"> - - <!-- Disabled ConnectionTest due to QPID-1359 --> - <property name="module.test.excludes" value="**/ConnectionTest.java"/> <import file="../module.xml"/> diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 71027e3256..f4dc4408be 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -53,10 +53,10 @@ public class Connection extends ConnectionInvoker implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { - enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } - private static final Logger log = Logger.get(Connection.class); + enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + class DefaultConnectionListener implements ConnectionListener { public void opened(Connection conn) {} @@ -202,9 +202,9 @@ public class Connection extends ConnectionInvoker return createSession(0); } - public Session createSession(long timeout) + public Session createSession(long expiry) { - return createSession(UUID.randomUUID().toString(), timeout); + return createSession(UUID.randomUUID().toString(), expiry); } public Session createSession(String name) @@ -212,25 +212,24 @@ public class Connection extends ConnectionInvoker return createSession(name, 0); } - public Session createSession(String name, long timeout) + public Session createSession(String name, long expiry) { - return createSession(Strings.toUTF8(name), timeout); + return createSession(Strings.toUTF8(name), expiry); } - public Session createSession(byte[] name, long timeout) + public Session createSession(byte[] name, long expiry) { - return createSession(new Binary(name), timeout); + return createSession(new Binary(name), expiry); } - public Session createSession(Binary name, long timeout) + public Session createSession(Binary name, long expiry) { synchronized (lock) { - Session ssn = new Session(this, name); + Session ssn = new Session(this, name, expiry); sessions.put(name, ssn); map(ssn); - ssn.sessionAttach(name.getBytes()); - ssn.sessionRequestTimeout(timeout); + ssn.attach(); return ssn; } } @@ -349,6 +348,19 @@ public class Connection extends ConnectionInvoker } } + public void resume() + { + synchronized (lock) + { + for (Session ssn : sessions.values()) + { + map(ssn); + ssn.attach(); + ssn.resume(); + } + } + } + public void exception(ConnectionException e) { synchronized (lock) diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java index dcf05d9f72..c1031c9a1c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -39,8 +39,9 @@ public class Echo implements SessionListener public void message(Session ssn, MessageTransfer xfr) { + int id = xfr.getId(); ssn.invoke(xfr); - ssn.processed(xfr); + ssn.processed(id); } public void exception(Session ssn, SessionException exc) diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index be1ea54c93..e4b8ade285 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -140,7 +140,7 @@ public class ServerDelegate extends ConnectionDelegate public Session getSession(Connection conn, SessionAttach atc) { - return new Session(conn, new Binary(atc.getName())); + return new Session(conn, new Binary(atc.getName()), 0); } @Override public void sessionAttach(Connection conn, SessionAttach atc) diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 125f000543..e96aaf1b99 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -24,6 +24,7 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.network.Frame; import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -34,6 +35,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.transport.Option.*; +import static org.apache.qpid.transport.Session.State.*; import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.util.Serial.*; import static org.apache.qpid.util.Strings.*; @@ -49,6 +51,8 @@ public class Session extends SessionInvoker private static final Logger log = Logger.get(Session.class); + enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED } + class DefaultSessionListener implements SessionListener { @@ -69,49 +73,43 @@ public class Session extends SessionInvoker public static final int UNLIMITED_CREDIT = 0xFFFFFFFF; - private static boolean ENABLE_REPLAY = false; - - static - { - String enableReplay = "enable_command_replay"; - try - { - ENABLE_REPLAY = new Boolean(System.getProperties().getProperty(enableReplay, "false")); - } - catch (Exception e) - { - ENABLE_REPLAY = false; - } - } - private Connection connection; private Binary name; + private long expiry; private int channel; private SessionDelegate delegate = new SessionDelegate(); private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; + private boolean incomingInit; // incoming command count - int commandsIn = 0; + private int commandsIn; // completed incoming commands private final Object processedLock = new Object(); - private RangeSet processed = new RangeSet(); - private int maxProcessed = commandsIn - 1; - private int syncPoint = maxProcessed; + private RangeSet processed; + private int maxProcessed; + private int syncPoint; // outgoing command count private int commandsOut = 0; - private Map<Integer,Method> commands = new HashMap<Integer,Method>(); + private Method[] commands = new Method[64*1024]; private int maxComplete = commandsOut - 1; private boolean needSync = false; - private AtomicBoolean closed = new AtomicBoolean(false); + private State state = NEW; - Session(Connection connection, Binary name) + Session(Connection connection, Binary name, long expiry) { this.connection = connection; this.name = name; + this.expiry = expiry; + initReceiver(); + } + + public Connection getConnection() + { + return connection; } public Binary getName() @@ -119,6 +117,11 @@ public class Session extends SessionInvoker return name; } + void setExpiry(long expiry) + { + this.expiry = expiry; + } + int getChannel() { return channel; @@ -154,9 +157,63 @@ public class Session extends SessionInvoker } } - public Map<Integer,Method> getOutstandingCommands() + private void initReceiver() + { + synchronized (processedLock) + { + incomingInit = false; + processed = new RangeSet(); + } + } + + void attach() { - return commands; + initReceiver(); + sessionAttach(name.getBytes()); + sessionRequestTimeout(expiry); + } + + void resume() + { + synchronized (commands) + { + for (int i = maxComplete + 1; lt(i, commandsOut); i++) + { + Method m = commands[mod(i, commands.length)]; + if (m != null) + { + sessionCommandPoint(m.getId(), 0); + send(m); + } + } + } + } + + void dump() + { + synchronized (commands) + { + for (Method m : commands) + { + if (m != null) + { + System.out.println(m); + } + } + } + } + + final void commandPoint(int id) + { + synchronized (processedLock) + { + this.commandsIn = id; + if (!incomingInit) + { + maxProcessed = commandsIn - 1; + syncPoint = maxProcessed; + } + } } public int getCommandsOut() @@ -209,11 +266,12 @@ public class Session extends SessionInvoker public void processed(Range range) { - log.debug("%s processed(%s)", this, range); + log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed); boolean flush; synchronized (processedLock) { + log.debug("%s", processed); processed.add(range); Range first = processed.getFirst(); int lower = first.getLower(); @@ -281,14 +339,6 @@ public class Session extends SessionInvoker } } - public Method getCommand(int id) - { - synchronized (commands) - { - return commands.get(id); - } - } - boolean complete(int lower, int upper) { //avoid autoboxing @@ -301,13 +351,13 @@ public class Session extends SessionInvoker int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - commands.remove(id); + commands[mod(id, commands.length)] = null; } if (le(lower, maxComplete + 1)) { maxComplete = max(maxComplete, upper); } - log.debug("%s commands remaining: %s", this, commands); + log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); commands.notifyAll(); return gt(maxComplete, old); } @@ -329,38 +379,47 @@ public class Session extends SessionInvoker } } - public void invoke(Method m) + final private boolean isFull(int id) { - if (closed.get()) - { - ExecutionException exc = getException(); - if (exc != null) - { - throw new SessionException(exc); - } - else if (close != null) - { - throw new ConnectionException(close); - } - else - { - throw new SessionClosedException(); - } - } + return id - maxComplete >= commands.length; + } + public void invoke(Method m) + { if (m.getEncodedTrack() == Frame.L4) { synchronized (commands) { + if (state == CLOSED) + { + throw new SessionClosedException(); + } + int next = commandsOut++; m.setId(next); + + if (isFull(next)) + { + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && isFull(next)) + { + sessionFlush(COMPLETED); + w.await(); + } + } + + if (isFull(next)) + { + throw new SessionException("timed out waiting for completion"); + } + if (next == 0) { sessionCommandPoint(0, 0); } - if (ENABLE_REPLAY) + if (expiry > 0) { - commands.put(next, m); + commands[mod(next, commands.length)] = m; } if (autoSync) { @@ -404,31 +463,23 @@ public class Session extends SessionInvoker executionSync(SYNC); } - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed.get() && elapsed < timeout && lt(maxComplete, point)) + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - try { - log.debug("%s waiting for[%d]: %d, %s", this, point, - maxComplete, commands); - commands.wait(timeout - elapsed); - elapsed = System.currentTimeMillis() - start; - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + log.debug("%s waiting for[%d]: %d, %s", this, point, + maxComplete, commands); + w.await(); } if (lt(maxComplete, point)) { - if (closed.get()) + if (state == CLOSED) { throw new SessionException(getException()); } else { - throw new RuntimeException + throw new SessionException (String.format ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point)); } @@ -518,20 +569,11 @@ public class Session extends SessionInvoker { synchronized (this) { - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed.get() && timeout - elapsed > 0 && !isDone()) + Waiter w = new Waiter(this, timeout); + while (w.hasTime() && state != CLOSED && !isDone()) { - try - { - log.debug("%s waiting for result: %s", Session.this, this); - wait(timeout - elapsed); - elapsed = System.currentTimeMillis() - start; - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + log.debug("%s waiting for result: %s", Session.this, this); + w.await(); } } @@ -539,13 +581,15 @@ public class Session extends SessionInvoker { return result; } - else if (closed.get()) + else if (state == CLOSED) { throw new SessionException(getException()); } else { - return null; + throw new SessionException + (String.format("%s timed out waiting for result: %s", + Session.this, this)); } } @@ -588,32 +632,24 @@ public class Session extends SessionInvoker public void close() { - sessionRequestTimeout(0); - sessionDetach(name.getBytes()); synchronized (commands) { - long start = System.currentTimeMillis(); - long elapsed = 0; - try + state = CLOSING; + sessionRequestTimeout(0); + sessionDetach(name.getBytes()); + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && state != CLOSED) { - while (!closed.get() && elapsed < timeout) - { - commands.wait(timeout - elapsed); - elapsed = System.currentTimeMillis() - start; - } - - if (!closed.get()) - { - throw new SessionException("close() timed out"); - } + w.await(); } - catch (InterruptedException e) + + if (state != CLOSED) { - throw new RuntimeException(e); + throw new SessionException("close() timed out"); } - } - connection.removeSession(this); + connection.removeSession(this); + } } public void exception(Throwable t) @@ -623,18 +659,27 @@ public class Session extends SessionInvoker public void closed() { - closed.set(true); synchronized (commands) { + if (expiry == 0) + { + state = CLOSED; + } + else + { + state = DETACHED; + } + commands.notifyAll(); - } - synchronized (results) - { - for (ResultFuture<?> result : results.values()) + + synchronized (results) { - synchronized(result) + for (ResultFuture<?> result : results.values()) { - result.notifyAll(); + synchronized(result) + { + result.notifyAll(); + } } } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index f6a1735b68..e2b6980dd4 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -57,7 +57,10 @@ public class SessionDelegate log.warn("UNHANDLED: [%s] %s", ssn, method); } - @Override public void sessionTimeout(Session ssn, SessionTimeout t) {} + @Override public void sessionTimeout(Session ssn, SessionTimeout t) + { + ssn.setExpiry(t.getTimeout()); + } @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) { @@ -113,7 +116,7 @@ public class SessionDelegate @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp) { - ssn.commandsIn = scp.getCommandId(); + ssn.commandPoint(scp.getCommandId()); } @Override public void executionSync(Session ssn, ExecutionSync sync) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index bb7d2506e3..7908700cbe 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -208,11 +208,14 @@ public final class Disassembler implements Sender<ProtocolEvent>, if (payload) { final Header hdr = method.getHeader(); - final Struct[] structs = hdr.getStructs(); - - for (Struct st : structs) + if (hdr != null) { - enc.writeStruct32(st); + final Struct[] structs = hdr.getStructs(); + + for (Struct st : structs) + { + enc.writeStruct32(st); + } } headerSeg = enc.segment(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index f25b16d71a..73ff039be5 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -88,12 +88,6 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> return result; } - private static final int mod(int n, int m) - { - int r = n % m; - return r < 0 ? m + r : r; - } - public void send(ByteBuffer buf) { if (closed.get()) diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java index 2c6984e302..c220694b50 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java @@ -34,6 +34,12 @@ import static java.lang.Math.*; public class Functions { + public static final int mod(int n, int m) + { + int r = n % m; + return r < 0 ? m + r : r; + } + public static final byte lsb(int i) { return (byte) (0xFF & i); diff --git a/java/common/src/main/java/org/apache/qpid/util/Strings.java b/java/common/src/main/java/org/apache/qpid/util/Strings.java index 4b199bafe6..a0bbbb22de 100644 --- a/java/common/src/main/java/org/apache/qpid/util/Strings.java +++ b/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -79,4 +79,16 @@ public final class Strings } } + public static final String fromUTF8(byte[] bytes) + { + try + { + return new String(bytes, "UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + } diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 03fae56250..19e1c2537f 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -28,21 +28,26 @@ import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.io.IoAcceptor; import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; import junit.framework.TestCase; +import java.util.ArrayList; +import java.util.List; import java.util.Random; /** * ConnectionTest */ -public class ConnectionTest extends TestCase +public class ConnectionTest extends TestCase implements SessionListener { private static final Logger log = Logger.get(ConnectionTest.class); private int port; + private volatile boolean queue = false; + private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); protected void setUp() throws Exception { @@ -51,10 +56,11 @@ public class ConnectionTest extends TestCase port = AvailablePortFinder.getNextAvailable(12000); ConnectionDelegate server = new ServerDelegate() { - @Override public void connectionOpen(Connection conn, ConnectionOpen open) + @Override public Session getSession(Connection conn, SessionAttach atc) { - super.connectionOpen(conn, open); - conn.close(); + Session ssn = super.getSession(conn, atc); + ssn.setSessionListener(ConnectionTest.this); + return ssn; } }; @@ -63,6 +69,58 @@ public class ConnectionTest extends TestCase ioa.start(); } + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + if (queue) + { + messages.add(xfr); + ssn.processed(xfr); + return; + } + + String body = xfr.getBodyString(); + + if (body.startsWith("CLOSE")) + { + ssn.getConnection().close(); + } + else if (body.startsWith("ECHO")) + { + int id = xfr.getId(); + ssn.invoke(xfr); + ssn.processed(id); + } + else if (body.startsWith("SINK")) + { + ssn.processed(xfr); + } + else if (body.startsWith("DROP")) + { + // do nothing + } + else + { + throw new IllegalArgumentException + ("unrecognized message: " + body); + } + } + + public void exception(Session ssn, SessionException exc) + { + throw exc; + } + + public void closed(Session ssn) {} + + private void send(Session ssn, String msg) + { + ssn.messageTransfer + ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, msg); + } + private Connection connect(final Condition closed) { Connection conn = new Connection(); @@ -89,6 +147,10 @@ public class ConnectionTest extends TestCase { Condition closed = new Condition(); Connection conn = connect(closed); + + Session ssn = conn.createSession(); + send(ssn, "CLOSE"); + if (!closed.get(3000)) { fail("never got notified of connection close"); @@ -105,4 +167,88 @@ public class ConnectionTest extends TestCase } } + public void testResume() throws Exception + { + Connection conn = new Connection(); + conn.connect("localhost", port, null, "guest", "guest"); + + conn.setConnectionListener(new ConnectionListener() + { + public void opened(Connection conn) {} + public void exception(Connection conn, ConnectionException e) + { + throw e; + } + public void closed(Connection conn) + { + queue = true; + conn.connect("localhost", port, null, "guest", "guest"); + conn.resume(); + } + }); + + Session ssn = conn.createSession(1); + final List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); + ssn.setSessionListener(new SessionListener() + { + public void opened(Session s) {} + public void exception(Session s, SessionException e) {} + public void message(Session s, MessageTransfer xfr) + { + synchronized (incoming) + { + incoming.add(xfr); + incoming.notifyAll(); + } + + s.processed(xfr); + } + public void closed(Session s) {} + }); + + send(ssn, "SINK 0"); + send(ssn, "ECHO 1"); + send(ssn, "ECHO 2"); + + ssn.sync(); + + String[] msgs = { "DROP 3", "DROP 4", "DROP 5", "CLOSE 6", "SINK 7" }; + for (String m : msgs) + { + send(ssn, m); + } + + ssn.sync(); + + assertEquals(msgs.length, messages.size()); + for (int i = 0; i < msgs.length; i++) + { + assertEquals(msgs[i], messages.get(i).getBodyString()); + } + + queue = false; + + send(ssn, "ECHO 8"); + send(ssn, "ECHO 9"); + + synchronized (incoming) + { + Waiter w = new Waiter(incoming, 30000); + while (w.hasTime() && incoming.size() < 4) + { + w.await(); + } + + assertEquals(4, incoming.size()); + assertEquals("ECHO 1", incoming.get(0).getBodyString()); + assertEquals(0, incoming.get(0).getId()); + assertEquals("ECHO 2", incoming.get(1).getBodyString()); + assertEquals(1, incoming.get(1).getId()); + assertEquals("ECHO 8", incoming.get(2).getBodyString()); + assertEquals(0, incoming.get(0).getId()); + assertEquals("ECHO 9", incoming.get(3).getBodyString()); + assertEquals(1, incoming.get(1).getId()); + } + } + } |