summaryrefslogtreecommitdiff
path: root/java/tools/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
commit394823bba7976c170ac58e53b5d80ad12e0f1690 (patch)
tree9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/tools/src
parente78747f63bc73daa6e2035453358e6eaf3237b84 (diff)
downloadqpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/src')
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java186
1 files changed, 79 insertions, 107 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
index 181cf427d1..82e05ba816 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
@@ -640,71 +640,47 @@ public class QpidBench
}
private static final org.apache.qpid.transport.Connection getConnection
- (Options opts, final SessionDelegate delegate)
+ (Options opts)
{
- final Object lock = new Object();
org.apache.qpid.transport.Connection conn =
- IoTransport.connect(opts.broker, opts.port,
- new ClientDelegate()
- {
- public SessionDelegate getSessionDelegate()
- {
- return delegate;
- }
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
- @Override public void connectionOpenOk(Channel ch,
- ConnectionOpenOk ok)
- {
- synchronized (lock)
- {
- lock.notify();
- }
- }
- });
- conn.send(new ProtocolHeader(1, 0, 10));
-
- synchronized (lock)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ new org.apache.qpid.transport.Connection();
+ conn.connect(opts.broker, opts.port, null, "guest", "guest");
+ return conn;
+ }
+
+ private static abstract class NativeListener implements SessionListener
+ {
+
+ public void opened(org.apache.qpid.transport.Session ssn) {}
+
+ public void exception(org.apache.qpid.transport.Session ssn,
+ SessionException exc)
+ {
+ exc.printStackTrace();
}
- return conn;
+ public void closed(org.apache.qpid.transport.Session ssn) {}
+
}
private static final void native_publisher(Options opts) throws Exception
{
final long[] echos = { 0 };
- org.apache.qpid.transport.Connection conn = getConnection
- (opts,
- new SessionDelegate() {
- @Override public void messageTransfer
- (org.apache.qpid.transport.Session ssn,
- MessageTransfer mt)
- {
- synchronized (echos)
- {
- echos[0]++;
- echos.notify();
- }
- ssn.processed(mt);
- }
- });
-
- Channel ch = conn.getChannel(0);
- org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ synchronized (echos)
+ {
+ echos[0]++;
+ echos.notify();
+ }
+ ssn.processed(xfr);
+ }
+ });
ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -794,6 +770,7 @@ public class QpidBench
ssn.messageCancel("echo-queue");
ssn.sync();
+ ssn.close();
conn.close();
}
@@ -805,57 +782,51 @@ public class QpidBench
dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
final MessageProperties mp = new MessageProperties();
final Object done = new Object();
- org.apache.qpid.transport.Connection conn = getConnection
- (opts,
- new SessionDelegate() {
-
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- @Override public void messageTransfer
- (org.apache.qpid.transport.Session ssn,
- MessageTransfer mt)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- ssn.messageTransfer("amq.direct",
- MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp),
- echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
- lastTime = time;
- }
- ssn.processed(mt);
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- Channel ch = conn.getChannel(0);
- org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ private long count = 0;
+ private long lastTime = 0;
+ private long start;
+
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ if (count == 0)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+ long time = sample ? System.currentTimeMillis() : 0;
+
+ if (opts.window > 0 && (count % opts.window) == 0)
+ {
+ ssn.messageTransfer("amq.direct",
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(dp, mp),
+ echo);
+ }
+
+ if (sample)
+ {
+ sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
+ lastTime = time;
+ }
+ ssn.processed(xfr);
+ count++;
+
+ if (opts.count > 0 && count >= opts.count)
+ {
+ synchronized (done)
+ {
+ done.notify();
+ }
+ }
+ }
+ });
ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -879,6 +850,7 @@ public class QpidBench
ssn.messageCancel("test-queue");
ssn.sync();
+ ssn.close();
conn.close();
}