diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
commit | 394823bba7976c170ac58e53b5d80ad12e0f1690 (patch) | |
tree | 9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/tools/src | |
parent | e78747f63bc73daa6e2035453358e6eaf3237b84 (diff) | |
download | qpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz |
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/src')
-rw-r--r-- | java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 186 |
1 files changed, 79 insertions, 107 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 181cf427d1..82e05ba816 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -640,71 +640,47 @@ public class QpidBench } private static final org.apache.qpid.transport.Connection getConnection - (Options opts, final SessionDelegate delegate) + (Options opts) { - final Object lock = new Object(); org.apache.qpid.transport.Connection conn = - IoTransport.connect(opts.broker, opts.port, - new ClientDelegate() - { - public SessionDelegate getSessionDelegate() - { - return delegate; - } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} - @Override public void connectionOpenOk(Channel ch, - ConnectionOpenOk ok) - { - synchronized (lock) - { - lock.notify(); - } - } - }); - conn.send(new ProtocolHeader(1, 0, 10)); - - synchronized (lock) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + new org.apache.qpid.transport.Connection(); + conn.connect(opts.broker, opts.port, null, "guest", "guest"); + return conn; + } + + private static abstract class NativeListener implements SessionListener + { + + public void opened(org.apache.qpid.transport.Session ssn) {} + + public void exception(org.apache.qpid.transport.Session ssn, + SessionException exc) + { + exc.printStackTrace(); } - return conn; + public void closed(org.apache.qpid.transport.Session ssn) {} + } private static final void native_publisher(Options opts) throws Exception { final long[] echos = { 0 }; - org.apache.qpid.transport.Connection conn = getConnection - (opts, - new SessionDelegate() { - @Override public void messageTransfer - (org.apache.qpid.transport.Session ssn, - MessageTransfer mt) - { - synchronized (echos) - { - echos[0]++; - echos.notify(); - } - ssn.processed(mt); - } - }); - - Channel ch = conn.getChannel(0); - org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(xfr); + } + }); ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); @@ -794,6 +770,7 @@ public class QpidBench ssn.messageCancel("echo-queue"); ssn.sync(); + ssn.close(); conn.close(); } @@ -805,57 +782,51 @@ public class QpidBench dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); final MessageProperties mp = new MessageProperties(); final Object done = new Object(); - org.apache.qpid.transport.Connection conn = getConnection - (opts, - new SessionDelegate() { - - private long count = 0; - private long lastTime = 0; - private long start; - - @Override public void messageTransfer - (org.apache.qpid.transport.Session ssn, - MessageTransfer mt) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - ssn.messageTransfer("amq.direct", - MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), - echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); - lastTime = time; - } - ssn.processed(mt); - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - Channel ch = conn.getChannel(0); - org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(xfr); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); @@ -879,6 +850,7 @@ public class QpidBench ssn.messageCancel("test-queue"); ssn.sync(); + ssn.close(); conn.close(); } |