summaryrefslogtreecommitdiff
path: root/java/client/example
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
commit394823bba7976c170ac58e53b5d80ad12e0f1690 (patch)
tree9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/client/example
parente78747f63bc73daa6e2035453358e6eaf3237b84 (diff)
downloadqpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/example')
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java29
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java64
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java91
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java29
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java29
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java85
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java106
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java30
8 files changed, 124 insertions, 339 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
index b7982f8e78..aa99112f32 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
@@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.direct;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
/**
* This creates a queue a queue and binds it to the
@@ -36,16 +35,8 @@ public class DeclareQueue
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -58,15 +49,7 @@ public class DeclareQueue
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
index ba4aec4024..1e571eeede 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
@@ -24,63 +24,21 @@ package org.apache.qpid.example.amqpexample.direct;
import java.nio.ByteBuffer;
import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Session;
-public class DirectProducer implements MessageListener
+public class DirectProducer
{
- boolean finish = false;
-
- public void onMessage(Message m)
- {
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data);
-
-
- if (data != null && data.equals("That's all, folks!"))
- {
- finish = true;
- }
- }
-
- public boolean isFinished()
- {
- return finish;
- }
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -102,16 +60,8 @@ public class DirectProducer implements MessageListener
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
index 7fed35872d..370573c3eb 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
@@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.direct;
import java.nio.ByteBuffer;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
/**
* This listens to messages on a queue and terminates
* when it sees the final message
*
*/
-public class Listener implements MessageListener
+public class Listener implements SessionListener
{
- boolean finish = false;
- public void onMessage(Message m)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data);
-
-
- if (data != null && data.equals("That's all, folks!"))
- {
- finish = true;
- }
+ System.out.println("Message: " + xfr);
}
- public boolean isFinished()
+ public void exception(Session ssn, SessionException exc)
{
- return finish;
+ exc.printStackTrace();
}
+ public void closed(Session ssn) {}
+
/**
* This sends 10 messages to the
* amq.direct exchange using the
* routing key as "routing_key"
*
*/
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
Listener listener = new Listener();
+ session.setSessionListener(listener);
// create a subscription
session.messageSubscribe("message_queue",
"listener_destination",
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(listener), null);
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// issue credits
// XXX
- session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11);
// confirm completion
session.sync();
- // check to see if we have received all the messages
- while (!listener.isFinished()){}
+ // wait to receive all the messages
+ System.out.println("Waiting 100 seconds for messages from listener_destination");
+ Thread.sleep(100*1000);
System.out.println("Shutting down listener for listener_destination");
session.messageCancel("listener_destination");
//cleanup
- session.sessionDetach(session.getName());
-
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java
index 75f274bcbb..079be003b1 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java
@@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.fanout;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
/**
* This creates a queue a queue and binds it to the
@@ -36,16 +35,8 @@ public class DeclareQueue
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -58,15 +49,7 @@ public class DeclareQueue
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
index a1ec0b7f29..257bcdbfb1 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
@@ -21,13 +21,12 @@ package org.apache.qpid.example.amqpexample.fanout;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Session;
public class FannoutProducer
{
@@ -38,16 +37,8 @@ public class FannoutProducer
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -68,16 +59,8 @@ public class FannoutProducer
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
index 432dd7eb4f..dead5569eb 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
@@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.fanout;
import java.nio.ByteBuffer;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
/**
* This listens to messages on a queue and terminates
* when it sees the final message
*
*/
-public class Listener implements MessageListener
+public class Listener implements SessionListener
{
- boolean finish = false;
- public void onMessage(Message m)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data);
-
- if (data != null && data.equals("That's all, folks!"))
- {
- finish = true;
- }
+ System.out.println("Message: " + xfr);
}
- public boolean isFinished()
+ public void exception(Session ssn, SessionException exc)
{
- return finish;
+ exc.printStackTrace();
}
+ public void closed(Session ssn) {}
+
/**
* This sends 10 messages to the
* amq.direct exchange using the
* routing key as "routing_key"
*
*/
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
Listener listener = new Listener();
+ session.setSessionListener(listener);
// create a subscription
session.messageSubscribe("message_queue",
"listener_destination",
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(listener), null);
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// issue credits
// XXX
- session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11);
// confirm completion
session.sync();
// check to see if we have received all the messages
- while (!listener.isFinished()){}
+ System.out.println("Waiting 100 seconds for messages from listener_destination");
+ Thread.sleep(100*1000);
System.out.println("Shutting down listener for listener_destination");
session.messageCancel("listener_destination");
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
index 70c08e0d02..2ed5b2d719 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
@@ -23,63 +23,49 @@ package org.apache.qpid.example.amqpexample.pubsub;
import java.nio.ByteBuffer;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
-public class TopicListener implements MessageListener
+public class TopicListener implements SessionListener
{
- boolean finish = false;
- int count = 0;
- public void onMessage(Message m)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data + " with routing_key " + m.getDeliveryProperties().getRoutingKey());
-
- if (data != null && data.equals("That's all, folks!"))
- {
- count++;
- if (count == 4){
- finish = true;
- }
- }
+ DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class);
+ System.out.println("Message: " + xfr + " with routing_key " + dp.getRoutingKey());
}
+ public void exception(Session ssn, SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(Session ssn) {}
+
public void prepareQueue(Session session,String queueName,String bindingKey)
{
session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
session.exchangeBind(queueName, "amq.topic", bindingKey, null);
session.exchangeBind(queueName, "amq.topic", "control", null);
- session.messageSubscribe(queueName,queueName,
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(this),
- null, Option.NONE);
+ session.messageSubscribe(queueName, queueName,
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// issue credits
// XXX: need to be able to set to null
- session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
session.messageFlow(queueName, MessageCreditUnit.MESSAGE, 24);
}
@@ -88,30 +74,18 @@ public class TopicListener implements MessageListener
session.messageCancel(dest);
}
- public boolean isFinished()
- {
- return finish;
- }
-
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
TopicListener listener = new TopicListener();
+ session.setSessionListener(listener);
listener.prepareQueue(session,"usa", "usa.#");
listener.prepareQueue(session,"europe", "europe.#");
@@ -120,25 +94,19 @@ public class TopicListener implements MessageListener
// confirm completion
session.sync();
- // check to see if we have received all the messages
- while (!listener.isFinished()){}
- System.out.println("Shutting down listener for listener_destination");
+
+ System.out.println("Waiting 100 seconds for messages");
+ Thread.sleep(100*1000);
+
+ System.out.println("Shutting down listeners");
listener.cancelSubscription(session,"usa");
listener.cancelSubscription(session,"europe");
listener.cancelSubscription(session,"news");
listener.cancelSubscription(session,"weather");
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
index 3067309555..20264d3791 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
@@ -21,16 +21,16 @@ package org.apache.qpid.example.amqpexample.pubsub;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Session;
public class TopicPublisher
{
+
public void publishMessages(Session session, String routing_key)
{
// Set the routing key once, we'll use the same routing key for all
@@ -56,16 +56,8 @@ public class TopicPublisher
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -82,15 +74,7 @@ public class TopicPublisher
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}