diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
commit | 394823bba7976c170ac58e53b5d80ad12e0f1690 (patch) | |
tree | 9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java | |
parent | e78747f63bc73daa6e2035453358e6eaf3237b84 (diff) | |
download | qpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz |
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
55 files changed, 1336 insertions, 2446 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java index b7982f8e78..aa99112f32 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java @@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.direct; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; /** * This creates a queue a queue and binds it to the @@ -36,16 +35,8 @@ public class DeclareQueue public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -58,15 +49,7 @@ public class DeclareQueue session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java index ba4aec4024..1e571eeede 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java @@ -24,63 +24,21 @@ package org.apache.qpid.example.amqpexample.direct; import java.nio.ByteBuffer; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; -public class DirectProducer implements MessageListener +public class DirectProducer { - boolean finish = false; - - public void onMessage(Message m) - { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } - } - - public boolean isFinished() - { - return finish; - } public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -102,16 +60,8 @@ public class DirectProducer implements MessageListener session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java index 7fed35872d..370573c3eb 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java @@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.direct; import java.nio.ByteBuffer; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; - +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; /** * This listens to messages on a queue and terminates * when it sees the final message * */ -public class Listener implements MessageListener +public class Listener implements SessionListener { - boolean finish = false; - public void onMessage(Message m) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } + System.out.println("Message: " + xfr); } - public boolean isFinished() + public void exception(Session ssn, SessionException exc) { - return finish; + exc.printStackTrace(); } + public void closed(Session ssn) {} + /** * This sends 10 messages to the * amq.direct exchange using the * routing key as "routing_key" * */ - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); // Create an instance of the listener Listener listener = new Listener(); + session.setSessionListener(listener); // create a subscription session.messageSubscribe("message_queue", "listener_destination", - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // issue credits // XXX - session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); // confirm completion session.sync(); - // check to see if we have received all the messages - while (!listener.isFinished()){} + // wait to receive all the messages + System.out.println("Waiting 100 seconds for messages from listener_destination"); + Thread.sleep(100*1000); System.out.println("Shutting down listener for listener_destination"); session.messageCancel("listener_destination"); //cleanup - session.sessionDetach(session.getName()); - - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java index 75f274bcbb..079be003b1 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java @@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.fanout; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; /** * This creates a queue a queue and binds it to the @@ -36,16 +35,8 @@ public class DeclareQueue public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -58,15 +49,7 @@ public class DeclareQueue session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java index a1ec0b7f29..257bcdbfb1 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java @@ -21,13 +21,12 @@ package org.apache.qpid.example.amqpexample.fanout; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; public class FannoutProducer { @@ -38,16 +37,8 @@ public class FannoutProducer public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -68,16 +59,8 @@ public class FannoutProducer session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java index 432dd7eb4f..dead5569eb 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java @@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.fanout; import java.nio.ByteBuffer; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; - +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; /** * This listens to messages on a queue and terminates * when it sees the final message * */ -public class Listener implements MessageListener +public class Listener implements SessionListener { - boolean finish = false; - public void onMessage(Message m) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } + System.out.println("Message: " + xfr); } - public boolean isFinished() + public void exception(Session ssn, SessionException exc) { - return finish; + exc.printStackTrace(); } + public void closed(Session ssn) {} + /** * This sends 10 messages to the * amq.direct exchange using the * routing key as "routing_key" * */ - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); // Create an instance of the listener Listener listener = new Listener(); + session.setSessionListener(listener); // create a subscription session.messageSubscribe("message_queue", "listener_destination", - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // issue credits // XXX - session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); // confirm completion session.sync(); // check to see if we have received all the messages - while (!listener.isFinished()){} + System.out.println("Waiting 100 seconds for messages from listener_destination"); + Thread.sleep(100*1000); System.out.println("Shutting down listener for listener_destination"); session.messageCancel("listener_destination"); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java index 70c08e0d02..2ed5b2d719 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -23,63 +23,49 @@ package org.apache.qpid.example.amqpexample.pubsub; import java.nio.ByteBuffer; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; -public class TopicListener implements MessageListener +public class TopicListener implements SessionListener { - boolean finish = false; - int count = 0; - public void onMessage(Message m) + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data + " with routing_key " + m.getDeliveryProperties().getRoutingKey()); - - if (data != null && data.equals("That's all, folks!")) - { - count++; - if (count == 4){ - finish = true; - } - } + DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class); + System.out.println("Message: " + xfr + " with routing_key " + dp.getRoutingKey()); } + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + public void prepareQueue(Session session,String queueName,String bindingKey) { session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE); session.exchangeBind(queueName, "amq.topic", bindingKey, null); session.exchangeBind(queueName, "amq.topic", "control", null); - session.messageSubscribe(queueName,queueName, - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(this), - null, Option.NONE); + session.messageSubscribe(queueName, queueName, + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // issue credits // XXX: need to be able to set to null - session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); session.messageFlow(queueName, MessageCreditUnit.MESSAGE, 24); } @@ -88,30 +74,18 @@ public class TopicListener implements MessageListener session.messageCancel(dest); } - public boolean isFinished() - { - return finish; - } - - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); // Create an instance of the listener TopicListener listener = new TopicListener(); + session.setSessionListener(listener); listener.prepareQueue(session,"usa", "usa.#"); listener.prepareQueue(session,"europe", "europe.#"); @@ -120,25 +94,19 @@ public class TopicListener implements MessageListener // confirm completion session.sync(); - // check to see if we have received all the messages - while (!listener.isFinished()){} - System.out.println("Shutting down listener for listener_destination"); + + System.out.println("Waiting 100 seconds for messages"); + Thread.sleep(100*1000); + + System.out.println("Shutting down listeners"); listener.cancelSubscription(session,"usa"); listener.cancelSubscription(session,"europe"); listener.cancelSubscription(session,"news"); listener.cancelSubscription(session,"weather"); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java index 3067309555..20264d3791 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java @@ -21,16 +21,16 @@ package org.apache.qpid.example.amqpexample.pubsub; */ -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; public class TopicPublisher { + public void publishMessages(Session session, String routing_key) { // Set the routing key once, we'll use the same routing key for all @@ -56,16 +56,8 @@ public class TopicPublisher public static void main(String[] args) { // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest"); // Create session Session session = con.createSession(0); @@ -82,15 +74,7 @@ public class TopicPublisher session.sync(); //cleanup - session.sessionDetach(session.getName()); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } + session.close(); + con.close(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 27294562e5..ebeb29af78 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -917,7 +917,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); - _delegate.closeConneciton(timeout); + _delegate.closeConnection(timeout); //If the taskpool hasn't shutdown by now then give it shutdownNow. // This will interupt any running tasks. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 7f36ec6e99..60b827a426 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -42,5 +42,5 @@ public interface AMQConnectionDelegate public void resubscribeSessions() throws JMSException, AMQException, FailoverException; - public void closeConneciton(long timeout) throws JMSException, AMQException; + public void closeConnection(long timeout) throws JMSException, AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index c723709d27..a7f04a2090 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -23,6 +23,7 @@ package org.apache.qpid.client; import java.io.IOException; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; @@ -33,15 +34,18 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.ClosedListener; import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionException; +import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ProtocolVersionException; +import org.apache.qpid.transport.TransportException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener +public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { /** * This class logger. @@ -56,7 +60,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed /** * The QpidConeection instance that is mapped with thie JMS connection. */ - org.apache.qpid.nclient.Connection _qpidConnection; + org.apache.qpid.transport.Connection _qpidConnection; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) @@ -125,7 +129,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed */ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _qpidConnection = Client.createConnection(); + _qpidConnection = new Connection(); try { if (_logger.isDebugEnabled()) @@ -134,16 +138,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword()); } + _qpidConnection.setConnectionListener(this); _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword()); - _qpidConnection.setClosedListener(this); _conn._connected = true; } catch(ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); } - catch (QpidException e) + catch (ConnectionException e) { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); } @@ -161,34 +165,42 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed } - public void closeConneciton(long timeout) throws JMSException, AMQException + public void closeConnection(long timeout) throws JMSException, AMQException { try { _qpidConnection.close(); } - catch (QpidException e) + catch (TransportException e) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e); + throw new AMQException(e.getMessage(), e); } - } - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + public void opened(Connection conn) {} + + public void exception(Connection conn, ConnectionException exc) { - if (_logger.isDebugEnabled()) + ExceptionListener listener = _conn._exceptionListener; + if (listener == null) { - _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode(), t); + _logger.error("connection exception: " + conn, exc); } - if (_conn._exceptionListener != null) + else { - JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode())); - if (t != null) + ConnectionClose close = exc.getClose(); + String code = null; + if (close != null) { - ex.initCause(t); + code = close.getReplyCode().toString(); } + JMSException ex = new JMSException(exc.getMessage(), code); + ex.initCause(exc); _conn._exceptionListener.onException(ex); } } + + public void closed(Connection conn) {} + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 2ec8737d16..8d42a2f201 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -58,7 +58,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private AMQConnection _conn; - public void closeConneciton(long timeout) throws JMSException, AMQException + public void closeConnection(long timeout) throws JMSException, AMQException { _conn.getProtocolHandler().closeConnection(timeout); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 46a667419d..7829966315 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -28,20 +28,27 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.util.Serial; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.qpid.transport.Option.*; + import javax.jms.*; import javax.jms.IllegalStateException; @@ -53,6 +60,7 @@ import java.util.Map; * This is a 0.10 Session */ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10> + implements SessionListener { /** @@ -70,10 +78,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * The latest qpid Exception that has been reaised. */ private Object _currentExceptionLock = new Object(); - private QpidException _currentException; + private SessionException _currentException; // a ref on the qpid connection - protected org.apache.qpid.nclient.Connection _qpidConnection; + protected org.apache.qpid.transport.Connection _qpidConnection; private RangeSet unacked = new RangeSet(); private int unackedCount = 0; @@ -97,7 +105,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. * @param qpidConnection The qpid connection */ - AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { @@ -108,7 +116,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = qpidConnection.createSession(0); // set the exception listnere for this session - _qpidSession.setClosedListener(new QpidSessionExceptionListener()); + _qpidSession.setSessionListener(this); // set transacted if required if (_transacted) { @@ -127,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. * @param qpidConnection The connection */ - AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { @@ -195,12 +203,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (unackedCount > 0) { - getQpidSession().messageAcknowledge + messageAcknowledge (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); clearUnacked(); } } + void messageAcknowledge(RangeSet ranges, boolean accept) + { + Session ssn = getQpidSession(); + for (Range range : ranges) + { + ssn.processed(range); + } + ssn.flushProcessed(accept ? BATCH : NONE); + if (accept) + { + ssn.messageAccept(ranges); + } + } + /** * Bind a queue with an exchange. * @@ -416,11 +438,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic preAcquire = ( ! consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) || !(consumer.getDestination() instanceof AMQQueue); - getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), - getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, - preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, - (BasicMessageConsumer_0_10) consumer, null, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + getQpidSession().messageSubscribe + (queueName.toString(), String.valueOf(tag), + getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) { @@ -598,7 +620,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * * @return The associated Qpid Session. */ - protected org.apache.qpid.nclient.Session getQpidSession() + protected Session getQpidSession() { return _qpidSession; } @@ -615,31 +637,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (_currentException != null) { - QpidException toBeThrown = _currentException; + SessionException se = _currentException; _currentException = null; - throw new AMQException(AMQConstant.getConstant(toBeThrown.getErrorCode().getCode()), - toBeThrown.getMessage(), toBeThrown); + ExecutionException ee = se.getException(); + int code; + if (ee == null) + { + code = 0; + } + else + { + code = ee.getErrorCode().getValue(); + } + throw new AMQException + (AMQConstant.getConstant(code), se.getMessage(), se); } } } - //------ Inner classes - /** - * Lstener for qpid protocol exceptions - */ - private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + messageReceived(new UnprocessedMessage_0_10(xfr)); + } + + public void exception(Session ssn, SessionException exc) + { + synchronized (_currentExceptionLock) { - synchronized (_currentExceptionLock) - { - // todo check the error code for finding out if we need to notify the - // JMS connection exception listener - _currentException = new QpidException(reason, errorCode, t); - } + _currentException = exc; } } + public void closed(Session ssn) {} + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, final boolean noLocal) throws AMQException @@ -776,7 +808,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) { // send completed so consumer credits don't dry up - getQpidSession().messageAcknowledge(_txRangeSet, false); + messageAcknowledge(_txRangeSet, false); } } @@ -787,7 +819,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if( _txSize > 0 ) { - getQpidSession().messageAcknowledge(_txRangeSet, true); + messageAcknowledge(_txRangeSet, true); _txRangeSet.clear(); _txSize = 0; } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 2a37298a43..7d535643c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * This is a 0.10 message consumer. */ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> - implements org.apache.qpid.nclient.MessagePartListener { /** @@ -114,9 +113,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _consumerTagString; } - - // ----- Interface org.apache.qpid.client.util.MessageListener - /** * * This is invoked by the session thread when emptying the session message queue. @@ -159,28 +155,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - - - /** - * This method is invoked by the transport layer when a message is delivered for this - * consumer. The message is transformed and pass to the session. - * @param xfr an 0.10 message transfer - */ - public void messageTransfer(MessageTransfer xfr) - - //public void onMessage(Message message) - { - int channelId = getSession().getChannelId(); - int consumerTag = getConsumerTag(); - - UnprocessedMessage_0_10 newMessage = - new UnprocessedMessage_0_10(consumerTag, xfr); - - - getSession().messageReceived(newMessage); - // else ignore this message - } - //----- overwritten methods /** @@ -304,8 +278,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); - _0_10session.getQpidSession().messageAcknowledge(ranges, - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE ); + _0_10session.messageAcknowledge + (ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); _0_10session.getCurrentException(); } } @@ -425,10 +400,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM void postDeliver(AbstractJMSMessage msg) throws JMSException { super.postDeliver(msg); - if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) + if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index ef8aeb58a1..4e5077f0cd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -36,7 +36,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.util.Strings; -import org.apache.qpid.nclient.util.ByteBufferMessage; import org.apache.qpid.njms.ExceptionHelper; import org.apache.qpid.transport.*; import static org.apache.qpid.transport.Option.*; diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 6763c72ecd..35adda9348 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -88,8 +88,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } checkStatus(result.getStatus()); } @@ -142,8 +141,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } checkStatus(result.getStatus()); } @@ -171,8 +169,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } } @@ -201,8 +198,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } } return result; @@ -248,8 +244,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } DtxXaStatus status = result.getStatus(); int outcome = XAResource.XA_OK; @@ -291,8 +286,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr( e.getException().getErrorCode()); } Xid[] result = new Xid[res.getInDoubt().size()]; int i = 0; @@ -329,8 +323,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr( e.getException().getErrorCode()); } checkStatus(result.getStatus()); } @@ -413,8 +406,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); // TODO: The amqp spec does not allow to make the difference // between an already known XID and a wrong arguments (join and resume are set) // TODO: make sure amqp addresses that diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 51b4b7899f..354b67cd35 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -17,7 +17,6 @@ */ package org.apache.qpid.client; -import org.apache.qpid.nclient.DtxSession; import org.apache.qpid.client.message.MessageFactoryRegistry; import javax.jms.*; @@ -36,7 +35,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic /** * This XASession Qpid DtxSession */ - private DtxSession _qpidDtxSession; + private org.apache.qpid.transport.Session _qpidDtxSession; /** * The standard session @@ -48,7 +47,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic /** * Create a JMS XASession */ - public XASessionImpl(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, + public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, int defaultPrefetchHigh, int defaultPrefetchLow) { super(qpidConnection, con, channelId, false, // this is not a transacted session @@ -65,7 +64,9 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public void createSession() { - _qpidDtxSession = _qpidConnection.createDTXSession(0); + _qpidDtxSession = _qpidConnection.createSession(0); + _qpidDtxSession.setSessionListener(this); + _qpidDtxSession.dtxSelect(); } @@ -126,7 +127,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic * * @return The associated Qpid Session. */ - protected org.apache.qpid.nclient.DtxSession getQpidSession() + protected org.apache.qpid.transport.Session getQpidSession() { return _qpidDtxSession; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 8b4488f1f9..d064c27754 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -29,7 +29,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.nclient.*; import org.apache.qpid.jms.Message; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; @@ -138,7 +137,7 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate } - public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session) + public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session) { DeliveryProperties deliveryProps = header.get(DeliveryProperties.class); if(deliveryProps != null) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java index 6b1301a33f..f31bc88509 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java @@ -33,9 +33,9 @@ public class UnprocessedMessage_0_10 extends UnprocessedMessage { private MessageTransfer _transfer; - public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr) + public UnprocessedMessage_0_10(MessageTransfer xfr) { - super(consumerTag); + super(Integer.parseInt(xfr.getDestination())); _transfer = xfr; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java deleted file mode 100644 index af0e724e42..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Client.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.nclient; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.qpid.client.url.URLParser_0_10; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.url.QpidURL; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; -import org.apache.qpid.nclient.impl.ClientSession; -import org.apache.qpid.nclient.impl.ClientSessionDelegate; -import org.apache.qpid.transport.Channel; -import org.apache.qpid.transport.ClientDelegate; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionCloseOk; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.ProtocolVersionException; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.network.io.IoTransport; -import org.apache.qpid.transport.network.mina.MinaHandler; -import org.apache.qpid.transport.network.nio.NioHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class Client implements org.apache.qpid.nclient.Connection -{ - private Connection _conn; - private ClosedListener _closedListner; - private final Lock _lock = new ReentrantLock(); - private static Logger _logger = LoggerFactory.getLogger(Client.class); - private Condition closeOk; - private boolean closed = false; - private long timeout = 60000; - - private ProtocolHeader header = null; - - /** - * - * @return returns a new connection to the broker. - */ - public static org.apache.qpid.nclient.Connection createConnection() - { - return new Client(); - } - - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException - { - - final Condition negotiationComplete = _lock.newCondition(); - closeOk = _lock.newCondition(); - _lock.lock(); - - ClientDelegate connectionDelegate = new ClientDelegate() - { - private boolean receivedClose = false; - public SessionDelegate getSessionDelegate() - { - return new ClientSessionDelegate(); - } - - public void exception(Throwable t) - { - if (_closedListner != null) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t); - } - else - { - throw new RuntimeException("connection closed",t); - } - } - - public void closed() - { - if (_closedListner != null && !this.receivedClose) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null); - } - } - - @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct) - { - _lock.lock(); - try - { - closed = true; - this.receivedClose = true; - closeOk.signalAll(); - } - finally - { - _lock.unlock(); - } - } - - @Override public void connectionClose(Channel context, ConnectionClose connectionClose) - { - super.connectionClose(context, connectionClose); - ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue()); - if (_closedListner == null && errorCode != ErrorCode.NO_ERROR) - { - throw new RuntimeException - (new QpidException("Server closed the connection: Reason " + - connectionClose.getReplyText(), - errorCode, - null)); - } - else - { - _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null); - } - - this.receivedClose = true; - } - @Override public void init(Channel ch, ProtocolHeader hdr) - { - // TODO: once the merge is done we'll need to update this code - // for handling 0.8 protocol version type i.e. major=8 and mino - if (hdr.getMajor() != 0 || hdr.getMinor() != 10) - { - Client.this.header = hdr; - _lock.lock(); - negotiationComplete.signalAll(); - _lock.unlock(); - } - } - }; - - connectionDelegate.setCondition(_lock,negotiationComplete); - connectionDelegate.setUsername(username); - connectionDelegate.setPassword(password); - connectionDelegate.setVirtualHost(virtualHost); - - String transport = System.getProperty("transport","io"); - if (transport.equalsIgnoreCase("nio")) - { - _logger.info("using NIO Transport"); - _conn = NioHandler.connect(host, port,connectionDelegate); - } - else if (transport.equalsIgnoreCase("io")) - { - _logger.info("using Plain IO Transport"); - _conn = IoTransport.connect(host, port,connectionDelegate); - } - else - { - _logger.info("using MINA Transport"); - _conn = MinaHandler.connect(host, port,connectionDelegate); - // _conn = NativeHandler.connect(host, port,connectionDelegate); - } - - // XXX: hardcoded version numbers - _conn.send(new ProtocolHeader(1, 0, 10)); - - try - { - negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); - if (header != null) - { - _conn.close(); - throw new ProtocolVersionException(header.getMajor(), header.getMinor()); - } - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - finally - { - _lock.unlock(); - } - } - - public void connect(String url)throws QpidException - { - URLParser_0_10 parser = null; - try - { - parser = new URLParser_0_10(url); - } - catch(Exception e) - { - throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e); - } - List<BrokerDetails> brokers = parser.getAllBrokerDetails(); - BrokerDetails brokerDetail = brokers.get(0); - connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"), - brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"), - brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password")); - } - - /* - * Until the dust settles with the URL disucssion - * I am not going to implement this. - */ - public void connect(QpidURL url) throws QpidException - { - throw new UnsupportedOperationException("Not implemented"); - } - - /* { - // temp impl to tests - BrokerDetails details = url.getAllBrokerDetails().get(0); - connect(details.getHost(), - details.getPort(), - details.getVirtualHost(), - details.getUserName(), - details.getPassword()); - } -*/ - - public void close() throws QpidException - { - Channel ch = _conn.getChannel(0); - ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing"); - _lock.lock(); - try - { - try - { - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed && elapsed < timeout) - { - closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS); - elapsed = System.currentTimeMillis() - start; - } - if(!closed) - { - throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - catch (InterruptedException e) - { - throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - finally - { - _lock.unlock(); - } - _conn.close(); - } - - public Session createSession(long expiryInSeconds) - { - Channel ch = _conn.getChannel(); - ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); - ssn.sessionRequestTimeout(expiryInSeconds); - return ssn; - } - - public DtxSession createDTXSession(int expiryInSeconds) - { - ClientSession clientSession = (ClientSession) createSession(expiryInSeconds); - clientSession.dtxSelect(); - return (DtxSession) clientSession; - } - - public void setClosedListener(ClosedListener closedListner) - { - - _closedListner = closedListner; - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java deleted file mode 100644 index 2d5b50b33a..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.QpidException; - -/** - * This represents a physical connection to a broker. - */ -public interface Connection -{ - /** - * Establish the connection using the given parameters - * - * @param host host name - * @param port port number - * @param virtualHost the virtual host name - * @param username user name - * @param password password - * @throws QpidException If the communication layer fails to establish the connection. - */ - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; - - /** - * Establish the connection with the broker identified by the URL. - * - * @param url Specifies the URL of the broker. - * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown. - */ - public void connect(String url) throws QpidException; - - /** - * Close this connection. - * - * @throws QpidException if the communication layer fails to close the connection. - */ - public void close() throws QpidException; - - /** - * Create a session for this connection. - * <p> The returned session is suspended - * (i.e. this session is not attached to an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than - * or equal to 0 then the session does not expire. - * @return A newly created (suspended) session. - */ - public Session createSession(long expiryInSeconds); - - /** - * Create a DtxSession for this connection. - * <p> A Dtx Session must be used when resources have to be manipulated as - * part of a global transaction. - * <p> The retuned DtxSession is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal - * to 0 then the session does not expire. - * @return A newly created (suspended) DtxSession. - */ - public DtxSession createDTXSession(int expiryInSeconds); - - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ClosedListener - * - * @param exceptionListner The ClosedListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java deleted file mode 100644 index 8a859f2d84..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.transport.Future; -import org.apache.qpid.transport.GetTimeoutResult; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.RecoverResult; -import org.apache.qpid.transport.XaResult; -import org.apache.qpid.transport.Xid; - -/** - * The resources for this session are controlled under the scope of a distributed transaction. - */ -public interface DtxSession extends Session -{ - - /** - * This method is called when messages should be produced and consumed on behalf a transaction - * branch identified by xid. - * possible options are: - * <ul> - * <li> {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen. - * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified. - * </ul> - * - * @param xid Specifies the xid of the transaction branch to be started. - * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. - * @return Confirms to the client that the transaction branch is started or specify the error condition. - */ - public Future<XaResult> dtxStart(Xid xid, Option... options); - - /** - * This method is called when the work done on behalf of a transaction branch finishes or needs to - * be suspended. - * possible options are: - * <ul> - * <li> {@link Option#FAIL}: indicates that this portion of work has failed; - * otherwise this portion of work has - * completed successfully. - * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is - * temporarily suspended in an incomplete state. - * </ul> - * - * @param xid Specifies the xid of the transaction branch to be ended. - * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. - * @return Confirms to the client that the transaction branch is ended or specifies the error condition. - */ - public Future<XaResult> dtxEnd(Xid xid, Option... options); - - /** - * Commit the work done on behalf of a transaction branch. This method commits the work associated - * with xid. Any produced messages are made available and any consumed messages are discarded. - * The only possible option is: - * <ul> - * <li> {@link Option#ONE_PHASE}: When set, one-phase commit optimization is used. - * </ul> - * - * @param xid Specifies the xid of the transaction branch to be committed. - * @param options Available option is: {@link Option#ONE_PHASE} - * @return Confirms to the client that the transaction branch is committed or specifies the error condition. - */ - public Future<XaResult> dtxCommit(Xid xid, Option... options); - - /** - * This method is called to forget about a heuristically completed transaction branch. - * - * @param xid Specifies the xid of the transaction branch to be forgotten. - */ - public void dtxForget(Xid xid, Option ... options); - - /** - * This method obtains the current transaction timeout value in seconds. If set-timeout was not - * used prior to invoking this method, the return value is the default timeout value; otherwise, the - * value used in the previous set-timeout call is returned. - * - * @param xid Specifies the xid of the transaction branch used for getting the timeout. - * @return The current transaction timeout value in seconds. - */ - public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options); - - /** - * This method prepares any message produced or consumed on behalf of xid, ready for commitment. - * - * @param xid Specifies the xid of the transaction branch to be prepared. - * @return The status of the prepare operation can be any one of: - * xa-ok: Normal execution. - * <p/> - * xa-rdonly: The transaction branch was read-only and has been committed. - * <p/> - * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified - * reason. - * <p/> - * xa-rbtimeout: The work represented by this transaction branch took too long. - */ - public Future<XaResult> dtxPrepare(Xid xid, Option ... options); - - /** - * This method is called to obtain a list of transaction branches that are in a prepared or - * heuristically completed state. - * @return a array of xids to be recovered. - */ - public Future<RecoverResult> dtxRecover(Option ... options); - - /** - * This method rolls back the work associated with xid. Any produced messages are discarded and - * any consumed messages are re-queued. - * - * @param xid Specifies the xid of the transaction branch to be rolled back. - * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition. - */ - public Future<XaResult> dtxRollback(Xid xid, Option ... options); - - /** - * Sets the specified transaction branch timeout value in seconds. - * - * @param xid Specifies the xid of the transaction branch for setting the timeout. - * @param timeout The transaction timeout value in seconds. - */ - public void dtxSetTimeout(Xid xid, long timeout, Option ... options); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java deleted file mode 100644 index 0d84394c7c..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.qpid.transport.*; -import org.apache.qpid.api.Message; - -/** - * <p>A session is associated with a connection. - * When it is created, a session is not associated with an underlying channel. - * The session is single threaded. </p> - * <p/> - * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method. - * For example, <code>command1</code> will be synchronously invoked by using the following sequence: - * <ul> - * <li> <code>session.command1()</code> - * <li> <code>session.sync()</code> - * </ul> - */ -public interface Session -{ - public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1; - public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0; - public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0; - public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1; - public static final short MESSAGE_FLOW_MODE_CREDIT = 0; - public static final short MESSAGE_FLOW_MODE_WINDOW = 1; - public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; - public static final short MESSAGE_FLOW_UNIT_BYTE = 1; - public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF; - public static final short MESSAGE_REJECT_CODE_GENERIC = 0; - public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1; - public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; - public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1; - - //------------------------------------------------------ - // Session housekeeping methods - //------------------------------------------------------ - - /** - * Sync method will block the session until all outstanding commands - * are executed. - */ - public void sync(); - - public void close(); - - public void sessionDetach(byte[] name, Option ... options); - - public void sessionRequestTimeout(long expiry, Option ... options); - - public byte[] getName(); - - public void setAutoSync(boolean value); - - //------------------------------------------------------ - // Messaging methods - // Producer - //------------------------------------------------------ - /** - * Transfer a message to a specified exchange. - * <p/> - * <p>This transfer provides a complete message - * using a single method. The method is internally mapped to messageTransfer() and headers() followed - * by data() and endData(). - * <b><i>This method should only be used by small messages.</b></i></p> - * - * @param destination The exchange the message is being sent to. - * @param msg The Message to be sent. - * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - * <p/> - * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - * </ul> - * @param acquireMode <ul> - * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message - * must be explicitly acquired. - * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - * </ul> - * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. - */ - public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) - throws IOException; - - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, ByteBuffer body, Option ... options); - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, byte[] body, Option ... options); - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, String body, Option ... options); - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Associate a message listener with a destination. - * <p> The destination is bound to a queue, and messages are filtered based - * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled). - * <p> The valid options are: - * <ul> - * <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this - * subscription can access the queue. - * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect. - * </ul> - * - * @param queue The queue that the receiver is receiving messages from. - * @param destination The destination, or delivery tag, for the subscriber. - * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - * <p/> - * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - * </ul> - * @param acquireMode <ul> - * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must - * be explicitly acquired. - * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - * </ul> - * @param listener The listener for this destination. To transfer large messages - * use a {@link org.apache.qpid.nclient.MessagePartListener}. - * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE} - * and {@link Option#NONE}. - * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies - * according to the provider's implementation. - */ - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, - MessagePartListener listener, Map<String, Object> filter, Option... options); - - /** - * This method cancels a consumer. The server will not send any more messages to the specified destination. - * This does not affect already delivered messages. - * The client may receive a - * number of messages in between sending the cancel method and receiving - * notification that the cancellation has been completed. - * - * @param destination The destination to be cancelled. - */ - public void messageCancel(String destination, Option ... options); - - /** - * Associate a message listener with a destination. - * <p> Only one listener is permitted for each destination. When a new listener is created, - * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener - * has completed processing a message. - * - * @param destination The destination the listener is associated with. - * @param listener The new listener for this destination. - */ - public void setMessageListener(String destination, MessagePartListener listener); - - /** - * Sets the mode of flow control used for a given destination. - * <p> With credit based flow control, the broker continually maintains its current - * credit balance with the recipient. The credit balance consists of two values, a message - * count, and a byte count. Whenever message data is sent, both counts must be decremented. - * If either value reaches zero, the flow of message data must stop. Additional credit is - * received via the {@link Session#messageFlow} method. - * <p> Window based flow control is identical to credit based flow control, however message - * acknowledgment implicitly grants a single unit of message credit, and the size of the - * message in byte credits for each acknowledged message. - * - * @param destination The destination to set the flow mode on. - * @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control - * <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul> - */ - public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options); - - - /** - * This method controls the flow of message data to a given destination. It is used by the - * recipient of messages to dynamically match the incoming rate of message flow to its - * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" - * number of the specified unit to the available credit balance for the specified destination. - * A value of 0 indicates an infinite amount of credit. This disables any limit for - * the given unit until the credit balance is zeroed with {@link Session#messageStop} - * or {@link Session#messageFlush}. - * - * @param destination The destination to set the flow. - * @param unit Specifies the unit of credit balance. - * <p/> - * One of: <ul> - * <li> message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE}) - * <li> byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE}) - * </ul> - * @param value Number of credits, a value of 0 indicates an infinite amount of credit. - */ - public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options); - - /** - * Forces the broker to exhaust its credit supply. - * <p> The credit on the broker will remain at zero once - * this method is completed. - * - * @param destination The destination on which the credit supply is to be exhausted. - */ - public void messageFlush(String destination, Option ... options); - - /** - * On receipt of this method, the brokers set credit to zero for a given - * destination. When confirmation of this method - * is issued credit is set to zero. No further messages will be sent until - * further credit is received. - * - * @param destination The destination on which to reset credit. - */ - public void messageStop(String destination, Option ... options); - - /** - * Acknowledge the receipt of a range of messages. - * <p>Messages must already be acquired, either by receiving them in - * pre-acquire mode or by explicitly acquiring them. - * - * @param ranges Range of messages to be acknowledged. - * @param accept pecify whether to send a message accept to the broker - */ - public void messageAcknowledge(RangeSet ranges, boolean accept); - - /** - * Reject a range of acquired messages. - * <p>The broker will deliver rejected messages to the - * alternate-exchange on the queue from which it came. If no alternate-exchange is - * defined for that queue the broker will discard the message. - * - * @param ranges Range of messages to be rejected. - * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or - * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but - * failed). - * @param text String describing the reason for a message transfer rejection. - */ - public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); - - /** - * As it is possible that the broker does not manage to reject some messages, after completion of - * {@link Session#messageReject} this method will return the ranges of rejected messages. - * <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the - * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. - * <p> A recommended invocation sequence would be: - * <ul> - * <li> {@link Session#messageReject} - * <li> {@link Session#sync()} - * <li> {@link Session#getRejectedMessages()} - * </ul> - * - * @return The rejected message ranges - */ - public RangeSet getRejectedMessages(); - - /** - * Try to acquire ranges of messages hence releasing them form the queue. - * This means that once acknowledged, a message will not be delivered to any other receiver. - * <p> As those messages may have been consumed by another receivers hence, - * message acquisition can fail. - * The outcome of the acquisition is returned as an array of ranges of qcquired messages. - * <p> This method should only be called on non-acquired messages. - * - * @param ranges Ranges of messages to be acquired. - * @return Indicates the acquired messages - */ - public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options); - - /** - * Give up responsibility for processing ranges of messages. - * <p> Released messages are re-enqueued. - * - * @param ranges Ranges of messages to be released. - * @param options Valid option is: {@link Option#SET_REDELIVERED}) - */ - public void messageRelease(RangeSet ranges, Option ... options); - - // ----------------------------------------------- - // Local transaction methods - // ---------------------------------------------- - /** - * Selects the session for local transaction support. - */ - public void txSelect(Option ... options); - - /** - * Commit the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txCommit(Option ... options) throws IllegalStateException; - - /** - * Roll back the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txRollback(Option ... options) throws IllegalStateException; - - //--------------------------------------------- - // Queue methods - //--------------------------------------------- - - /** - * Declare a queue with the given queueName - * <p> Following are the valid options: - * <ul> - * <li> {@link Option#AUTO_DELETE}: <p> If this field is set and the exclusive field is also set, - * then the queue is deleted when the connection closes. - * If this field is set and the exclusive field is not set the queue is deleted when all - * the consumers have finished using it. - * <li> {@link Option#DURABLE}: <p> If set when creating a new queue, - * the queue will be marked as durable. Durable queues - * remain active when a server restarts. Non-durable queues (transient queues) are purged - * if/when a server restarts. Note that durable queues do not necessarily hold persistent - * messages, although it does not make sense to send persistent messages to a transient - * queue. - * <li> {@link Option#EXCLUSIVE}: <p> Exclusive queues can only be used from one connection at a time. - * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the - * declaring connection closes. - * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue. - * This field allows the client to assert the presence of a queue without modifying the server state. - * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option. - * </ul> - * <p>In the absence of a particular option, the defaul value is false for each option - * - * @param queueName The name of the delcared queue. - * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message - * may be rejected by a queue for the following reasons: - * <oL> <li> The queue is deleted when it is not empty; - * <li> Immediate delivery of a message is requested, but there are no consumers connected to - * the queue. </ol> - * @param arguments Used for backward compatibility - * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE}) - * @see Option - */ - public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments, - Option... options); - - /** - * Bind a queue with an exchange. - * - * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current - * queue for the session, which is the last declared queue. - * @param exchangeName The exchange name. - * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages - * depending on the exchange configuration. Not all exchanges use a routing key - refer to - * the specific exchange documentation. If the queue name is empty, the server uses the last - * queue declared on the session. If the routing key is also empty, the server uses this - * queue name for the routing key as well. If the queue name is provided but the routing key - * is empty, the server does the binding with that empty routing key. The meaning of empty - * routing keys depends on the exchange implementation. - * @param arguments Used for backward compatibility - */ - public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments, - Option ... options); - - /** - * Unbind a queue from an exchange. - * - * @param queueName Specifies the name of the queue to unbind. - * @param exchangeName The name of the exchange to unbind from. - * @param routingKey Specifies the routing key of the binding to unbind. - */ - public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options); - - /** - * This method removes all messages from a queue. It does not cancel consumers. Purged messages - * are deleted without any formal "undo" mechanism. - * - * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - */ - public void queuePurge(String queueName, Option ... options); - - /** - * This method deletes a queue. When a queue is deleted any pending messages are sent to a - * dead-letter queue if this is defined in the server configuration, and all consumers on the - * queue are cancelled. - * <p> Following are the valid options: - * <ul> - * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages. - * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers. - * If the queue has consumers the server does does not delete it but raises a channel exception instead. - * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option. - * </ul> - * </p> - * <p/> - * <p>In the absence of a particular option, the defaul value is false for each option</p> - * - * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NONE}) - * @see Option - */ - public void queueDelete(String queueName, Option... options); - - - /** - * This method is used to request information on a particular queue. - * - * @param queueName The name of the queue for which information is requested. - * @return Information on the specified queue. - */ - public Future<QueueQueryResult> queueQuery(String queueName, Option ... options); - - - /** - * This method is used to request information on a particular binding. - * - * @param exchange The exchange name. - * @param queue The queue name. - * @param routingKey The routing key - * @param arguments bacward compatibilties params. - * @return Information on the specified binding. - */ - public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey, - Map<String, Object> arguments, Option ... options); - - // -------------------------------------- - // exhcange methods - // -------------------------------------- - - /** - * This method creates an exchange. If the exchange already exists, - * the method verifies the class and checks the details are correct. - * <p>Valid options are: - * <ul> - * <li>{@link Option#AUTO_DELETE}: <p>If set, the exchange is deleted when all queues have finished using it. - * <li>{@link Option#DURABLE}: <p>If set, the exchange will - * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient - * exchanges) are purged when a server restarts. - * <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange. - * The client can use this to check whether an exchange exists without modifying the server state. - * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect. - * </ul> - * <p>In the absence of a particular option, the defaul value is false for each option</p> - * - * @param exchangeName The exchange name. - * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The - * exchange types define the functionality of the exchange - i.e. how messages are routed - * through it. It is not valid or meaningful to attempt to change the type of an existing - * exchange. Default exchange types are: direct, topic, headers and fanout. - * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which - * the message will be sent. - * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NONE}) - * @param arguments Used for backward compatibility - * @see Option - */ - public void exchangeDeclare(String exchangeName, String type, String alternateExchange, - Map<String, Object> arguments, Option... options); - - /** - * This method deletes an exchange. When an exchange is deleted all queue bindings on the - * exchange are cancelled. - * <p> Following are the valid options: - * <ul> - * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the - * exchange has queue bindings the server does not delete it but raises a channel exception - * instead. - * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option. - * </ul> - * <p>Note that if an option is not set, it will default to false. - * - * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}. - * @see Option - */ - public void exchangeDelete(String exchangeName, Option... options); - - - /** - * This method is used to request information about a particular exchange. - * - * @param exchangeName The name of the exchange about which information is requested. If not set, the method will - * return information about the default exchange. - * @return Information on the specified exchange. - */ - public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options); - - /** - * If the session receives a sessionClosed with an error code it - * informs the session's exceptionListener - * - * @param exceptionListner The exceptionListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java deleted file mode 100644 index 869f974ae6..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ /dev/null @@ -1,163 +0,0 @@ -package org.apache.qpid.nclient.impl; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.qpid.QpidException; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.MessagePartListener; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; - -import static org.apache.qpid.transport.Option.*; - -/** - * Implements a Qpid Sesion. - */ -public class ClientSession extends org.apache.qpid.transport.Session implements org.apache.qpid.nclient.DtxSession -{ - static - { - String max = "message_size_before_sync"; // KB's - try - { - MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_SYNC_DATA_LENGH = 200000000; - } - String flush = "message_size_before_flush"; - try - { - MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_FLUSH_DATA_LENGH = 20000000; - } - } - - private static long MAX_NOT_SYNC_DATA_LENGH; - private static long MAX_NOT_FLUSH_DATA_LENGH; - - private Map<String,MessagePartListener> _messageListeners = new ConcurrentHashMap<String,MessagePartListener>(); - private ClosedListener _exceptionListner; - private RangeSet _rejectedMessages; - private long _currentDataSizeNotSynced; - private long _currentDataSizeNotFlushed; - - public ClientSession(byte[] name) - { - super(name); - } - - public void messageAcknowledge(RangeSet ranges, boolean accept) - { - for (Range range : ranges) - { - super.processed(range); - } - super.flushProcessed(accept ? BATCH : NONE); - if (accept) - { - messageAccept(ranges); - } - } - - public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options) - { - setMessageListener(destination,listener); - super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode), null, 0, filter, - options); - } - - public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException - { - DeliveryProperties dp = msg.getDeliveryProperties(); - MessageProperties mp = msg.getMessageProperties(); - ByteBuffer body = msg.readData(); - int size = body.remaining(); - super.messageTransfer - (destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode), - new Header(dp, mp), body); - _currentDataSizeNotSynced += size; - _currentDataSizeNotFlushed += size; - } - - public void sync() - { - super.sync(); - _currentDataSizeNotSynced = 0; - } - - public RangeSet getRejectedMessages() - { - return _rejectedMessages; - } - - public void setMessageListener(String destination, MessagePartListener listener) - { - if (listener == null) - { - throw new IllegalArgumentException("Cannot set message listener to null"); - } - _messageListeners.put(destination, listener); - } - - public void setClosedListener(ClosedListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - - void setRejectedMessages(RangeSet rejectedMessages) - { - _rejectedMessages = rejectedMessages; - } - - void notifyException(QpidException ex) - { - _exceptionListner.onClosed(null, null, null); - } - - Map<String,MessagePartListener> getMessageListeners() - { - return _messageListeners; - } -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java deleted file mode 100644 index 6bcd4fbce5..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.qpid.nclient.impl; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; - -import org.apache.qpid.ErrorCode; - -import org.apache.qpid.nclient.MessagePartListener; - -import org.apache.qpid.QpidException; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageReject; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDetached; -import org.apache.qpid.transport.SessionDelegate; - - -public class ClientSessionDelegate extends SessionDelegate -{ - - // -------------------------------------------- - // Message methods - // -------------------------------------------- - @Override public void messageTransfer(Session session, MessageTransfer xfr) - { - MessagePartListener listener = ((ClientSession)session).getMessageListeners() - .get(xfr.getDestination()); - listener.messageTransfer(xfr); - } - - @Override public void messageReject(Session session, MessageReject struct) - { - for (Range range : struct.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - session.getCommand((int) l)); - } - } - ((ClientSession)session).setRejectedMessages(struct.getTransfers()); - ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); - session.processed(struct); - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java index 155de2a678..a1dc48fcda 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java @@ -23,60 +23,55 @@ package org.apache.qpid.nclient.impl; import org.apache.qpid.ErrorCode; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import java.nio.ByteBuffer; import java.util.UUID; public class DemoClient { - public static MessagePartListenerAdapter createAdapter() + public static class DemoListener implements SessionListener { - return new MessagePartListenerAdapter(new MessageListener() + public void opened(Session ssn) {} + + public void exception(Session ssn, SessionException exc) + { + System.out.println(exc); + } + + public void message(Session ssn, MessageTransfer m) { - public void onMessage(Message m) - { - System.out.println("\n================== Received Msg =================="); - System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); - System.out.println(m.toString()); - System.out.println("================== End Msg ==================\n"); - } - - }); + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getHeader().get(MessageProperties.class).getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + public void closed(Session ssn) {} } public static final void main(String[] args) { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } + Connection conn = new Connection(); + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); Session ssn = conn.createSession(50000); - ssn.setClosedListener(new ClosedListener() - { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) - { - System.out.println("ErrorCode : " + errorCode + " reason : " + reason); - } - }); + ssn.setSessionListener(new DemoListener()); ssn.queueDeclare("queue1", null, null); ssn.exchangeBind("queue1", "amq.direct", "queue1",null); ssn.sync(); - ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("queue1", "myDest", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // queue ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, @@ -91,9 +86,12 @@ public class DemoClient ssn.sync(); // topic subs - ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic1", "myDest2", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + ssn.messageSubscribe("topic2", "myDest3", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + ssn.messageSubscribe("topic3", "myDest4", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); ssn.sync(); ssn.queueDeclare("topic1", null, null); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java index dd4a78fa2b..6c6cc308e9 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java @@ -28,12 +28,6 @@ import java.util.Map; import org.apache.qpid.ErrorCode; import org.apache.qpid.QpidException; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -41,9 +35,13 @@ import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; -public class BasicInteropTest implements ClosedListener +public class BasicInteropTest implements SessionListener { private Session session; @@ -62,7 +60,7 @@ public class BasicInteropTest implements ClosedListener public void testCreateConnection(){ System.out.println("------- Creating connection--------"); - conn = Client.createConnection(); + conn = new Connection(); try{ conn.connect(host, 5672, "test", "guest", "guest"); }catch(Exception e){ @@ -116,23 +114,11 @@ public class BasicInteropTest implements ClosedListener public void testSubscribe() { System.out.println("------- Sending a subscribe --------"); + session.setSessionListener(this); session.messageSubscribe("testQueue", "myDest", - Session.TRANSFER_CONFIRM_MODE_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(new MessageListener(){ - - public void onMessage(Message message) - { - System.out.println("--------Message Received--------"); - System.out.println(message.toString()); - System.out.println("--------/Message Received--------"); - RangeSet ack = new RangeSet(); - ack.add(message.getMessageTransferId(),message.getMessageTransferId()); - session.messageAcknowledge(ack, true); - } - - }), - null); + MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); System.out.println("------- Setting Credit mode --------"); session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW); @@ -141,20 +127,32 @@ public class BasicInteropTest implements ClosedListener session.messageFlow("myDest", MessageCreditUnit.BYTE, -1); } + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + System.out.println("--------Message Received--------"); + System.out.println(xfr.toString()); + System.out.println("--------/Message Received--------"); + ssn.processed(xfr); + ssn.flushProcessed(); + } + public void testMessageFlush() { session.messageFlush("myDest"); session.sync(); } - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + public void exception(Session ssn, SessionException exc) { System.out.println("------- Broker Notified an error --------"); - System.out.println("------- " + errorCode + " --------"); - System.out.println("------- " + reason + " --------"); + System.out.println("------- " + exc + " --------"); System.out.println("------- /Broker Notified an error --------"); } + public void closed(Session ssn) {} + public static void main(String[] args) throws QpidException { String host = "0.0.0.0"; diff --git a/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/java/common/src/main/java/org/apache/qpid/ToyBroker.java index 83d434b20a..db84b83adb 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpid/ToyBroker.java @@ -174,23 +174,14 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { final ToyExchange exchange = new ToyExchange(); - ConnectionDelegate delegate = new ConnectionDelegate() + ConnectionDelegate delegate = new ServerDelegate() { public SessionDelegate getSessionDelegate() { return new ToyBroker(exchange); } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} }; - //hack - delegate.setUsername("guest"); - delegate.setPassword("guest"); - MinaHandler.accept("0.0.0.0", 5672, delegate); } diff --git a/java/common/src/main/java/org/apache/qpid/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java index cb10859c9f..79bb286d76 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -56,7 +56,7 @@ class ToyClient extends SessionDelegate public static final void main(String[] args) { Connection conn = MinaHandler.connect("0.0.0.0", 5672, - new ClientDelegate() + new ClientDelegate(null, "guest", "guest") { public SessionDelegate getSessionDelegate() { diff --git a/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java index d6b015930b..d973739ed6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java @@ -112,7 +112,7 @@ public class Channel extends Invoker public void closed() { - log.debug("channel closed: ", this); + log.debug("channel closed: %s", this); if (session != null) { session.closed(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index bbdadfadb9..316c26429e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,21 +20,123 @@ */ package org.apache.qpid.transport; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import java.io.UnsupportedEncodingException; + +import org.apache.qpid.QpidException; + +import org.apache.qpid.security.UsernamePasswordCallbackHandler; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + + +import static org.apache.qpid.transport.Connection.State.*; + /** * ClientDelegate * */ -public abstract class ClientDelegate extends ConnectionDelegate +public class ClientDelegate extends ConnectionDelegate { + private String vhost; + private String username; + private String password; + + public ClientDelegate(String vhost, String username, String password) + { + this.vhost = vhost; + this.username = username; + this.password = password; + } + public void init(Channel ch, ProtocolHeader hdr) { if (hdr.getMajor() != 0 && hdr.getMinor() != 10) { - throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()); + Connection conn = ch.getConnection(); + conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor())); + } + } + + @Override public void connectionStart(Channel ch, ConnectionStart start) + { + Connection conn = ch.getConnection(); + List<Object> mechanisms = start.getMechanisms(); + if (mechanisms == null || mechanisms.isEmpty()) + { + ch.connectionStartOk + (Collections.EMPTY_MAP, null, null, conn.getLocale()); + return; } + + String[] mechs = new String[mechanisms.size()]; + mechanisms.toArray(mechs); + + try + { + UsernamePasswordCallbackHandler handler = + new UsernamePasswordCallbackHandler(); + handler.initialise(username, password); + SaslClient sc = Sasl.createSaslClient + (new String[] {"PLAIN"}, null, "AMQP", "localhost", null, handler); + conn.setSaslClient(sc); + + byte[] response = sc.hasInitialResponse() ? + sc.evaluateChallenge(new byte[0]) : null; + ch.connectionStartOk + (Collections.EMPTY_MAP, sc.getMechanismName(), response, + conn.getLocale()); + } + catch (SaslException e) + { + conn.exception(e); + } + } + + @Override public void connectionSecure(Channel ch, ConnectionSecure secure) + { + Connection conn = ch.getConnection(); + SaslClient sc = conn.getSaslClient(); + try + { + byte[] response = sc.evaluateChallenge(secure.getChallenge()); + ch.connectionSecureOk(response); + } + catch (SaslException e) + { + conn.exception(e); + } + } + + @Override public void connectionTune(Channel ch, ConnectionTune tune) + { + Connection conn = ch.getConnection(); + conn.setChannelMax(tune.getChannelMax()); + ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax()); + ch.connectionOpen(vhost, null, Option.INSIST); + } + + @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok) + { + ch.getConnection().setState(OPEN); + } + + @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir) + { + throw new UnsupportedOperationException(); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 68b9b209bb..ae9420eb1a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,14 +20,22 @@ */ package org.apache.qpid.transport; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; +import java.util.UUID; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslServer; + +import static org.apache.qpid.transport.Connection.State.*; /** @@ -44,23 +52,164 @@ public class Connection implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { + enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + private static final Logger log = Logger.get(Connection.class); - final private Sender<ProtocolEvent> sender; - final private ConnectionDelegate delegate; + class DefaultConnectionListener implements ConnectionListener + { + public void opened(Connection conn) {} + public void exception(Connection conn, ConnectionException exception) + { + throw exception; + } + public void closed(Connection conn) {} + } + + private ConnectionDelegate delegate; + private Sender<ProtocolEvent> sender; + + final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); + + private State state = NEW; + private Object lock = new Object(); + private long timeout = 60000; + private ConnectionListener listener = new DefaultConnectionListener(); + private Throwable error = null; + private int channelMax = 1; + private String locale; + private SaslServer saslServer; + private SaslClient saslClient; + // want to make this final private int _connectionId; - final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); + public Connection() {} - public Connection(Sender<ProtocolEvent> sender, - ConnectionDelegate delegate) + public void setConnectionDelegate(ConnectionDelegate delegate) { - this.sender = sender; this.delegate = delegate; } + public void setConnectionListener(ConnectionListener listener) + { + if (listener == null) + { + this.listener = new DefaultConnectionListener(); + } + else + { + this.listener = listener; + } + } + + public Sender<ProtocolEvent> getSender() + { + return sender; + } + + public void setSender(Sender<ProtocolEvent> sender) + { + this.sender = sender; + } + + void setState(State state) + { + synchronized (lock) + { + this.state = state; + lock.notifyAll(); + } + } + + void setLocale(String locale) + { + this.locale = locale; + } + + String getLocale() + { + return locale; + } + + void setSaslServer(SaslServer saslServer) + { + this.saslServer = saslServer; + } + + SaslServer getSaslServer() + { + return saslServer; + } + + void setSaslClient(SaslClient saslClient) + { + this.saslClient = saslClient; + } + + SaslClient getSaslClient() + { + return saslClient; + } + + public void connect(String host, int port, String vhost, String username, String password) + { + synchronized (lock) + { + state = OPENING; + + delegate = new ClientDelegate(vhost, username, password); + + IoTransport.connect(host, port, ConnectionBinding.get(this)); + send(new ProtocolHeader(1, 0, 10)); + + Waiter w = new Waiter(lock, timeout); + while (w.hasTime() && state == OPENING && error == null) + { + w.await(); + } + + if (error != null) + { + Throwable t = error; + error = null; + close(); + throw new ConnectionException(t); + } + + switch (state) + { + case OPENING: + close(); + throw new ConnectionException("connect() timed out"); + case OPEN: + break; + case CLOSED: + throw new ConnectionException("connect() aborted"); + default: + throw new IllegalStateException(String.valueOf(state)); + } + } + + listener.opened(this); + } + + public Session createSession() + { + return createSession(0); + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = getChannel(); + Session ssn = new Session(UUID.randomUUID().toString().getBytes()); + ssn.attach(ch); + ssn.sessionAttach(ssn.getName()); + ssn.sessionRequestTimeout(expiryInSeconds); + return ssn; + } + public void setConnectionId(int id) { _connectionId = id; @@ -86,7 +235,12 @@ public class Connection public void send(ProtocolEvent event) { log.debug("SEND: [%s] %s", this, event); - sender.send(event); + Sender s = sender; + if (s == null) + { + throw new ConnectionException("connection closed"); + } + s.send(event); } public void flush() @@ -107,7 +261,7 @@ public class Connection public Channel getChannel() { - synchronized (channels) + synchronized (lock) { for (int i = 0; i < getChannelMax(); i++) { @@ -123,7 +277,7 @@ public class Connection public Channel getChannel(int number) { - synchronized (channels) + synchronized (lock) { Channel channel = channels.get(number); if (channel == null) @@ -137,45 +291,146 @@ public class Connection void removeChannel(int number) { - synchronized (channels) + synchronized (lock) { channels.remove(number); } } + public void exception(ConnectionException e) + { + synchronized (lock) + { + switch (state) + { + case OPENING: + case CLOSING: + error = e; + lock.notifyAll(); + break; + default: + listener.exception(this, e); + break; + } + } + } + public void exception(Throwable t) { - delegate.exception(t); + synchronized (lock) + { + switch (state) + { + case OPENING: + case CLOSING: + error = t; + lock.notifyAll(); + break; + default: + listener.exception(this, new ConnectionException(t)); + break; + } + } } void closeCode(ConnectionClose close) { - synchronized (channels) + synchronized (lock) { for (Channel ch : channels.values()) { ch.closeCode(close); } + ConnectionCloseCode code = close.getReplyCode(); + if (code != ConnectionCloseCode.NORMAL) + { + exception(new ConnectionException(close)); + } } } public void closed() { log.debug("connection closed: %s", this); - synchronized (channels) + + if (state == OPEN) + { + exception(new ConnectionException("connection aborted")); + } + + synchronized (lock) { List<Channel> values = new ArrayList<Channel>(channels.values()); for (Channel ch : values) { ch.closed(); } + + sender = null; + setState(CLOSED); } - delegate.closed(); + + listener.closed(this); } public void close() { - sender.close(); + synchronized (lock) + { + switch (state) + { + case OPEN: + Channel ch = getChannel(0); + state = CLOSING; + ch.connectionClose(ConnectionCloseCode.NORMAL, null); + Waiter w = new Waiter(lock, timeout); + while (w.hasTime() && state == CLOSING && error == null) + { + w.await(); + } + + if (error != null) + { + close(); + throw new ConnectionException(error); + } + + switch (state) + { + case CLOSING: + close(); + throw new ConnectionException("close() timed out"); + case CLOSED: + break; + default: + throw new IllegalStateException(String.valueOf(state)); + } + break; + case CLOSED: + break; + default: + if (sender != null) + { + sender.close(); + w = new Waiter(lock, timeout); + while (w.hasTime() && sender != null && error == null) + { + w.await(); + } + + if (error != null) + { + throw new ConnectionException(error); + } + + if (sender != null) + { + throw new ConnectionException("close() timed out"); + } + } + break; + } + } } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 2aa1db7b28..9056a3120b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -22,22 +22,7 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.SecurityHelper; -import org.apache.qpid.QpidException; - -import java.io.UnsupportedEncodingException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; +import static org.apache.qpid.transport.Connection.State.*; /** @@ -57,231 +42,26 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> private static final Logger log = Logger.get(ConnectionDelegate.class); - private String _username = "guest"; - private String _password = "guest";; - private String _mechanism; - private String _virtualHost; - private SaslClient saslClient; - private SaslServer saslServer; - private String _locale = "utf8"; - private int maxFrame = 64*1024; - private Condition _negotiationComplete; - private Lock _negotiationCompleteLock; - - public abstract SessionDelegate getSessionDelegate(); - - public abstract void exception(Throwable t); - - public abstract void closed(); - - public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete) - { - _negotiationComplete = negotiationComplete; - _negotiationCompleteLock = negotiationCompleteLock; - } - - public void init(Channel ch, ProtocolHeader hdr) - { - ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor())); - List<Object> plain = new ArrayList<Object>(); - plain.add("PLAIN"); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - ch.connectionStart(null, plain, utf8); - } - - // ---------------------------------------------- - // Client side - //----------------------------------------------- - @Override public void connectionStart(Channel context, ConnectionStart struct) - { - String mechanism = null; - byte[] response = null; - try - { - mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); - saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, - SecurityHelper.createCallbackHandler(mechanism,_username,_password )); - response = saslClient.evaluateChallenge(new byte[0]); - } - catch (UnsupportedEncodingException e) - { - // need error handling - } - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - } - - Map<String,Object> props = new HashMap<String,Object>(); - context.connectionStartOk(props, mechanism, response, _locale); - } - - @Override public void connectionSecure(Channel context, ConnectionSecure struct) - { - try - { - byte[] response = saslClient.evaluateChallenge(struct.getChallenge()); - context.connectionSecureOk(response); - } - catch (SaslException e) - { - // need error handling - } - } - - @Override public void connectionTune(Channel context, ConnectionTune struct) - { - context.getConnection().setChannelMax(struct.getChannelMax()); - context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax()); - context.connectionOpen(_virtualHost, null, Option.INSIST); - } - - - @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) - { - List<Object> knownHosts = struct.getKnownHosts(); - if(_negotiationCompleteLock != null) - { - _negotiationCompleteLock.lock(); - try - { - _negotiationComplete.signalAll(); - } - finally - { - _negotiationCompleteLock.unlock(); - } - } - } - - public void connectionRedirect(Channel context, ConnectionRedirect struct) - { - // not going to bother at the moment - } - - // ---------------------------------------------- - // Server side - //----------------------------------------------- - @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) + public SessionDelegate getSessionDelegate() { - //set the client side locale on the server side - _locale = struct.getLocale(); - _mechanism = struct.getMechanism(); - - //try - //{ - //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); - byte[] challenge = null; - if ( challenge == null) - { - context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); - } - else - { - try - { - context.connectionSecure(challenge); - } - catch(Exception e) - { - - } - } - - - /*} - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - }*/ + return new SessionDelegate(); } - @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) - { - try - { - saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - byte[] challenge = saslServer.evaluateResponse(struct.getResponse()); - if ( challenge == null) - { - context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); - } - else - { - try - { - context.connectionSecure(challenge); - } - catch(Exception e) - { - - } - } - - - } - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - } - } - - - @Override public void connectionOpen(Channel context, ConnectionOpen struct) - { - List<Object> hosts = new ArrayList<Object>(); - hosts.add("amqp:1223243232325"); - context.connectionOpenOk(hosts); - } + public abstract void init(Channel ch, ProtocolHeader hdr); @Override public void connectionClose(Channel ch, ConnectionClose close) { - ch.getConnection().closeCode(close); + Connection conn = ch.getConnection(); ch.connectionCloseOk(); + conn.getSender().close(); + conn.closeCode(close); + conn.setState(CLOSE_RCVD); } - public String getPassword() - { - return _password; - } - - public void setPassword(String password) - { - _password = password; - } - - public String getUsername() - { - return _username; - } - - public void setUsername(String username) - { - _username = username; - } - - public String getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(String host) + @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok) { - _virtualHost = host; + Connection conn = ch.getConnection(); + conn.getSender().close(); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java index c3239ef684..1bd7d516cf 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java @@ -26,17 +26,37 @@ package org.apache.qpid.transport; * */ -public class ConnectionException extends RuntimeException +public class ConnectionException extends TransportException { private ConnectionClose close; - public ConnectionException(ConnectionClose close) + public ConnectionException(String message, ConnectionClose close, Throwable cause) { - super(close.getReplyText()); + super(message, cause); this.close = close; } + public ConnectionException(String message) + { + this(message, null, null); + } + + public ConnectionException(String message, Throwable cause) + { + this(message, null, cause); + } + + public ConnectionException(Throwable cause) + { + this(cause.getMessage(), null, cause); + } + + public ConnectionException(ConnectionClose close) + { + this(close.getReplyText(), close, null); + } + public ConnectionClose getClose() { return close; diff --git a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java index 4cf0cab1ec..616e76825a 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,25 +16,23 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. + * */ -package org.apache.qpid.nclient; - -import org.apache.qpid.ErrorCode; +package org.apache.qpid.transport; /** - * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it - * informs the connection's ExceptionListener + * ConnectionListener + * */ -public interface ClosedListener + +public interface ConnectionListener { - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * @param errorCode TODO - * @param reason TODO - * @param t TODO - * @see Connection - */ - public void onClosed(ErrorCode errorCode, String reason, Throwable t); -}
\ No newline at end of file + + void opened(Connection connection); + + void exception(Connection connection, ConnectionException exception); + + void closed(Connection connection); + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java index b2be32331a..89b59c2512 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -43,25 +43,16 @@ public class Echo extends SessionDelegate public static final void main(String[] args) throws IOException { - ConnectionDelegate delegate = new ConnectionDelegate() + ConnectionDelegate delegate = new ServerDelegate() { public SessionDelegate getSessionDelegate() { return new Echo(); } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} }; - //hack - delegate.setUsername("guest"); - delegate.setPassword("guest"); - IoAcceptor ioa = new IoAcceptor - ("0.0.0.0", 5672, new ConnectionBinding(delegate)); + ("0.0.0.0", 5672, ConnectionBinding.get(delegate)); ioa.start(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java index 2de0c169a5..0cca0227a1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java @@ -26,7 +26,7 @@ package org.apache.qpid.transport; * */ -public final class ProtocolVersionException extends TransportException +public final class ProtocolVersionException extends ConnectionException { private final byte major; diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java new file mode 100644 index 0000000000..c27419cadc --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.util.Collections; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import java.io.UnsupportedEncodingException; + +import org.apache.qpid.QpidException; + +import org.apache.qpid.SecurityHelper; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + + +import static org.apache.qpid.transport.Connection.State.*; + + +/** + * ServerDelegate + * + */ + +public class ServerDelegate extends ConnectionDelegate +{ + + private SaslServer saslServer; + + public void init(Channel ch, ProtocolHeader hdr) + { + Connection conn = ch.getConnection(); + conn.send(new ProtocolHeader(1, 0, 10)); + List<Object> utf8 = new ArrayList<Object>(); + utf8.add("utf8"); + ch.connectionStart(null, Collections.EMPTY_LIST, utf8); + } + + @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok) + { + Connection conn = ch.getConnection(); + conn.setLocale(ok.getLocale()); + String mechanism = ok.getMechanism(); + + if (mechanism == null || mechanism.length() == 0) + { + ch.connectionTune + (Integer.MAX_VALUE, + org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, + 0, Integer.MAX_VALUE); + return; + } + + try + { + SaslServer ss = Sasl.createSaslServer + (mechanism, "AMQP", "localhost", null, null); + if (ss == null) + { + ch.connectionClose + (ConnectionCloseCode.CONNECTION_FORCED, + "null SASL mechanism: " + mechanism); + return; + } + conn.setSaslServer(ss); + secure(ch, ok.getResponse()); + } + catch (SaslException e) + { + conn.exception(e); + } + } + + private void secure(Channel ch, byte[] response) + { + Connection conn = ch.getConnection(); + SaslServer ss = conn.getSaslServer(); + try + { + byte[] challenge = ss.evaluateResponse(response); + if (ss.isComplete()) + { + ss.dispose(); + ch.connectionTune + (Integer.MAX_VALUE, + org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, + 0, Integer.MAX_VALUE); + } + else + { + ch.connectionSecure(challenge); + } + } + catch (SaslException e) + { + conn.exception(e); + } + } + + @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok) + { + secure(ch, ok.getResponse()); + } + + @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok) + { + + } + + @Override public void connectionOpen(Channel ch, ConnectionOpen open) + { + Connection conn = ch.getConnection(); + ch.connectionOpenOk(Collections.EMPTY_LIST); + conn.setState(OPEN); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 10ca6cfb0a..df4313e15b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -49,6 +49,26 @@ public class Session extends Invoker private static final Logger log = Logger.get(Session.class); + class DefaultSessionListener implements SessionListener + { + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + log.info("message: %s", xfr); + } + + public void exception(Session ssn, SessionException exc) + { + throw exc; + } + + public void closed(Session ssn) {} + } + + public static final int UNLIMITED_CREDIT = 0xFFFFFFFF; + private static boolean ENABLE_REPLAY = false; static @@ -65,6 +85,7 @@ public class Session extends Invoker } private byte[] name; + private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; @@ -97,6 +118,23 @@ public class Session extends Invoker return name; } + public void setSessionListener(SessionListener listener) + { + if (listener == null) + { + this.listener = new DefaultSessionListener(); + } + else + { + this.listener = listener; + } + } + + public SessionListener getSessionListener() + { + return listener; + } + public void setAutoSync(boolean value) { synchronized (commands) @@ -270,8 +308,8 @@ public class Session extends Invoker { if (closed.get()) { - List<ExecutionException> exc = getExceptions(); - if (!exc.isEmpty()) + ExecutionException exc = getException(); + if (exc != null) { throw new SessionException(exc); } @@ -361,7 +399,7 @@ public class Session extends Invoker { if (closed.get()) { - throw new SessionException(getExceptions()); + throw new SessionException(getException()); } else { @@ -375,8 +413,7 @@ public class Session extends Invoker private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>(); - private List<ExecutionException> exceptions = - new ArrayList<ExecutionException>(); + private ExecutionException exception = null; void result(int command, Struct result) { @@ -388,11 +425,17 @@ public class Session extends Invoker future.set(result); } - void addException(ExecutionException exc) + void setException(ExecutionException exc) { - synchronized (exceptions) + synchronized (results) { - exceptions.add(exc); + if (exception != null) + { + throw new IllegalStateException + (String.format + ("too many exceptions: %s, %s", exception, exc)); + } + exception = exc; } } @@ -403,11 +446,11 @@ public class Session extends Invoker this.close = close; } - List<ExecutionException> getExceptions() + ExecutionException getException() { - synchronized (exceptions) + synchronized (results) { - return new ArrayList<ExecutionException>(exceptions); + return exception; } } @@ -473,7 +516,7 @@ public class Session extends Invoker } else if (closed.get()) { - throw new SessionException(getExceptions()); + throw new SessionException(getException()); } else { diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java index d2c54cf339..354e5c1d15 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java @@ -33,7 +33,7 @@ public class SessionClosedException extends SessionException public SessionClosedException() { - super(Collections.EMPTY_LIST); + super(null); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index b91763509c..3e6fa9d5d9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -29,7 +29,7 @@ import org.apache.qpid.transport.network.Frame; * @author Rafael H. Schloming */ -public abstract class SessionDelegate +public class SessionDelegate extends MethodDelegate<Session> implements ProtocolDelegate<Session> { @@ -57,7 +57,7 @@ public abstract class SessionDelegate @Override public void executionException(Session ssn, ExecutionException exc) { - ssn.addException(exc); + ssn.setException(exc); } @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) @@ -122,4 +122,9 @@ public abstract class SessionDelegate ssn.syncPoint(); } + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) + { + ssn.getSessionListener().message(ssn, xfr); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java index dc294b2206..ae9b4b9cdb 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java @@ -27,20 +27,20 @@ import java.util.List; * */ -public class SessionException extends RuntimeException +public class SessionException extends TransportException { - private List<ExecutionException> exceptions; + private ExecutionException exception; - public SessionException(List<ExecutionException> exceptions) + public SessionException(ExecutionException exception) { - super(exceptions.isEmpty() ? "" : exceptions.toString()); - this.exceptions = exceptions; + super(String.valueOf(exception)); + this.exception = exception; } - public List<ExecutionException> getExceptions() + public ExecutionException getException() { - return exceptions; + return exception; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java new file mode 100644 index 0000000000..63690177f9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * SessionListener + * + */ + +public interface SessionListener +{ + + void opened(Session session); + + void message(Session ssn, MessageTransfer xfr); + + void exception(Session session, SessionException exception); + + void closed(Session session); + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java index 8653acedbe..617867cae6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Sink.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java @@ -103,28 +103,17 @@ public class Sink extends SessionDelegate public static final void main(String[] args) throws IOException { - ConnectionDelegate delegate = new ConnectionDelegate() + ConnectionDelegate delegate = new ServerDelegate() { public SessionDelegate getSessionDelegate() { return new Sink(); } - - public void exception(Throwable t) - { - t.printStackTrace(); - } - - public void closed() {} }; - //hack - delegate.setUsername("guest"); - delegate.setPassword("guest"); - IoAcceptor ioa = new IoAcceptor - ("0.0.0.0", 5672, new ConnectionBinding(delegate)); + ("0.0.0.0", 5672, ConnectionBinding.get(delegate)); System.out.println (String.format (FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate")); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 6886cb3a5a..8a2aba2e6d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -33,23 +33,46 @@ import org.apache.qpid.transport.Sender; * */ -public class ConnectionBinding implements Binding<Connection,ByteBuffer> +public abstract class ConnectionBinding + implements Binding<Connection,ByteBuffer> { - private static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - - private final ConnectionDelegate delegate; + public static Binding<Connection,ByteBuffer> get(final Connection connection) + { + return new ConnectionBinding() + { + public Connection connection() + { + return connection; + } + }; + } - public ConnectionBinding(ConnectionDelegate delegate) + public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate) { - this.delegate = delegate; + return new ConnectionBinding() + { + public Connection connection() + { + Connection conn = new Connection(); + conn.setConnectionDelegate(delegate); + return conn; + } + }; } + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + + public abstract Connection connection(); + public Connection endpoint(Sender<ByteBuffer> sender) { + Connection conn = connection(); + // XXX: hardcoded max-frame - return new Connection - (new Disassembler(sender, MAX_FRAME_SIZE), delegate); + Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE); + conn.setSender(dis); + return conn; } public Receiver<ByteBuffer> receiver(Connection conn) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index a1fb0371fd..5efd51d5db 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -61,7 +61,7 @@ final class IoReceiver extends Thread start(); } - void close() + void close(boolean block) { if (!closed.getAndSet(true)) { @@ -75,7 +75,7 @@ final class IoReceiver extends Thread { socket.shutdownInput(); } - if (Thread.currentThread() != this) + if (block && Thread.currentThread() != this) { join(timeout); if (isAlive()) @@ -121,6 +121,7 @@ final class IoReceiver extends Thread } } } + socket.close(); } catch (Throwable t) { @@ -129,7 +130,6 @@ final class IoReceiver extends Thread finally { receiver.closed(); - transport.getSender().close(); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index ef892744ab..f70a13ec3c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -196,17 +196,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> throw new TransportException("join timed out"); } } - transport.getReceiver().close(); - socket.close(); + transport.getReceiver().close(false); } catch (InterruptedException e) { throw new TransportException(e); } - catch (IOException e) - { - throw new TransportException(e); - } if (reportException && exception != null) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 70fd8a3c06..be17766740 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -108,7 +108,7 @@ public final class IoTransport<E> public static final Connection connect(String host, int port, ConnectionDelegate delegate) { - return connect(host, port, new ConnectionBinding(delegate)); + return connect(host, port, ConnectionBinding.get(delegate)); } public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java index f8dbec3c3d..b89eed48b0 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -262,13 +262,13 @@ public class MinaHandler<E> implements IoHandler ConnectionDelegate delegate) throws IOException { - accept(host, port, new ConnectionBinding(delegate)); + accept(host, port, ConnectionBinding.get(delegate)); } public static final Connection connect(String host, int port, ConnectionDelegate delegate) { - return connect(host, port, new ConnectionBinding(delegate)); + return connect(host, port, ConnectionBinding.get(delegate)); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java index 318fe0d03b..3bc6730623 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -87,8 +87,9 @@ public class NioHandler implements Runnable } NioSender sender = new NioSender(_ch); - Connection con = new Connection - (new Disassembler(sender, 64*1024 - 1), delegate); + Connection con = new Connection(); + con.setSender(new Disassembler(sender, 64*1024 - 1)); + con.setConnectionDelegate(delegate); con.setConnectionId(_count.incrementAndGet()); _handlers.put(con.getConnectionId(),sender); diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java new file mode 100644 index 0000000000..e034d779ca --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.util; + + +/** + * Waiter + * + */ + +public final class Waiter +{ + + private final Object lock; + private final long timeout; + private final long start; + private long elapsed; + + public Waiter(Object lock, long timeout) + { + this.lock = lock; + this.timeout = timeout; + this.start = System.currentTimeMillis(); + this.elapsed = 0; + } + + public boolean hasTime() + { + return elapsed < timeout; + } + + public void await() + { + try + { + lock.wait(timeout - elapsed); + } + catch (InterruptedException e) + { + // pass + } + elapsed = System.currentTimeMillis() - start; + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index b9ca210483..e61ffb501b 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -50,38 +50,30 @@ public class ConnectionTest extends TestCase port = AvailablePortFinder.getNextAvailable(12000); - ConnectionDelegate server = new ConnectionDelegate() { - public void init(Channel ch, ProtocolHeader hdr) { + ConnectionDelegate server = new ServerDelegate() { + @Override public void connectionOpen(Channel ch, ConnectionOpen open) + { + super.connectionOpen(ch, open); ch.getConnection().close(); } - - public SessionDelegate getSessionDelegate() { - return new SessionDelegate() {}; - } - public void exception(Throwable t) { - log.error(t, "exception caught"); - } - public void closed() {} }; IoAcceptor ioa = new IoAcceptor - ("localhost", port, new ConnectionBinding(server)); + ("localhost", port, ConnectionBinding.get(server)); ioa.start(); } private Connection connect(final Condition closed) { - Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate() + Connection conn = new Connection(); + conn.setConnectionListener(new ConnectionListener() { - public SessionDelegate getSessionDelegate() + public void opened(Connection conn) {} + public void exception(Connection conn, ConnectionException exc) { - return new SessionDelegate() {}; + exc.printStackTrace(); } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() + public void closed(Connection conn) { if (closed != null) { @@ -89,8 +81,7 @@ public class ConnectionTest extends TestCase } } }); - - conn.send(new ProtocolHeader(1, 0, 10)); + conn.connect("localhost", port, null, "guest", "guest"); return conn; } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java index f4428cb1e2..2c36fb3d65 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java @@ -28,140 +28,95 @@ import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPoolFactory; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.DtxSession; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.util.Logger; /** * Qpid datasource. - * Basically it is a connection pool manager used for optimizing broker connections usage. - * + * Basically it is a connection pool manager used for optimizing broker connections usage. + * * @author Andrea Gazzarini */ -public final class QpidDatasource +public final class QpidDatasource { private final static Logger LOGGER = Logger.get(QpidDatasource.class); - + /** * A connection decorator used for adding pool interaction behaviour to an existing connection. - * + * * @author Andrea Gazzarini */ - public class ConnectionDecorator implements Connection,ClosedListener + class PooledConnection extends Connection { - private final Connection _decoratee; private final UUID _brokerId; private boolean _valid; - + /** * Builds a new decorator with the given connection. - * + * * @param brokerId the broker identifier. * @param decoratee the underlying connection. */ - private ConnectionDecorator(UUID brokerId, Connection decoratee) + private PooledConnection(UUID brokerId) { - this._decoratee = decoratee; this._brokerId = brokerId; - _decoratee.setClosedListener(this); _valid = true; } - + /** * Returns true if the underlying connection is still valid and can be used. - * + * * @return true if the underlying connection is still valid and can be used. */ boolean isValid() { return _valid; } - + + void reallyClose() + { + super.close(); + } + /** * Returns the connection to the pool. That is, marks this connections as available. * After that, this connection will be available for further operations. */ - public void close () throws QpidException + public void close() { try { pools.get(_brokerId).returnObject(this); LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this); - } catch (Exception exception) + } + catch (Exception e) { - throw new QpidException("Error while closing connection.",ErrorCode.CONNECTION_ERROR,exception); - } - } - - /** - * Do nothing : underlying connection is already connected. - */ - public void connect (String host, int port, String virtualHost, String username, String password) - throws QpidException - { - // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. - } - - /** - * Do nothing : underlying connection is already connected. - */ - public void connect (String url) throws QpidException - { - // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. - } - - /** - * @see Connection#createDTXSession(int) - */ - public DtxSession createDTXSession (int expiryInSeconds) - { - return _decoratee.createDTXSession(expiryInSeconds); + throw new ConnectionException(e); + } } - /** - * @see Connection#createSession(long) - */ - public Session createSession (long expiryInSeconds) + public void exception(Throwable t) { - return _decoratee.createSession(expiryInSeconds); + super.exception(t); + _valid = false; } + } - /** - * Do nothing : closed listener has been already injected. - */ - public void setClosedListener (ClosedListener exceptionListner) - { - } - - /** - * Callback method used for error notifications while underlying connection is closing. - */ - public void onClosed (ErrorCode errorCode, String reason, Throwable t) - { - _valid = false; - LOGGER.error(t,"<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s",reason,errorCode.getCode()); - } - }; - /** - * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of + * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of * the broker connection(s). - * + * * @author Andrea Gazzarini */ class QpidConnectionFactory extends BasePoolableObjectFactory - { + { private final BrokerConnectionData _connectionData; private final UUID _brokerId; - + /** * Builds a new connection factory with the given parameters. - * + * * @param brokerId the broker identifier. * @param connectionData the connecton data. */ @@ -170,35 +125,35 @@ public final class QpidDatasource this._connectionData = connectionData; this._brokerId = brokerId; } - + /** * Creates a new underlying connection. */ @Override public Connection makeObject () throws Exception { - Connection connection = Client.createConnection(); + PooledConnection connection = new PooledConnection(_brokerId); connection.connect( - _connectionData.getHost(), - _connectionData.getPort(), - _connectionData.getVirtualHost(), - _connectionData.getUsername(), + _connectionData.getHost(), + _connectionData.getPort(), + _connectionData.getVirtualHost(), + _connectionData.getUsername(), _connectionData.getPassword()); - return new ConnectionDecorator(_brokerId,connection); + return connection; } - + /** * Validates the underlying connection. */ @Override public boolean validateObject (Object obj) { - ConnectionDecorator connection = (ConnectionDecorator) obj; + PooledConnection connection = (PooledConnection) obj; boolean isValid = connection.isValid(); LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid); return isValid; } - + /** * Closes the underlying connection. */ @@ -207,8 +162,8 @@ public final class QpidDatasource { try { - ConnectionDecorator connection = (ConnectionDecorator) obj; - connection._decoratee.close(); + PooledConnection connection = (PooledConnection) obj; + connection.reallyClose(); LOGGER.debug("<QMAN-200014> : Connection has been destroyed."); } catch (Exception e) { @@ -216,21 +171,21 @@ public final class QpidDatasource } } } - + // Singleton instance. private static QpidDatasource instance = new QpidDatasource(); // Each entry contains a connection pool for a specific broker. private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>(); - + // Private constructor. private QpidDatasource() { } - + /** * Gets an available connection from the pool of the given broker. - * + * * @param brokerId the broker identifier. * @return a valid connection to the broker associated with the given identifier. */ @@ -238,20 +193,20 @@ public final class QpidDatasource { return (Connection) pools.get(brokerId).borrowObject(); } - + /** * Entry point method for retrieving the singleton instance of this datasource. - * + * * @return the qpid datasource singleton instance. */ - public static QpidDatasource getInstance() + public static QpidDatasource getInstance() { return instance; } - + /** * Adds a connection pool to this datasource. - * + * * @param brokerId the broker identifier that will be associated with the new connection pool. * @param connectionData the broker connection data. * @throws Exception when the pool cannot be created. @@ -265,12 +220,12 @@ public final class QpidDatasource true, false); ObjectPool pool = factory.createPool(); - + for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++) { pool.returnObject(pool.borrowObject()); } - + pools.put(brokerId,pool); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java index 92689eba52..4bda450315 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -22,42 +22,47 @@ package org.apache.qpid.management.domain.services; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.QpidException; import org.apache.qpid.management.Constants; import org.apache.qpid.management.Names; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.QpidDatasource; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; import org.apache.qpid.nclient.util.MessageListener; import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.apache.qpid.transport.util.Logger; /** * Qpid Broker facade. - * + * * @author Andrea Gazzarini */ -public class QpidService +public class QpidService implements SessionListener { private final static Logger LOGGER = Logger.get(QpidService.class); // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication. - private static class Log - { + private static class Log + { /** * Logs the content f the message. * This will be written on log only if DEBUG level is enabled. - * + * * @param messageContent the raw content of the message. */ - static void logMessageContent(byte [] messageContent) + static void logMessageContent(byte [] messageContent) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( @@ -65,56 +70,81 @@ public class QpidService Arrays.toString(messageContent)); } } - + /** * Logs the content f the message. * This will be written on log only if DEBUG level is enabled. - * + * * @param messageContent the raw content of the message. */ - static void logMessageContent(ByteBuffer messageContent) + static void logMessageContent(ByteBuffer messageContent) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( "<QMAN-200002> : Message has been sent to management exchange."); } - } + } } - + private UUID _brokerId; private Connection _connection; private Session _session; - + private Map<String,MessagePartListenerAdapter> _listeners; + /** * Builds a new service with the given connection data. - * + * * @param connectionData the connection data of the broker. */ - public QpidService(UUID brokerId) + public QpidService(UUID brokerId) { this._brokerId = brokerId; } - + /** * Estabilishes a connection with the broker. - * + * * @throws QpidException in case of connection failure. */ public void connect() throws Exception { _connection = QpidDatasource.getInstance().getConnection(_brokerId); + _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>(); _session = _connection.createSession(Constants.NO_EXPIRATION); + _session.setSessionListener(this); + } + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + MessagePartListenerAdapter l = _listeners.get(xfr.getDestination()); + if (l == null) + { + LOGGER.error("unhandled message: %s", xfr); + } + else + { + l.messageTransfer(xfr); + } } - + + public void exception(Session ssn, SessionException exc) + { + LOGGER.error(exc, "session %s exception", ssn); + } + + public void closed(Session ssn) {} + /** - * All the previously entered outstanding commands are asynchronous. + * All the previously entered outstanding commands are asynchronous. * Synchronous behavior is achieved through invoking this method. */ - public void sync() + public void sync() { _session.sync(); } - + /** * Closes communication with broker. */ @@ -124,48 +154,50 @@ public class QpidService { _session.close(); _session = null; + _listeners = null; } catch (Exception e) { } try { - _connection.close(); + _connection.close(); _connection = null; } catch (Exception e) { } } - + /** * Associate a message listener with a destination therefore creating a new subscription. - * + * * @param queueName The name of the queue that the subscriber is receiving messages from. * @param destinationName the name of the destination, or delivery tag, for the subscriber. - * @param listener the listener for this destination. - * + * @param listener the listener for this destination. + * * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...) */ - public void createSubscription(String queueName, String destinationName,MessageListener listener) + public void createSubscription(String queueName, String destinationName, MessageListener listener) { - _session.messageSubscribe( - queueName, - destinationName, - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); - - _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE); - + _listeners.put(destinationName, new MessagePartListenerAdapter(listener)); + _session.messageSubscribe + (queueName, + destinationName, + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + + _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); + LOGGER.debug( - "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", + "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", queueName, destinationName); } - + /** * Removes a previously declared consumer from the broker. - * + * * @param destinationName the name of the destination, or delivery tag, for the subscriber. * @see Session#messageCancel(String, Option...) */ @@ -173,10 +205,10 @@ public class QpidService { _session.messageCancel(destinationName); LOGGER.debug( - "<QMAN-200026> : Subscription named %s has been removed from remote broker.", + "<QMAN-200026> : Subscription named %s has been removed from remote broker.", destinationName); - } - + } + /** * Declares a queue on the broker with the given name. * @@ -200,27 +232,27 @@ public class QpidService _session.queueDelete(queueName); LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName); } - + /** * Binds (on the broker) a queue with an exchange. * - * @param queueName the name of the queue to bind. + * @param queueName the name of the queue to bind. * @param exchangeName the exchange name. - * @param routingKey the routing key used for the binding. + * @param routingKey the routing key used for the binding. * @see Session#exchangeBind(String, String, String, java.util.Map, Option...) */ public void declareBinding(String queueName, String exchangeName, String routingKey) { _session.exchangeBind(queueName, exchangeName, routingKey, null); LOGGER.debug( - "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", + "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** * Removes a previously declare binding between an exchange and a queue. - * + * * @param queueName the name of the queue. * @param exchangeName the name of the exchange. * @param routingKey the routing key used for binding. @@ -229,42 +261,42 @@ public class QpidService { _session.exchangeUnbind(queueName, exchangeName, routingKey); LOGGER.debug( - "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", + "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** * Sends a command message with the given data on the management queue. - * + * * @param messageData the command message content. */ - public void sendCommandMessage(byte [] messageData) + public void sendCommandMessage(byte [] messageData) { _session.messageTransfer( Names.MANAGEMENT_EXCHANGE, MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, Configuration.getInstance().getCommandMessageHeader(), - messageData); - + messageData); + Log.logMessageContent (messageData); } - + /** * Sends a command message with the given data on the management queue. - * + * * @param messageData the command message content. */ - public void sendCommandMessage(ByteBuffer messageData) + public void sendCommandMessage(ByteBuffer messageData) { _session.messageTransfer( Names.MANAGEMENT_EXCHANGE, MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, Configuration.getInstance().getCommandMessageHeader(), - messageData); - + messageData); + Log.logMessageContent (messageData); - } + } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 181cf427d1..82e05ba816 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -640,71 +640,47 @@ public class QpidBench } private static final org.apache.qpid.transport.Connection getConnection - (Options opts, final SessionDelegate delegate) + (Options opts) { - final Object lock = new Object(); org.apache.qpid.transport.Connection conn = - IoTransport.connect(opts.broker, opts.port, - new ClientDelegate() - { - public SessionDelegate getSessionDelegate() - { - return delegate; - } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} - @Override public void connectionOpenOk(Channel ch, - ConnectionOpenOk ok) - { - synchronized (lock) - { - lock.notify(); - } - } - }); - conn.send(new ProtocolHeader(1, 0, 10)); - - synchronized (lock) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + new org.apache.qpid.transport.Connection(); + conn.connect(opts.broker, opts.port, null, "guest", "guest"); + return conn; + } + + private static abstract class NativeListener implements SessionListener + { + + public void opened(org.apache.qpid.transport.Session ssn) {} + + public void exception(org.apache.qpid.transport.Session ssn, + SessionException exc) + { + exc.printStackTrace(); } - return conn; + public void closed(org.apache.qpid.transport.Session ssn) {} + } private static final void native_publisher(Options opts) throws Exception { final long[] echos = { 0 }; - org.apache.qpid.transport.Connection conn = getConnection - (opts, - new SessionDelegate() { - @Override public void messageTransfer - (org.apache.qpid.transport.Session ssn, - MessageTransfer mt) - { - synchronized (echos) - { - echos[0]++; - echos.notify(); - } - ssn.processed(mt); - } - }); - - Channel ch = conn.getChannel(0); - org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(xfr); + } + }); ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); @@ -794,6 +770,7 @@ public class QpidBench ssn.messageCancel("echo-queue"); ssn.sync(); + ssn.close(); conn.close(); } @@ -805,57 +782,51 @@ public class QpidBench dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); final MessageProperties mp = new MessageProperties(); final Object done = new Object(); - org.apache.qpid.transport.Connection conn = getConnection - (opts, - new SessionDelegate() { - - private long count = 0; - private long lastTime = 0; - private long start; - - @Override public void messageTransfer - (org.apache.qpid.transport.Session ssn, - MessageTransfer mt) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - ssn.messageTransfer("amq.direct", - MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), - echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); - lastTime = time; - } - ssn.processed(mt); - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - Channel ch = conn.getChannel(0); - org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(xfr); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); @@ -879,6 +850,7 @@ public class QpidBench ssn.messageCancel("test-queue"); ssn.sync(); + ssn.close(); conn.close(); } |