diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
commit | 394823bba7976c170ac58e53b5d80ad12e0f1690 (patch) | |
tree | 9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/client/example | |
parent | e78747f63bc73daa6e2035453358e6eaf3237b84 (diff) | |
download | qpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz |
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/example')
8 files changed, 124 insertions, 339 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java index b7982f8e78..aa99112f32 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java @@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.direct; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; /** * This creates a queue a queue and binds it to the @@ -36,16 +35,8 @@ public class DeclareQueue public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -58,15 +49,7 @@ public class DeclareQueue session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java index ba4aec4024..1e571eeede 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java @@ -24,63 +24,21 @@ package org.apache.qpid.example.amqpexample.direct; import java.nio.ByteBuffer; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; -public class DirectProducer implements MessageListener +public class DirectProducer { - boolean finish = false; - - public void onMessage(Message m) - { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } - } - - public boolean isFinished() - { - return finish; - } public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -102,16 +60,8 @@ public class DirectProducer implements MessageListener session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java index 7fed35872d..370573c3eb 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java @@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.direct; import java.nio.ByteBuffer; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; - +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; /** * This listens to messages on a queue and terminates * when it sees the final message * */ -public class Listener implements MessageListener +public class Listener implements SessionListener { - boolean finish = false; - public void onMessage(Message m) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } + System.out.println("Message: " + xfr); } - public boolean isFinished() + public void exception(Session ssn, SessionException exc) { - return finish; + exc.printStackTrace(); } + public void closed(Session ssn) {} + /** * This sends 10 messages to the * amq.direct exchange using the * routing key as "routing_key" * */ - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); // Create an instance of the listener Listener listener = new Listener(); + session.setSessionListener(listener); // create a subscription session.messageSubscribe("message_queue", "listener_destination", - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // issue credits // XXX - session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); // confirm completion session.sync(); - // check to see if we have received all the messages - while (!listener.isFinished()){} + // wait to receive all the messages + System.out.println("Waiting 100 seconds for messages from listener_destination"); + Thread.sleep(100*1000); System.out.println("Shutting down listener for listener_destination"); session.messageCancel("listener_destination"); //cleanup - session.sessionDetach(session.getName()); - - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java index 75f274bcbb..079be003b1 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java @@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.fanout; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; /** * This creates a queue a queue and binds it to the @@ -36,16 +35,8 @@ public class DeclareQueue public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -58,15 +49,7 @@ public class DeclareQueue session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java index a1ec0b7f29..257bcdbfb1 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java @@ -21,13 +21,12 @@ package org.apache.qpid.example.amqpexample.fanout; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; public class FannoutProducer { @@ -38,16 +37,8 @@ public class FannoutProducer public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -68,16 +59,8 @@ public class FannoutProducer session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java index 432dd7eb4f..dead5569eb 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java @@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.fanout; import java.nio.ByteBuffer; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; - +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; /** * This listens to messages on a queue and terminates * when it sees the final message * */ -public class Listener implements MessageListener +public class Listener implements SessionListener { - boolean finish = false; - public void onMessage(Message m) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } + System.out.println("Message: " + xfr); } - public boolean isFinished() + public void exception(Session ssn, SessionException exc) { - return finish; + exc.printStackTrace(); } + public void closed(Session ssn) {} + /** * This sends 10 messages to the * amq.direct exchange using the * routing key as "routing_key" * */ - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); // Create an instance of the listener Listener listener = new Listener(); + session.setSessionListener(listener); // create a subscription session.messageSubscribe("message_queue", "listener_destination", - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // issue credits // XXX - session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); // confirm completion session.sync(); // check to see if we have received all the messages - while (!listener.isFinished()){} + System.out.println("Waiting 100 seconds for messages from listener_destination"); + Thread.sleep(100*1000); System.out.println("Shutting down listener for listener_destination"); session.messageCancel("listener_destination"); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java index 70c08e0d02..2ed5b2d719 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -23,63 +23,49 @@ package org.apache.qpid.example.amqpexample.pubsub; import java.nio.ByteBuffer; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; -public class TopicListener implements MessageListener +public class TopicListener implements SessionListener { - boolean finish = false; - int count = 0; - public void onMessage(Message m) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data + " with routing_key " + m.getDeliveryProperties().getRoutingKey()); - - if (data != null && data.equals("That's all, folks!")) - { - count++; - if (count == 4){ - finish = true; - } - } + DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class); + System.out.println("Message: " + xfr + " with routing_key " + dp.getRoutingKey()); } + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + public void prepareQueue(Session session,String queueName,String bindingKey) { session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE); session.exchangeBind(queueName, "amq.topic", bindingKey, null); session.exchangeBind(queueName, "amq.topic", "control", null); - session.messageSubscribe(queueName,queueName, - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(this), - null, Option.NONE); + session.messageSubscribe(queueName, queueName, + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // issue credits // XXX: need to be able to set to null - session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); session.messageFlow(queueName, MessageCreditUnit.MESSAGE, 24); } @@ -88,30 +74,18 @@ public class TopicListener implements MessageListener session.messageCancel(dest); } - public boolean isFinished() - { - return finish; - } - - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); // Create an instance of the listener TopicListener listener = new TopicListener(); + session.setSessionListener(listener); listener.prepareQueue(session,"usa", "usa.#"); listener.prepareQueue(session,"europe", "europe.#"); @@ -120,25 +94,19 @@ public class TopicListener implements MessageListener // confirm completion session.sync(); - // check to see if we have received all the messages - while (!listener.isFinished()){} - System.out.println("Shutting down listener for listener_destination"); + + System.out.println("Waiting 100 seconds for messages"); + Thread.sleep(100*1000); + + System.out.println("Shutting down listeners"); listener.cancelSubscription(session,"usa"); listener.cancelSubscription(session,"europe"); listener.cancelSubscription(session,"news"); listener.cancelSubscription(session,"weather"); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java index 3067309555..20264d3791 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java @@ -21,16 +21,16 @@ package org.apache.qpid.example.amqpexample.pubsub; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; public class TopicPublisher { + public void publishMessages(Session session, String routing_key) { // Set the routing key once, we'll use the same routing key for all @@ -56,16 +56,8 @@ public class TopicPublisher public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -82,15 +74,7 @@ public class TopicPublisher session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } |