summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
commit394823bba7976c170ac58e53b5d80ad12e0f1690 (patch)
tree9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java
parente78747f63bc73daa6e2035453358e6eaf3237b84 (diff)
downloadqpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java29
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java64
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java91
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java29
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java29
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java85
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java106
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java30
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java98
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Client.java295
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Connection.java86
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java137
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Session.java544
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java163
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java68
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java66
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java54
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyBroker.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyClient.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Channel.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java106
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java287
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java242
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java (renamed from java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java)31
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Echo.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java145
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java67
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionException.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionListener.java40
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Sink.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java63
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java33
-rw-r--r--java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java157
-rw-r--r--java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java156
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java186
55 files changed, 1336 insertions, 2446 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
index b7982f8e78..aa99112f32 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
@@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.direct;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
/**
* This creates a queue a queue and binds it to the
@@ -36,16 +35,8 @@ public class DeclareQueue
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -58,15 +49,7 @@ public class DeclareQueue
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
index ba4aec4024..1e571eeede 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
@@ -24,63 +24,21 @@ package org.apache.qpid.example.amqpexample.direct;
import java.nio.ByteBuffer;
import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Session;
-public class DirectProducer implements MessageListener
+public class DirectProducer
{
- boolean finish = false;
-
- public void onMessage(Message m)
- {
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data);
-
-
- if (data != null && data.equals("That's all, folks!"))
- {
- finish = true;
- }
- }
-
- public boolean isFinished()
- {
- return finish;
- }
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -102,16 +60,8 @@ public class DirectProducer implements MessageListener
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
index 7fed35872d..370573c3eb 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
@@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.direct;
import java.nio.ByteBuffer;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
/**
* This listens to messages on a queue and terminates
* when it sees the final message
*
*/
-public class Listener implements MessageListener
+public class Listener implements SessionListener
{
- boolean finish = false;
- public void onMessage(Message m)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data);
-
-
- if (data != null && data.equals("That's all, folks!"))
- {
- finish = true;
- }
+ System.out.println("Message: " + xfr);
}
- public boolean isFinished()
+ public void exception(Session ssn, SessionException exc)
{
- return finish;
+ exc.printStackTrace();
}
+ public void closed(Session ssn) {}
+
/**
* This sends 10 messages to the
* amq.direct exchange using the
* routing key as "routing_key"
*
*/
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
Listener listener = new Listener();
+ session.setSessionListener(listener);
// create a subscription
session.messageSubscribe("message_queue",
"listener_destination",
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(listener), null);
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// issue credits
// XXX
- session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11);
// confirm completion
session.sync();
- // check to see if we have received all the messages
- while (!listener.isFinished()){}
+ // wait to receive all the messages
+ System.out.println("Waiting 100 seconds for messages from listener_destination");
+ Thread.sleep(100*1000);
System.out.println("Shutting down listener for listener_destination");
session.messageCancel("listener_destination");
//cleanup
- session.sessionDetach(session.getName());
-
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java
index 75f274bcbb..079be003b1 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java
@@ -21,9 +21,8 @@ package org.apache.qpid.example.amqpexample.fanout;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
/**
* This creates a queue a queue and binds it to the
@@ -36,16 +35,8 @@ public class DeclareQueue
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -58,15 +49,7 @@ public class DeclareQueue
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
index a1ec0b7f29..257bcdbfb1 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
@@ -21,13 +21,12 @@ package org.apache.qpid.example.amqpexample.fanout;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Session;
public class FannoutProducer
{
@@ -38,16 +37,8 @@ public class FannoutProducer
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -68,16 +59,8 @@ public class FannoutProducer
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
index 432dd7eb4f..dead5569eb 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
@@ -23,114 +23,81 @@ package org.apache.qpid.example.amqpexample.fanout;
import java.nio.ByteBuffer;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
/**
* This listens to messages on a queue and terminates
* when it sees the final message
*
*/
-public class Listener implements MessageListener
+public class Listener implements SessionListener
{
- boolean finish = false;
- public void onMessage(Message m)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data);
-
- if (data != null && data.equals("That's all, folks!"))
- {
- finish = true;
- }
+ System.out.println("Message: " + xfr);
}
- public boolean isFinished()
+ public void exception(Session ssn, SessionException exc)
{
- return finish;
+ exc.printStackTrace();
}
+ public void closed(Session ssn) {}
+
/**
* This sends 10 messages to the
* amq.direct exchange using the
* routing key as "routing_key"
*
*/
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
Listener listener = new Listener();
+ session.setSessionListener(listener);
// create a subscription
session.messageSubscribe("message_queue",
"listener_destination",
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(listener), null);
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// issue credits
// XXX
- session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11);
// confirm completion
session.sync();
// check to see if we have received all the messages
- while (!listener.isFinished()){}
+ System.out.println("Waiting 100 seconds for messages from listener_destination");
+ Thread.sleep(100*1000);
System.out.println("Shutting down listener for listener_destination");
session.messageCancel("listener_destination");
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
index 70c08e0d02..2ed5b2d719 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
@@ -23,63 +23,49 @@ package org.apache.qpid.example.amqpexample.pubsub;
import java.nio.ByteBuffer;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
-public class TopicListener implements MessageListener
+public class TopicListener implements SessionListener
{
- boolean finish = false;
- int count = 0;
- public void onMessage(Message m)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- String data = null;
-
- try
- {
- ByteBuffer buf = m.readData();
- byte[] b = new byte[buf.remaining()];
- buf.get(b);
- data = new String(b);
- }
- catch(Exception e)
- {
- System.out.print("Error reading message");
- e.printStackTrace();
- }
-
- System.out.println("Message: " + data + " with routing_key " + m.getDeliveryProperties().getRoutingKey());
-
- if (data != null && data.equals("That's all, folks!"))
- {
- count++;
- if (count == 4){
- finish = true;
- }
- }
+ DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class);
+ System.out.println("Message: " + xfr + " with routing_key " + dp.getRoutingKey());
}
+ public void exception(Session ssn, SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(Session ssn) {}
+
public void prepareQueue(Session session,String queueName,String bindingKey)
{
session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
session.exchangeBind(queueName, "amq.topic", bindingKey, null);
session.exchangeBind(queueName, "amq.topic", "control", null);
- session.messageSubscribe(queueName,queueName,
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(this),
- null, Option.NONE);
+ session.messageSubscribe(queueName, queueName,
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// issue credits
// XXX: need to be able to set to null
- session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
session.messageFlow(queueName, MessageCreditUnit.MESSAGE, 24);
}
@@ -88,30 +74,18 @@ public class TopicListener implements MessageListener
session.messageCancel(dest);
}
- public boolean isFinished()
- {
- return finish;
- }
-
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
TopicListener listener = new TopicListener();
+ session.setSessionListener(listener);
listener.prepareQueue(session,"usa", "usa.#");
listener.prepareQueue(session,"europe", "europe.#");
@@ -120,25 +94,19 @@ public class TopicListener implements MessageListener
// confirm completion
session.sync();
- // check to see if we have received all the messages
- while (!listener.isFinished()){}
- System.out.println("Shutting down listener for listener_destination");
+
+ System.out.println("Waiting 100 seconds for messages");
+ Thread.sleep(100*1000);
+
+ System.out.println("Shutting down listeners");
listener.cancelSubscription(session,"usa");
listener.cancelSubscription(session,"europe");
listener.cancelSubscription(session,"news");
listener.cancelSubscription(session,"weather");
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
index 3067309555..20264d3791 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
@@ -21,16 +21,16 @@ package org.apache.qpid.example.amqpexample.pubsub;
*/
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Session;
public class TopicPublisher
{
+
public void publishMessages(Session session, String routing_key)
{
// Set the routing key once, we'll use the same routing key for all
@@ -56,16 +56,8 @@ public class TopicPublisher
public static void main(String[] args)
{
// Create connection
- Connection con = Client.createConnection();
- try
- {
- con.connect("localhost", 5672, "test", "guest", "guest");
- }
- catch(Exception e)
- {
- System.out.print("Error connecting to broker");
- e.printStackTrace();
- }
+ Connection con = new Connection();
+ con.connect("localhost", 5672, "test", "guest", "guest");
// Create session
Session session = con.createSession(0);
@@ -82,15 +74,7 @@ public class TopicPublisher
session.sync();
//cleanup
- session.sessionDetach(session.getName());
- try
- {
- con.close();
- }
- catch(Exception e)
- {
- System.out.print("Error closing broker connection");
- e.printStackTrace();
- }
+ session.close();
+ con.close();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 27294562e5..ebeb29af78 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -917,7 +917,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConneciton(timeout);
+ _delegate.closeConnection(timeout);
//If the taskpool hasn't shutdown by now then give it shutdownNow.
// This will interupt any running tasks.
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 7f36ec6e99..60b827a426 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -42,5 +42,5 @@ public interface AMQConnectionDelegate
public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
- public void closeConneciton(long timeout) throws JMSException, AMQException;
+ public void closeConnection(long timeout) throws JMSException, AMQException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index c723709d27..a7f04a2090 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client;
import java.io.IOException;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
@@ -33,15 +34,18 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionException;
+import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.TransportException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
/**
* This class logger.
@@ -56,7 +60,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
/**
* The QpidConeection instance that is mapped with thie JMS connection.
*/
- org.apache.qpid.nclient.Connection _qpidConnection;
+ org.apache.qpid.transport.Connection _qpidConnection;
//--- constructor
public AMQConnectionDelegate_0_10(AMQConnection conn)
@@ -125,7 +129,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
*/
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _qpidConnection = Client.createConnection();
+ _qpidConnection = new Connection();
try
{
if (_logger.isDebugEnabled())
@@ -134,16 +138,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
.getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn
.getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword());
}
+ _qpidConnection.setConnectionListener(this);
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword());
- _qpidConnection.setClosedListener(this);
_conn._connected = true;
}
catch(ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
}
- catch (QpidException e)
+ catch (ConnectionException e)
{
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
}
@@ -161,34 +165,42 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
}
- public void closeConneciton(long timeout) throws JMSException, AMQException
+ public void closeConnection(long timeout) throws JMSException, AMQException
{
try
{
_qpidConnection.close();
}
- catch (QpidException e)
+ catch (TransportException e)
{
- throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e);
+ throw new AMQException(e.getMessage(), e);
}
-
}
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ public void opened(Connection conn) {}
+
+ public void exception(Connection conn, ConnectionException exc)
{
- if (_logger.isDebugEnabled())
+ ExceptionListener listener = _conn._exceptionListener;
+ if (listener == null)
{
- _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode(), t);
+ _logger.error("connection exception: " + conn, exc);
}
- if (_conn._exceptionListener != null)
+ else
{
- JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode()));
- if (t != null)
+ ConnectionClose close = exc.getClose();
+ String code = null;
+ if (close != null)
{
- ex.initCause(t);
+ code = close.getReplyCode().toString();
}
+ JMSException ex = new JMSException(exc.getMessage(), code);
+ ex.initCause(exc);
_conn._exceptionListener.onException(ex);
}
}
+
+ public void closed(Connection conn) {}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 2ec8737d16..8d42a2f201 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -58,7 +58,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
private AMQConnection _conn;
- public void closeConneciton(long timeout) throws JMSException, AMQException
+ public void closeConnection(long timeout) throws JMSException, AMQException
{
_conn.getProtocolHandler().closeConnection(timeout);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 46a667419d..7829966315 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -28,20 +28,27 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.util.Serial;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.qpid.transport.Option.*;
+
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -53,6 +60,7 @@ import java.util.Map;
* This is a 0.10 Session
*/
public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
+ implements SessionListener
{
/**
@@ -70,10 +78,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* The latest qpid Exception that has been reaised.
*/
private Object _currentExceptionLock = new Object();
- private QpidException _currentException;
+ private SessionException _currentException;
// a ref on the qpid connection
- protected org.apache.qpid.nclient.Connection _qpidConnection;
+ protected org.apache.qpid.transport.Connection _qpidConnection;
private RangeSet unacked = new RangeSet();
private int unackedCount = 0;
@@ -97,7 +105,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
* @param qpidConnection The qpid connection
*/
- AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
@@ -108,7 +116,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = qpidConnection.createSession(0);
// set the exception listnere for this session
- _qpidSession.setClosedListener(new QpidSessionExceptionListener());
+ _qpidSession.setSessionListener(this);
// set transacted if required
if (_transacted)
{
@@ -127,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
* @param qpidConnection The connection
*/
- AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
@@ -195,12 +203,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (unackedCount > 0)
{
- getQpidSession().messageAcknowledge
+ messageAcknowledge
(unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
clearUnacked();
}
}
+ void messageAcknowledge(RangeSet ranges, boolean accept)
+ {
+ Session ssn = getQpidSession();
+ for (Range range : ranges)
+ {
+ ssn.processed(range);
+ }
+ ssn.flushProcessed(accept ? BATCH : NONE);
+ if (accept)
+ {
+ ssn.messageAccept(ranges);
+ }
+ }
+
/**
* Bind a queue with an exchange.
*
@@ -416,11 +438,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
preAcquire = ( ! consumer.isNoConsume() &&
(consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
|| !(consumer.getDestination() instanceof AMQQueue);
- getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
- getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
- preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
- (BasicMessageConsumer_0_10) consumer, null,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ getQpidSession().messageSubscribe
+ (queueName.toString(), String.valueOf(tag),
+ getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
{
@@ -598,7 +620,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpid.nclient.Session getQpidSession()
+ protected Session getQpidSession()
{
return _qpidSession;
}
@@ -615,31 +637,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (_currentException != null)
{
- QpidException toBeThrown = _currentException;
+ SessionException se = _currentException;
_currentException = null;
- throw new AMQException(AMQConstant.getConstant(toBeThrown.getErrorCode().getCode()),
- toBeThrown.getMessage(), toBeThrown);
+ ExecutionException ee = se.getException();
+ int code;
+ if (ee == null)
+ {
+ code = 0;
+ }
+ else
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ throw new AMQException
+ (AMQConstant.getConstant(code), se.getMessage(), se);
}
}
}
- //------ Inner classes
- /**
- * Lstener for qpid protocol exceptions
- */
- private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ messageReceived(new UnprocessedMessage_0_10(xfr));
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ synchronized (_currentExceptionLock)
{
- synchronized (_currentExceptionLock)
- {
- // todo check the error code for finding out if we need to notify the
- // JMS connection exception listener
- _currentException = new QpidException(reason, errorCode, t);
- }
+ _currentException = exc;
}
}
+ public void closed(Session ssn) {}
+
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal)
throws AMQException
@@ -776,7 +808,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
{
// send completed so consumer credits don't dry up
- getQpidSession().messageAcknowledge(_txRangeSet, false);
+ messageAcknowledge(_txRangeSet, false);
}
}
@@ -787,7 +819,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if( _txSize > 0 )
{
- getQpidSession().messageAcknowledge(_txRangeSet, true);
+ messageAcknowledge(_txRangeSet, true);
_txRangeSet.clear();
_txSize = 0;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 2a37298a43..7d535643c0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
* This is a 0.10 message consumer.
*/
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
- implements org.apache.qpid.nclient.MessagePartListener
{
/**
@@ -114,9 +113,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _consumerTagString;
}
-
- // ----- Interface org.apache.qpid.client.util.MessageListener
-
/**
*
* This is invoked by the session thread when emptying the session message queue.
@@ -159,28 +155,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
-
-
- /**
- * This method is invoked by the transport layer when a message is delivered for this
- * consumer. The message is transformed and pass to the session.
- * @param xfr an 0.10 message transfer
- */
- public void messageTransfer(MessageTransfer xfr)
-
- //public void onMessage(Message message)
- {
- int channelId = getSession().getChannelId();
- int consumerTag = getConsumerTag();
-
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(consumerTag, xfr);
-
-
- getSession().messageReceived(newMessage);
- // else ignore this message
- }
-
//----- overwritten methods
/**
@@ -304,8 +278,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageAcknowledge(ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE );
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
_0_10session.getCurrentException();
}
}
@@ -425,10 +400,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
void postDeliver(AbstractJMSMessage msg) throws JMSException
{
super.postDeliver(msg);
- if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index ef8aeb58a1..4e5077f0cd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -36,7 +36,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.util.Strings;
-import org.apache.qpid.nclient.util.ByteBufferMessage;
import org.apache.qpid.njms.ExceptionHelper;
import org.apache.qpid.transport.*;
import static org.apache.qpid.transport.Option.*;
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index 6763c72ecd..35adda9348 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -88,8 +88,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
checkStatus(result.getStatus());
}
@@ -142,8 +141,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
checkStatus(result.getStatus());
}
@@ -171,8 +169,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
}
@@ -201,8 +198,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
}
return result;
@@ -248,8 +244,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
DtxXaStatus status = result.getStatus();
int outcome = XAResource.XA_OK;
@@ -291,8 +286,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr( e.getException().getErrorCode());
}
Xid[] result = new Xid[res.getInDoubt().size()];
int i = 0;
@@ -329,8 +323,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr( e.getException().getErrorCode());
}
checkStatus(result.getStatus());
}
@@ -413,8 +406,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
// TODO: The amqp spec does not allow to make the difference
// between an already known XID and a wrong arguments (join and resume are set)
// TODO: make sure amqp addresses that
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index 51b4b7899f..354b67cd35 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.nclient.DtxSession;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import javax.jms.*;
@@ -36,7 +35,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
/**
* This XASession Qpid DtxSession
*/
- private DtxSession _qpidDtxSession;
+ private org.apache.qpid.transport.Session _qpidDtxSession;
/**
* The standard session
@@ -48,7 +47,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
/**
* Create a JMS XASession
*/
- public XASessionImpl(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
int defaultPrefetchHigh, int defaultPrefetchLow)
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
@@ -65,7 +64,9 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*/
public void createSession()
{
- _qpidDtxSession = _qpidConnection.createDTXSession(0);
+ _qpidDtxSession = _qpidConnection.createSession(0);
+ _qpidDtxSession.setSessionListener(this);
+ _qpidDtxSession.dtxSelect();
}
@@ -126,7 +127,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpid.nclient.DtxSession getQpidSession()
+ protected org.apache.qpid.transport.Session getQpidSession()
{
return _qpidDtxSession;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 8b4488f1f9..d064c27754 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.nclient.*;
import org.apache.qpid.jms.Message;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
@@ -138,7 +137,7 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
}
- public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session)
+ public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session)
{
DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
if(deliveryProps != null)
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
index 6b1301a33f..f31bc88509 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
@@ -33,9 +33,9 @@ public class UnprocessedMessage_0_10 extends UnprocessedMessage
{
private MessageTransfer _transfer;
- public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr)
+ public UnprocessedMessage_0_10(MessageTransfer xfr)
{
- super(consumerTag);
+ super(Integer.parseInt(xfr.getDestination()));
_transfer = xfr;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java
deleted file mode 100644
index af0e724e42..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/Client.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.nclient;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.qpid.client.url.URLParser_0_10;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.url.QpidURL;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.nclient.impl.ClientSession;
-import org.apache.qpid.nclient.impl.ClientSessionDelegate;
-import org.apache.qpid.transport.Channel;
-import org.apache.qpid.transport.ClientDelegate;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionCloseOk;
-import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.ProtocolVersionException;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.network.io.IoTransport;
-import org.apache.qpid.transport.network.mina.MinaHandler;
-import org.apache.qpid.transport.network.nio.NioHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class Client implements org.apache.qpid.nclient.Connection
-{
- private Connection _conn;
- private ClosedListener _closedListner;
- private final Lock _lock = new ReentrantLock();
- private static Logger _logger = LoggerFactory.getLogger(Client.class);
- private Condition closeOk;
- private boolean closed = false;
- private long timeout = 60000;
-
- private ProtocolHeader header = null;
-
- /**
- *
- * @return returns a new connection to the broker.
- */
- public static org.apache.qpid.nclient.Connection createConnection()
- {
- return new Client();
- }
-
- public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
- {
-
- final Condition negotiationComplete = _lock.newCondition();
- closeOk = _lock.newCondition();
- _lock.lock();
-
- ClientDelegate connectionDelegate = new ClientDelegate()
- {
- private boolean receivedClose = false;
- public SessionDelegate getSessionDelegate()
- {
- return new ClientSessionDelegate();
- }
-
- public void exception(Throwable t)
- {
- if (_closedListner != null)
- {
- _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t);
- }
- else
- {
- throw new RuntimeException("connection closed",t);
- }
- }
-
- public void closed()
- {
- if (_closedListner != null && !this.receivedClose)
- {
- _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null);
- }
- }
-
- @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct)
- {
- _lock.lock();
- try
- {
- closed = true;
- this.receivedClose = true;
- closeOk.signalAll();
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
- {
- super.connectionClose(context, connectionClose);
- ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue());
- if (_closedListner == null && errorCode != ErrorCode.NO_ERROR)
- {
- throw new RuntimeException
- (new QpidException("Server closed the connection: Reason " +
- connectionClose.getReplyText(),
- errorCode,
- null));
- }
- else
- {
- _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null);
- }
-
- this.receivedClose = true;
- }
- @Override public void init(Channel ch, ProtocolHeader hdr)
- {
- // TODO: once the merge is done we'll need to update this code
- // for handling 0.8 protocol version type i.e. major=8 and mino
- if (hdr.getMajor() != 0 || hdr.getMinor() != 10)
- {
- Client.this.header = hdr;
- _lock.lock();
- negotiationComplete.signalAll();
- _lock.unlock();
- }
- }
- };
-
- connectionDelegate.setCondition(_lock,negotiationComplete);
- connectionDelegate.setUsername(username);
- connectionDelegate.setPassword(password);
- connectionDelegate.setVirtualHost(virtualHost);
-
- String transport = System.getProperty("transport","io");
- if (transport.equalsIgnoreCase("nio"))
- {
- _logger.info("using NIO Transport");
- _conn = NioHandler.connect(host, port,connectionDelegate);
- }
- else if (transport.equalsIgnoreCase("io"))
- {
- _logger.info("using Plain IO Transport");
- _conn = IoTransport.connect(host, port,connectionDelegate);
- }
- else
- {
- _logger.info("using MINA Transport");
- _conn = MinaHandler.connect(host, port,connectionDelegate);
- // _conn = NativeHandler.connect(host, port,connectionDelegate);
- }
-
- // XXX: hardcoded version numbers
- _conn.send(new ProtocolHeader(1, 0, 10));
-
- try
- {
- negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
- if (header != null)
- {
- _conn.close();
- throw new ProtocolVersionException(header.getMajor(), header.getMinor());
- }
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public void connect(String url)throws QpidException
- {
- URLParser_0_10 parser = null;
- try
- {
- parser = new URLParser_0_10(url);
- }
- catch(Exception e)
- {
- throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e);
- }
- List<BrokerDetails> brokers = parser.getAllBrokerDetails();
- BrokerDetails brokerDetail = brokers.get(0);
- connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"),
- brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"),
- brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password"));
- }
-
- /*
- * Until the dust settles with the URL disucssion
- * I am not going to implement this.
- */
- public void connect(QpidURL url) throws QpidException
- {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- /* {
- // temp impl to tests
- BrokerDetails details = url.getAllBrokerDetails().get(0);
- connect(details.getHost(),
- details.getPort(),
- details.getVirtualHost(),
- details.getUserName(),
- details.getPassword());
- }
-*/
-
- public void close() throws QpidException
- {
- Channel ch = _conn.getChannel(0);
- ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing");
- _lock.lock();
- try
- {
- try
- {
- long start = System.currentTimeMillis();
- long elapsed = 0;
- while (!closed && elapsed < timeout)
- {
- closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS);
- elapsed = System.currentTimeMillis() - start;
- }
- if(!closed)
- {
- throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null);
- }
- }
- catch (InterruptedException e)
- {
- throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null);
- }
- }
- finally
- {
- _lock.unlock();
- }
- _conn.close();
- }
-
- public Session createSession(long expiryInSeconds)
- {
- Channel ch = _conn.getChannel();
- ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
- ssn.sessionRequestTimeout(expiryInSeconds);
- return ssn;
- }
-
- public DtxSession createDTXSession(int expiryInSeconds)
- {
- ClientSession clientSession = (ClientSession) createSession(expiryInSeconds);
- clientSession.dtxSelect();
- return (DtxSession) clientSession;
- }
-
- public void setClosedListener(ClosedListener closedListner)
- {
-
- _closedListner = closedListner;
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
deleted file mode 100644
index 2d5b50b33a..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import org.apache.qpid.QpidException;
-
-/**
- * This represents a physical connection to a broker.
- */
-public interface Connection
-{
- /**
- * Establish the connection using the given parameters
- *
- * @param host host name
- * @param port port number
- * @param virtualHost the virtual host name
- * @param username user name
- * @param password password
- * @throws QpidException If the communication layer fails to establish the connection.
- */
- public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
-
- /**
- * Establish the connection with the broker identified by the URL.
- *
- * @param url Specifies the URL of the broker.
- * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown.
- */
- public void connect(String url) throws QpidException;
-
- /**
- * Close this connection.
- *
- * @throws QpidException if the communication layer fails to close the connection.
- */
- public void close() throws QpidException;
-
- /**
- * Create a session for this connection.
- * <p> The returned session is suspended
- * (i.e. this session is not attached to an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than
- * or equal to 0 then the session does not expire.
- * @return A newly created (suspended) session.
- */
- public Session createSession(long expiryInSeconds);
-
- /**
- * Create a DtxSession for this connection.
- * <p> A Dtx Session must be used when resources have to be manipulated as
- * part of a global transaction.
- * <p> The retuned DtxSession is suspended
- * (i.e. this session is not attached with an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal
- * to 0 then the session does not expire.
- * @return A newly created (suspended) DtxSession.
- */
- public DtxSession createDTXSession(int expiryInSeconds);
-
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ClosedListener
- *
- * @param exceptionListner The ClosedListener
- */
- public void setClosedListener(ClosedListener exceptionListner);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
deleted file mode 100644
index 8a859f2d84..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import org.apache.qpid.transport.Future;
-import org.apache.qpid.transport.GetTimeoutResult;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.RecoverResult;
-import org.apache.qpid.transport.XaResult;
-import org.apache.qpid.transport.Xid;
-
-/**
- * The resources for this session are controlled under the scope of a distributed transaction.
- */
-public interface DtxSession extends Session
-{
-
- /**
- * This method is called when messages should be produced and consumed on behalf a transaction
- * branch identified by xid.
- * possible options are:
- * <ul>
- * <li> {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen.
- * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified.
- * </ul>
- *
- * @param xid Specifies the xid of the transaction branch to be started.
- * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
- * @return Confirms to the client that the transaction branch is started or specify the error condition.
- */
- public Future<XaResult> dtxStart(Xid xid, Option... options);
-
- /**
- * This method is called when the work done on behalf of a transaction branch finishes or needs to
- * be suspended.
- * possible options are:
- * <ul>
- * <li> {@link Option#FAIL}: indicates that this portion of work has failed;
- * otherwise this portion of work has
- * completed successfully.
- * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is
- * temporarily suspended in an incomplete state.
- * </ul>
- *
- * @param xid Specifies the xid of the transaction branch to be ended.
- * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
- * @return Confirms to the client that the transaction branch is ended or specifies the error condition.
- */
- public Future<XaResult> dtxEnd(Xid xid, Option... options);
-
- /**
- * Commit the work done on behalf of a transaction branch. This method commits the work associated
- * with xid. Any produced messages are made available and any consumed messages are discarded.
- * The only possible option is:
- * <ul>
- * <li> {@link Option#ONE_PHASE}: When set, one-phase commit optimization is used.
- * </ul>
- *
- * @param xid Specifies the xid of the transaction branch to be committed.
- * @param options Available option is: {@link Option#ONE_PHASE}
- * @return Confirms to the client that the transaction branch is committed or specifies the error condition.
- */
- public Future<XaResult> dtxCommit(Xid xid, Option... options);
-
- /**
- * This method is called to forget about a heuristically completed transaction branch.
- *
- * @param xid Specifies the xid of the transaction branch to be forgotten.
- */
- public void dtxForget(Xid xid, Option ... options);
-
- /**
- * This method obtains the current transaction timeout value in seconds. If set-timeout was not
- * used prior to invoking this method, the return value is the default timeout value; otherwise, the
- * value used in the previous set-timeout call is returned.
- *
- * @param xid Specifies the xid of the transaction branch used for getting the timeout.
- * @return The current transaction timeout value in seconds.
- */
- public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options);
-
- /**
- * This method prepares any message produced or consumed on behalf of xid, ready for commitment.
- *
- * @param xid Specifies the xid of the transaction branch to be prepared.
- * @return The status of the prepare operation can be any one of:
- * xa-ok: Normal execution.
- * <p/>
- * xa-rdonly: The transaction branch was read-only and has been committed.
- * <p/>
- * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified
- * reason.
- * <p/>
- * xa-rbtimeout: The work represented by this transaction branch took too long.
- */
- public Future<XaResult> dtxPrepare(Xid xid, Option ... options);
-
- /**
- * This method is called to obtain a list of transaction branches that are in a prepared or
- * heuristically completed state.
- * @return a array of xids to be recovered.
- */
- public Future<RecoverResult> dtxRecover(Option ... options);
-
- /**
- * This method rolls back the work associated with xid. Any produced messages are discarded and
- * any consumed messages are re-queued.
- *
- * @param xid Specifies the xid of the transaction branch to be rolled back.
- * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition.
- */
- public Future<XaResult> dtxRollback(Xid xid, Option ... options);
-
- /**
- * Sets the specified transaction branch timeout value in seconds.
- *
- * @param xid Specifies the xid of the transaction branch for setting the timeout.
- * @param timeout The transaction timeout value in seconds.
- */
- public void dtxSetTimeout(Xid xid, long timeout, Option ... options);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
deleted file mode 100644
index 0d84394c7c..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.qpid.transport.*;
-import org.apache.qpid.api.Message;
-
-/**
- * <p>A session is associated with a connection.
- * When it is created, a session is not associated with an underlying channel.
- * The session is single threaded. </p>
- * <p/>
- * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method.
- * For example, <code>command1</code> will be synchronously invoked by using the following sequence:
- * <ul>
- * <li> <code>session.command1()</code>
- * <li> <code>session.sync()</code>
- * </ul>
- */
-public interface Session
-{
- public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1;
- public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0;
- public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0;
- public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1;
- public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
- public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
- public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
- public static final short MESSAGE_FLOW_UNIT_BYTE = 1;
- public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF;
- public static final short MESSAGE_REJECT_CODE_GENERIC = 0;
- public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1;
- public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
- public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1;
-
- //------------------------------------------------------
- // Session housekeeping methods
- //------------------------------------------------------
-
- /**
- * Sync method will block the session until all outstanding commands
- * are executed.
- */
- public void sync();
-
- public void close();
-
- public void sessionDetach(byte[] name, Option ... options);
-
- public void sessionRequestTimeout(long expiry, Option ... options);
-
- public byte[] getName();
-
- public void setAutoSync(boolean value);
-
- //------------------------------------------------------
- // Messaging methods
- // Producer
- //------------------------------------------------------
- /**
- * Transfer a message to a specified exchange.
- * <p/>
- * <p>This transfer provides a complete message
- * using a single method. The method is internally mapped to messageTransfer() and headers() followed
- * by data() and endData().
- * <b><i>This method should only be used by small messages.</b></i></p>
- *
- * @param destination The exchange the message is being sent to.
- * @param msg The Message to be sent.
- * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required. Once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred.
- * <p/>
- * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
- * is not considered transferred until the original
- * transfer is complete. A complete transfer is signaled by execution.complete.
- * </ul>
- * @param acquireMode <ul>
- * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
- * must be explicitly acquired.
- * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
- * acquired when the transfer starts.
- * </ul>
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown.
- */
- public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
- throws IOException;
-
-
- /**
- * This command transfers a message between two peers.
- *
- * @param destination Specifies the destination to which the message is to be transferred.
- * @param acceptMode Indicates whether message.accept, session.complete,
- * or nothing at all is required to indicate successful transfer of the message.
- *
- * @param acquireMode Indicates whether or not the transferred message has been acquired.
- */
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Header header, ByteBuffer body, Option ... options);
-
- /**
- * This command transfers a message between two peers.
- *
- * @param destination Specifies the destination to which the message is to be transferred.
- * @param acceptMode Indicates whether message.accept, session.complete,
- * or nothing at all is required to indicate successful transfer of the message.
- *
- * @param acquireMode Indicates whether or not the transferred message has been acquired.
- */
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Header header, byte[] body, Option ... options);
-
- /**
- * This command transfers a message between two peers.
- *
- * @param destination Specifies the destination to which the message is to be transferred.
- * @param acceptMode Indicates whether message.accept, session.complete,
- * or nothing at all is required to indicate successful transfer of the message.
- *
- * @param acquireMode Indicates whether or not the transferred message has been acquired.
- */
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Header header, String body, Option ... options);
-
- //------------------------------------------------------
- // Messaging methods
- // Consumer
- //------------------------------------------------------
-
- /**
- * Associate a message listener with a destination.
- * <p> The destination is bound to a queue, and messages are filtered based
- * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled).
- * <p> The valid options are:
- * <ul>
- * <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this
- * subscription can access the queue.
- * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect.
- * </ul>
- *
- * @param queue The queue that the receiver is receiving messages from.
- * @param destination The destination, or delivery tag, for the subscriber.
- * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required. Once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred.
- * <p/>
- * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
- * is not considered transferred until the original
- * transfer is complete. A complete transfer is signaled by execution.complete.
- * </ul>
- * @param acquireMode <ul>
- * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must
- * be explicitly acquired.
- * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
- * acquired when the transfer starts.
- * </ul>
- * @param listener The listener for this destination. To transfer large messages
- * use a {@link org.apache.qpid.nclient.MessagePartListener}.
- * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE}
- * and {@link Option#NONE}.
- * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies
- * according to the provider's implementation.
- */
- public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
- MessagePartListener listener, Map<String, Object> filter, Option... options);
-
- /**
- * This method cancels a consumer. The server will not send any more messages to the specified destination.
- * This does not affect already delivered messages.
- * The client may receive a
- * number of messages in between sending the cancel method and receiving
- * notification that the cancellation has been completed.
- *
- * @param destination The destination to be cancelled.
- */
- public void messageCancel(String destination, Option ... options);
-
- /**
- * Associate a message listener with a destination.
- * <p> Only one listener is permitted for each destination. When a new listener is created,
- * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener
- * has completed processing a message.
- *
- * @param destination The destination the listener is associated with.
- * @param listener The new listener for this destination.
- */
- public void setMessageListener(String destination, MessagePartListener listener);
-
- /**
- * Sets the mode of flow control used for a given destination.
- * <p> With credit based flow control, the broker continually maintains its current
- * credit balance with the recipient. The credit balance consists of two values, a message
- * count, and a byte count. Whenever message data is sent, both counts must be decremented.
- * If either value reaches zero, the flow of message data must stop. Additional credit is
- * received via the {@link Session#messageFlow} method.
- * <p> Window based flow control is identical to credit based flow control, however message
- * acknowledgment implicitly grants a single unit of message credit, and the size of the
- * message in byte credits for each acknowledged message.
- *
- * @param destination The destination to set the flow mode on.
- * @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
- * <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
- */
- public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options);
-
-
- /**
- * This method controls the flow of message data to a given destination. It is used by the
- * recipient of messages to dynamically match the incoming rate of message flow to its
- * processing or forwarding capacity. Upon receipt of this method, the sender must add "value"
- * number of the specified unit to the available credit balance for the specified destination.
- * A value of 0 indicates an infinite amount of credit. This disables any limit for
- * the given unit until the credit balance is zeroed with {@link Session#messageStop}
- * or {@link Session#messageFlush}.
- *
- * @param destination The destination to set the flow.
- * @param unit Specifies the unit of credit balance.
- * <p/>
- * One of: <ul>
- * <li> message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE})
- * <li> byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE})
- * </ul>
- * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
- */
- public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options);
-
- /**
- * Forces the broker to exhaust its credit supply.
- * <p> The credit on the broker will remain at zero once
- * this method is completed.
- *
- * @param destination The destination on which the credit supply is to be exhausted.
- */
- public void messageFlush(String destination, Option ... options);
-
- /**
- * On receipt of this method, the brokers set credit to zero for a given
- * destination. When confirmation of this method
- * is issued credit is set to zero. No further messages will be sent until
- * further credit is received.
- *
- * @param destination The destination on which to reset credit.
- */
- public void messageStop(String destination, Option ... options);
-
- /**
- * Acknowledge the receipt of a range of messages.
- * <p>Messages must already be acquired, either by receiving them in
- * pre-acquire mode or by explicitly acquiring them.
- *
- * @param ranges Range of messages to be acknowledged.
- * @param accept pecify whether to send a message accept to the broker
- */
- public void messageAcknowledge(RangeSet ranges, boolean accept);
-
- /**
- * Reject a range of acquired messages.
- * <p>The broker will deliver rejected messages to the
- * alternate-exchange on the queue from which it came. If no alternate-exchange is
- * defined for that queue the broker will discard the message.
- *
- * @param ranges Range of messages to be rejected.
- * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or
- * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but
- * failed).
- * @param text String describing the reason for a message transfer rejection.
- */
- public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options);
-
- /**
- * As it is possible that the broker does not manage to reject some messages, after completion of
- * {@link Session#messageReject} this method will return the ranges of rejected messages.
- * <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the
- * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}.
- * <p> A recommended invocation sequence would be:
- * <ul>
- * <li> {@link Session#messageReject}
- * <li> {@link Session#sync()}
- * <li> {@link Session#getRejectedMessages()}
- * </ul>
- *
- * @return The rejected message ranges
- */
- public RangeSet getRejectedMessages();
-
- /**
- * Try to acquire ranges of messages hence releasing them form the queue.
- * This means that once acknowledged, a message will not be delivered to any other receiver.
- * <p> As those messages may have been consumed by another receivers hence,
- * message acquisition can fail.
- * The outcome of the acquisition is returned as an array of ranges of qcquired messages.
- * <p> This method should only be called on non-acquired messages.
- *
- * @param ranges Ranges of messages to be acquired.
- * @return Indicates the acquired messages
- */
- public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options);
-
- /**
- * Give up responsibility for processing ranges of messages.
- * <p> Released messages are re-enqueued.
- *
- * @param ranges Ranges of messages to be released.
- * @param options Valid option is: {@link Option#SET_REDELIVERED})
- */
- public void messageRelease(RangeSet ranges, Option ... options);
-
- // -----------------------------------------------
- // Local transaction methods
- // ----------------------------------------------
- /**
- * Selects the session for local transaction support.
- */
- public void txSelect(Option ... options);
-
- /**
- * Commit the receipt and delivery of all messages exchanged by this session's resources.
- *
- * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
- */
- public void txCommit(Option ... options) throws IllegalStateException;
-
- /**
- * Roll back the receipt and delivery of all messages exchanged by this session's resources.
- *
- * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
- */
- public void txRollback(Option ... options) throws IllegalStateException;
-
- //---------------------------------------------
- // Queue methods
- //---------------------------------------------
-
- /**
- * Declare a queue with the given queueName
- * <p> Following are the valid options:
- * <ul>
- * <li> {@link Option#AUTO_DELETE}: <p> If this field is set and the exclusive field is also set,
- * then the queue is deleted when the connection closes.
- * If this field is set and the exclusive field is not set the queue is deleted when all
- * the consumers have finished using it.
- * <li> {@link Option#DURABLE}: <p> If set when creating a new queue,
- * the queue will be marked as durable. Durable queues
- * remain active when a server restarts. Non-durable queues (transient queues) are purged
- * if/when a server restarts. Note that durable queues do not necessarily hold persistent
- * messages, although it does not make sense to send persistent messages to a transient
- * queue.
- * <li> {@link Option#EXCLUSIVE}: <p> Exclusive queues can only be used from one connection at a time.
- * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the
- * declaring connection closes.
- * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
- * This field allows the client to assert the presence of a queue without modifying the server state.
- * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option.
- * </ul>
- * <p>In the absence of a particular option, the defaul value is false for each option
- *
- * @param queueName The name of the delcared queue.
- * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
- * may be rejected by a queue for the following reasons:
- * <oL> <li> The queue is deleted when it is not empty;
- * <li> Immediate delivery of a message is requested, but there are no consumers connected to
- * the queue. </ol>
- * @param arguments Used for backward compatibility
- * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE})
- * @see Option
- */
- public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments,
- Option... options);
-
- /**
- * Bind a queue with an exchange.
- *
- * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current
- * queue for the session, which is the last declared queue.
- * @param exchangeName The exchange name.
- * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages
- * depending on the exchange configuration. Not all exchanges use a routing key - refer to
- * the specific exchange documentation. If the queue name is empty, the server uses the last
- * queue declared on the session. If the routing key is also empty, the server uses this
- * queue name for the routing key as well. If the queue name is provided but the routing key
- * is empty, the server does the binding with that empty routing key. The meaning of empty
- * routing keys depends on the exchange implementation.
- * @param arguments Used for backward compatibility
- */
- public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments,
- Option ... options);
-
- /**
- * Unbind a queue from an exchange.
- *
- * @param queueName Specifies the name of the queue to unbind.
- * @param exchangeName The name of the exchange to unbind from.
- * @param routingKey Specifies the routing key of the binding to unbind.
- */
- public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options);
-
- /**
- * This method removes all messages from a queue. It does not cancel consumers. Purged messages
- * are deleted without any formal "undo" mechanism.
- *
- * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the
- * current queue for the session, which is the last declared queue.
- */
- public void queuePurge(String queueName, Option ... options);
-
- /**
- * This method deletes a queue. When a queue is deleted any pending messages are sent to a
- * dead-letter queue if this is defined in the server configuration, and all consumers on the
- * queue are cancelled.
- * <p> Following are the valid options:
- * <ul>
- * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
- * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
- * If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option.
- * </ul>
- * </p>
- * <p/>
- * <p>In the absence of a particular option, the defaul value is false for each option</p>
- *
- * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the
- * current queue for the session, which is the last declared queue.
- * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED}
- * and {@link Option#NONE})
- * @see Option
- */
- public void queueDelete(String queueName, Option... options);
-
-
- /**
- * This method is used to request information on a particular queue.
- *
- * @param queueName The name of the queue for which information is requested.
- * @return Information on the specified queue.
- */
- public Future<QueueQueryResult> queueQuery(String queueName, Option ... options);
-
-
- /**
- * This method is used to request information on a particular binding.
- *
- * @param exchange The exchange name.
- * @param queue The queue name.
- * @param routingKey The routing key
- * @param arguments bacward compatibilties params.
- * @return Information on the specified binding.
- */
- public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey,
- Map<String, Object> arguments, Option ... options);
-
- // --------------------------------------
- // exhcange methods
- // --------------------------------------
-
- /**
- * This method creates an exchange. If the exchange already exists,
- * the method verifies the class and checks the details are correct.
- * <p>Valid options are:
- * <ul>
- * <li>{@link Option#AUTO_DELETE}: <p>If set, the exchange is deleted when all queues have finished using it.
- * <li>{@link Option#DURABLE}: <p>If set, the exchange will
- * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient
- * exchanges) are purged when a server restarts.
- * <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange.
- * The client can use this to check whether an exchange exists without modifying the server state.
- * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect.
- * </ul>
- * <p>In the absence of a particular option, the defaul value is false for each option</p>
- *
- * @param exchangeName The exchange name.
- * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
- * exchange types define the functionality of the exchange - i.e. how messages are routed
- * through it. It is not valid or meaningful to attempt to change the type of an existing
- * exchange. Default exchange types are: direct, topic, headers and fanout.
- * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
- * the message will be sent.
- * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#PASSIVE}, {@link Option#NONE})
- * @param arguments Used for backward compatibility
- * @see Option
- */
- public void exchangeDeclare(String exchangeName, String type, String alternateExchange,
- Map<String, Object> arguments, Option... options);
-
- /**
- * This method deletes an exchange. When an exchange is deleted all queue bindings on the
- * exchange are cancelled.
- * <p> Following are the valid options:
- * <ul>
- * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
- * exchange has queue bindings the server does not delete it but raises a channel exception
- * instead.
- * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option.
- * </ul>
- * <p>Note that if an option is not set, it will default to false.
- *
- * @param exchangeName The name of exchange to be deleted.
- * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}.
- * @see Option
- */
- public void exchangeDelete(String exchangeName, Option... options);
-
-
- /**
- * This method is used to request information about a particular exchange.
- *
- * @param exchangeName The name of the exchange about which information is requested. If not set, the method will
- * return information about the default exchange.
- * @return Information on the specified exchange.
- */
- public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options);
-
- /**
- * If the session receives a sessionClosed with an error code it
- * informs the session's exceptionListener
- *
- * @param exceptionListner The exceptionListener
- */
- public void setClosedListener(ClosedListener exceptionListner);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
deleted file mode 100644
index 869f974ae6..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package org.apache.qpid.nclient.impl;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.MessagePartListener;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-
-import static org.apache.qpid.transport.Option.*;
-
-/**
- * Implements a Qpid Sesion.
- */
-public class ClientSession extends org.apache.qpid.transport.Session implements org.apache.qpid.nclient.DtxSession
-{
- static
- {
- String max = "message_size_before_sync"; // KB's
- try
- {
- MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000"));
- }
- catch (NumberFormatException e)
- {
- // use default size
- MAX_NOT_SYNC_DATA_LENGH = 200000000;
- }
- String flush = "message_size_before_flush";
- try
- {
- MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000"));
- }
- catch (NumberFormatException e)
- {
- // use default size
- MAX_NOT_FLUSH_DATA_LENGH = 20000000;
- }
- }
-
- private static long MAX_NOT_SYNC_DATA_LENGH;
- private static long MAX_NOT_FLUSH_DATA_LENGH;
-
- private Map<String,MessagePartListener> _messageListeners = new ConcurrentHashMap<String,MessagePartListener>();
- private ClosedListener _exceptionListner;
- private RangeSet _rejectedMessages;
- private long _currentDataSizeNotSynced;
- private long _currentDataSizeNotFlushed;
-
- public ClientSession(byte[] name)
- {
- super(name);
- }
-
- public void messageAcknowledge(RangeSet ranges, boolean accept)
- {
- for (Range range : ranges)
- {
- super.processed(range);
- }
- super.flushProcessed(accept ? BATCH : NONE);
- if (accept)
- {
- messageAccept(ranges);
- }
- }
-
- public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)
- {
- setMessageListener(destination,listener);
- super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode), null, 0, filter,
- options);
- }
-
- public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
- {
- DeliveryProperties dp = msg.getDeliveryProperties();
- MessageProperties mp = msg.getMessageProperties();
- ByteBuffer body = msg.readData();
- int size = body.remaining();
- super.messageTransfer
- (destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode),
- new Header(dp, mp), body);
- _currentDataSizeNotSynced += size;
- _currentDataSizeNotFlushed += size;
- }
-
- public void sync()
- {
- super.sync();
- _currentDataSizeNotSynced = 0;
- }
-
- public RangeSet getRejectedMessages()
- {
- return _rejectedMessages;
- }
-
- public void setMessageListener(String destination, MessagePartListener listener)
- {
- if (listener == null)
- {
- throw new IllegalArgumentException("Cannot set message listener to null");
- }
- _messageListeners.put(destination, listener);
- }
-
- public void setClosedListener(ClosedListener exceptionListner)
- {
- _exceptionListner = exceptionListner;
- }
-
- void setRejectedMessages(RangeSet rejectedMessages)
- {
- _rejectedMessages = rejectedMessages;
- }
-
- void notifyException(QpidException ex)
- {
- _exceptionListner.onClosed(null, null, null);
- }
-
- Map<String,MessagePartListener> getMessageListeners()
- {
- return _messageListeners;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
deleted file mode 100644
index 6bcd4fbce5..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.qpid.nclient.impl;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.ErrorCode;
-
-import org.apache.qpid.nclient.MessagePartListener;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDetached;
-import org.apache.qpid.transport.SessionDelegate;
-
-
-public class ClientSessionDelegate extends SessionDelegate
-{
-
- // --------------------------------------------
- // Message methods
- // --------------------------------------------
- @Override public void messageTransfer(Session session, MessageTransfer xfr)
- {
- MessagePartListener listener = ((ClientSession)session).getMessageListeners()
- .get(xfr.getDestination());
- listener.messageTransfer(xfr);
- }
-
- @Override public void messageReject(Session session, MessageReject struct)
- {
- for (Range range : struct.getTransfers())
- {
- for (long l = range.getLower(); l <= range.getUpper(); l++)
- {
- System.out.println("message rejected: " +
- session.getCommand((int) l));
- }
- }
- ((ClientSession)session).setRejectedMessages(struct.getTransfers());
- ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null));
- session.processed(struct);
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
index 155de2a678..a1dc48fcda 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
@@ -23,60 +23,55 @@ package org.apache.qpid.nclient.impl;
import org.apache.qpid.ErrorCode;
import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import java.nio.ByteBuffer;
import java.util.UUID;
public class DemoClient
{
- public static MessagePartListenerAdapter createAdapter()
+ public static class DemoListener implements SessionListener
{
- return new MessagePartListenerAdapter(new MessageListener()
+ public void opened(Session ssn) {}
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ System.out.println(exc);
+ }
+
+ public void message(Session ssn, MessageTransfer m)
{
- public void onMessage(Message m)
- {
- System.out.println("\n================== Received Msg ==================");
- System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
- System.out.println(m.toString());
- System.out.println("================== End Msg ==================\n");
- }
-
- });
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getHeader().get(MessageProperties.class).getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+ }
+
+ public void closed(Session ssn) {}
}
public static final void main(String[] args)
{
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
+ Connection conn = new Connection();
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
Session ssn = conn.createSession(50000);
- ssn.setClosedListener(new ClosedListener()
- {
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
- {
- System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
- }
- });
+ ssn.setSessionListener(new DemoListener());
ssn.queueDeclare("queue1", null, null);
ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
ssn.sync();
- ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("queue1", "myDest", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// queue
ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
@@ -91,9 +86,12 @@ public class DemoClient
ssn.sync();
// topic subs
- ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
- ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
- ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic1", "myDest2", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+ ssn.messageSubscribe("topic2", "myDest3", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+ ssn.messageSubscribe("topic3", "myDest4", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
ssn.sync();
ssn.queueDeclare("topic1", null, null);
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
index dd4a78fa2b..6c6cc308e9 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
@@ -28,12 +28,6 @@ import java.util.Map;
import org.apache.qpid.ErrorCode;
import org.apache.qpid.QpidException;
import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -41,9 +35,13 @@ import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
-public class BasicInteropTest implements ClosedListener
+public class BasicInteropTest implements SessionListener
{
private Session session;
@@ -62,7 +60,7 @@ public class BasicInteropTest implements ClosedListener
public void testCreateConnection(){
System.out.println("------- Creating connection--------");
- conn = Client.createConnection();
+ conn = new Connection();
try{
conn.connect(host, 5672, "test", "guest", "guest");
}catch(Exception e){
@@ -116,23 +114,11 @@ public class BasicInteropTest implements ClosedListener
public void testSubscribe()
{
System.out.println("------- Sending a subscribe --------");
+ session.setSessionListener(this);
session.messageSubscribe("testQueue", "myDest",
- Session.TRANSFER_CONFIRM_MODE_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(new MessageListener(){
-
- public void onMessage(Message message)
- {
- System.out.println("--------Message Received--------");
- System.out.println(message.toString());
- System.out.println("--------/Message Received--------");
- RangeSet ack = new RangeSet();
- ack.add(message.getMessageTransferId(),message.getMessageTransferId());
- session.messageAcknowledge(ack, true);
- }
-
- }),
- null);
+ MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
System.out.println("------- Setting Credit mode --------");
session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW);
@@ -141,20 +127,32 @@ public class BasicInteropTest implements ClosedListener
session.messageFlow("myDest", MessageCreditUnit.BYTE, -1);
}
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ System.out.println("--------Message Received--------");
+ System.out.println(xfr.toString());
+ System.out.println("--------/Message Received--------");
+ ssn.processed(xfr);
+ ssn.flushProcessed();
+ }
+
public void testMessageFlush()
{
session.messageFlush("myDest");
session.sync();
}
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ public void exception(Session ssn, SessionException exc)
{
System.out.println("------- Broker Notified an error --------");
- System.out.println("------- " + errorCode + " --------");
- System.out.println("------- " + reason + " --------");
+ System.out.println("------- " + exc + " --------");
System.out.println("------- /Broker Notified an error --------");
}
+ public void closed(Session ssn) {}
+
public static void main(String[] args) throws QpidException
{
String host = "0.0.0.0";
diff --git a/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/java/common/src/main/java/org/apache/qpid/ToyBroker.java
index 83d434b20a..db84b83adb 100644
--- a/java/common/src/main/java/org/apache/qpid/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyBroker.java
@@ -174,23 +174,14 @@ class ToyBroker extends SessionDelegate
public static final void main(String[] args) throws IOException
{
final ToyExchange exchange = new ToyExchange();
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new ToyBroker(exchange);
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
MinaHandler.accept("0.0.0.0", 5672, delegate);
}
diff --git a/java/common/src/main/java/org/apache/qpid/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java
index cb10859c9f..79bb286d76 100644
--- a/java/common/src/main/java/org/apache/qpid/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -56,7 +56,7 @@ class ToyClient extends SessionDelegate
public static final void main(String[] args)
{
Connection conn = MinaHandler.connect("0.0.0.0", 5672,
- new ClientDelegate()
+ new ClientDelegate(null, "guest", "guest")
{
public SessionDelegate getSessionDelegate()
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
index d6b015930b..d973739ed6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
@@ -112,7 +112,7 @@ public class Channel extends Invoker
public void closed()
{
- log.debug("channel closed: ", this);
+ log.debug("channel closed: %s", this);
if (session != null)
{
session.closed();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index bbdadfadb9..316c26429e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -20,21 +20,123 @@
*/
package org.apache.qpid.transport;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.qpid.QpidException;
+
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+
+import static org.apache.qpid.transport.Connection.State.*;
+
/**
* ClientDelegate
*
*/
-public abstract class ClientDelegate extends ConnectionDelegate
+public class ClientDelegate extends ConnectionDelegate
{
+ private String vhost;
+ private String username;
+ private String password;
+
+ public ClientDelegate(String vhost, String username, String password)
+ {
+ this.vhost = vhost;
+ this.username = username;
+ this.password = password;
+ }
+
public void init(Channel ch, ProtocolHeader hdr)
{
if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
{
- throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor());
+ Connection conn = ch.getConnection();
+ conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()));
+ }
+ }
+
+ @Override public void connectionStart(Channel ch, ConnectionStart start)
+ {
+ Connection conn = ch.getConnection();
+ List<Object> mechanisms = start.getMechanisms();
+ if (mechanisms == null || mechanisms.isEmpty())
+ {
+ ch.connectionStartOk
+ (Collections.EMPTY_MAP, null, null, conn.getLocale());
+ return;
}
+
+ String[] mechs = new String[mechanisms.size()];
+ mechanisms.toArray(mechs);
+
+ try
+ {
+ UsernamePasswordCallbackHandler handler =
+ new UsernamePasswordCallbackHandler();
+ handler.initialise(username, password);
+ SaslClient sc = Sasl.createSaslClient
+ (new String[] {"PLAIN"}, null, "AMQP", "localhost", null, handler);
+ conn.setSaslClient(sc);
+
+ byte[] response = sc.hasInitialResponse() ?
+ sc.evaluateChallenge(new byte[0]) : null;
+ ch.connectionStartOk
+ (Collections.EMPTY_MAP, sc.getMechanismName(), response,
+ conn.getLocale());
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ @Override public void connectionSecure(Channel ch, ConnectionSecure secure)
+ {
+ Connection conn = ch.getConnection();
+ SaslClient sc = conn.getSaslClient();
+ try
+ {
+ byte[] response = sc.evaluateChallenge(secure.getChallenge());
+ ch.connectionSecureOk(response);
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ @Override public void connectionTune(Channel ch, ConnectionTune tune)
+ {
+ Connection conn = ch.getConnection();
+ conn.setChannelMax(tune.getChannelMax());
+ ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+ ch.connectionOpen(vhost, null, Option.INSIST);
+ }
+
+ @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok)
+ {
+ ch.getConnection().setState(OPEN);
+ }
+
+ @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir)
+ {
+ throw new UnsupportedOperationException();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 68b9b209bb..ae9420eb1a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -20,14 +20,22 @@
*/
package org.apache.qpid.transport;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import static org.apache.qpid.transport.Connection.State.*;
/**
@@ -44,23 +52,164 @@ public class Connection
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
+ enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+
private static final Logger log = Logger.get(Connection.class);
- final private Sender<ProtocolEvent> sender;
- final private ConnectionDelegate delegate;
+ class DefaultConnectionListener implements ConnectionListener
+ {
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException exception)
+ {
+ throw exception;
+ }
+ public void closed(Connection conn) {}
+ }
+
+ private ConnectionDelegate delegate;
+ private Sender<ProtocolEvent> sender;
+
+ final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+
+ private State state = NEW;
+ private Object lock = new Object();
+ private long timeout = 60000;
+ private ConnectionListener listener = new DefaultConnectionListener();
+ private Throwable error = null;
+
private int channelMax = 1;
+ private String locale;
+ private SaslServer saslServer;
+ private SaslClient saslClient;
+
// want to make this final
private int _connectionId;
- final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+ public Connection() {}
- public Connection(Sender<ProtocolEvent> sender,
- ConnectionDelegate delegate)
+ public void setConnectionDelegate(ConnectionDelegate delegate)
{
- this.sender = sender;
this.delegate = delegate;
}
+ public void setConnectionListener(ConnectionListener listener)
+ {
+ if (listener == null)
+ {
+ this.listener = new DefaultConnectionListener();
+ }
+ else
+ {
+ this.listener = listener;
+ }
+ }
+
+ public Sender<ProtocolEvent> getSender()
+ {
+ return sender;
+ }
+
+ public void setSender(Sender<ProtocolEvent> sender)
+ {
+ this.sender = sender;
+ }
+
+ void setState(State state)
+ {
+ synchronized (lock)
+ {
+ this.state = state;
+ lock.notifyAll();
+ }
+ }
+
+ void setLocale(String locale)
+ {
+ this.locale = locale;
+ }
+
+ String getLocale()
+ {
+ return locale;
+ }
+
+ void setSaslServer(SaslServer saslServer)
+ {
+ this.saslServer = saslServer;
+ }
+
+ SaslServer getSaslServer()
+ {
+ return saslServer;
+ }
+
+ void setSaslClient(SaslClient saslClient)
+ {
+ this.saslClient = saslClient;
+ }
+
+ SaslClient getSaslClient()
+ {
+ return saslClient;
+ }
+
+ public void connect(String host, int port, String vhost, String username, String password)
+ {
+ synchronized (lock)
+ {
+ state = OPENING;
+
+ delegate = new ClientDelegate(vhost, username, password);
+
+ IoTransport.connect(host, port, ConnectionBinding.get(this));
+ send(new ProtocolHeader(1, 0, 10));
+
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state == OPENING && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ Throwable t = error;
+ error = null;
+ close();
+ throw new ConnectionException(t);
+ }
+
+ switch (state)
+ {
+ case OPENING:
+ close();
+ throw new ConnectionException("connect() timed out");
+ case OPEN:
+ break;
+ case CLOSED:
+ throw new ConnectionException("connect() aborted");
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ }
+
+ listener.opened(this);
+ }
+
+ public Session createSession()
+ {
+ return createSession(0);
+ }
+
+ public Session createSession(long expiryInSeconds)
+ {
+ Channel ch = getChannel();
+ Session ssn = new Session(UUID.randomUUID().toString().getBytes());
+ ssn.attach(ch);
+ ssn.sessionAttach(ssn.getName());
+ ssn.sessionRequestTimeout(expiryInSeconds);
+ return ssn;
+ }
+
public void setConnectionId(int id)
{
_connectionId = id;
@@ -86,7 +235,12 @@ public class Connection
public void send(ProtocolEvent event)
{
log.debug("SEND: [%s] %s", this, event);
- sender.send(event);
+ Sender s = sender;
+ if (s == null)
+ {
+ throw new ConnectionException("connection closed");
+ }
+ s.send(event);
}
public void flush()
@@ -107,7 +261,7 @@ public class Connection
public Channel getChannel()
{
- synchronized (channels)
+ synchronized (lock)
{
for (int i = 0; i < getChannelMax(); i++)
{
@@ -123,7 +277,7 @@ public class Connection
public Channel getChannel(int number)
{
- synchronized (channels)
+ synchronized (lock)
{
Channel channel = channels.get(number);
if (channel == null)
@@ -137,45 +291,146 @@ public class Connection
void removeChannel(int number)
{
- synchronized (channels)
+ synchronized (lock)
{
channels.remove(number);
}
}
+ public void exception(ConnectionException e)
+ {
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPENING:
+ case CLOSING:
+ error = e;
+ lock.notifyAll();
+ break;
+ default:
+ listener.exception(this, e);
+ break;
+ }
+ }
+ }
+
public void exception(Throwable t)
{
- delegate.exception(t);
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPENING:
+ case CLOSING:
+ error = t;
+ lock.notifyAll();
+ break;
+ default:
+ listener.exception(this, new ConnectionException(t));
+ break;
+ }
+ }
}
void closeCode(ConnectionClose close)
{
- synchronized (channels)
+ synchronized (lock)
{
for (Channel ch : channels.values())
{
ch.closeCode(close);
}
+ ConnectionCloseCode code = close.getReplyCode();
+ if (code != ConnectionCloseCode.NORMAL)
+ {
+ exception(new ConnectionException(close));
+ }
}
}
public void closed()
{
log.debug("connection closed: %s", this);
- synchronized (channels)
+
+ if (state == OPEN)
+ {
+ exception(new ConnectionException("connection aborted"));
+ }
+
+ synchronized (lock)
{
List<Channel> values = new ArrayList<Channel>(channels.values());
for (Channel ch : values)
{
ch.closed();
}
+
+ sender = null;
+ setState(CLOSED);
}
- delegate.closed();
+
+ listener.closed(this);
}
public void close()
{
- sender.close();
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ Channel ch = getChannel(0);
+ state = CLOSING;
+ ch.connectionClose(ConnectionCloseCode.NORMAL, null);
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state == CLOSING && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ close();
+ throw new ConnectionException(error);
+ }
+
+ switch (state)
+ {
+ case CLOSING:
+ close();
+ throw new ConnectionException("close() timed out");
+ case CLOSED:
+ break;
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ break;
+ case CLOSED:
+ break;
+ default:
+ if (sender != null)
+ {
+ sender.close();
+ w = new Waiter(lock, timeout);
+ while (w.hasTime() && sender != null && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ throw new ConnectionException(error);
+ }
+
+ if (sender != null)
+ {
+ throw new ConnectionException("close() timed out");
+ }
+ }
+ break;
+ }
+ }
}
public String toString()
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 2aa1db7b28..9056a3120b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -22,22 +22,7 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.SecurityHelper;
-import org.apache.qpid.QpidException;
-
-import java.io.UnsupportedEncodingException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import static org.apache.qpid.transport.Connection.State.*;
/**
@@ -57,231 +42,26 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
private static final Logger log = Logger.get(ConnectionDelegate.class);
- private String _username = "guest";
- private String _password = "guest";;
- private String _mechanism;
- private String _virtualHost;
- private SaslClient saslClient;
- private SaslServer saslServer;
- private String _locale = "utf8";
- private int maxFrame = 64*1024;
- private Condition _negotiationComplete;
- private Lock _negotiationCompleteLock;
-
- public abstract SessionDelegate getSessionDelegate();
-
- public abstract void exception(Throwable t);
-
- public abstract void closed();
-
- public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
- {
- _negotiationComplete = negotiationComplete;
- _negotiationCompleteLock = negotiationCompleteLock;
- }
-
- public void init(Channel ch, ProtocolHeader hdr)
- {
- ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor()));
- List<Object> plain = new ArrayList<Object>();
- plain.add("PLAIN");
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- ch.connectionStart(null, plain, utf8);
- }
-
- // ----------------------------------------------
- // Client side
- //-----------------------------------------------
- @Override public void connectionStart(Channel context, ConnectionStart struct)
- {
- String mechanism = null;
- byte[] response = null;
- try
- {
- mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
- saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
- SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
- response = saslClient.evaluateChallenge(new byte[0]);
- }
- catch (UnsupportedEncodingException e)
- {
- // need error handling
- }
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }
-
- Map<String,Object> props = new HashMap<String,Object>();
- context.connectionStartOk(props, mechanism, response, _locale);
- }
-
- @Override public void connectionSecure(Channel context, ConnectionSecure struct)
- {
- try
- {
- byte[] response = saslClient.evaluateChallenge(struct.getChallenge());
- context.connectionSecureOk(response);
- }
- catch (SaslException e)
- {
- // need error handling
- }
- }
-
- @Override public void connectionTune(Channel context, ConnectionTune struct)
- {
- context.getConnection().setChannelMax(struct.getChannelMax());
- context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax());
- context.connectionOpen(_virtualHost, null, Option.INSIST);
- }
-
-
- @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
- {
- List<Object> knownHosts = struct.getKnownHosts();
- if(_negotiationCompleteLock != null)
- {
- _negotiationCompleteLock.lock();
- try
- {
- _negotiationComplete.signalAll();
- }
- finally
- {
- _negotiationCompleteLock.unlock();
- }
- }
- }
-
- public void connectionRedirect(Channel context, ConnectionRedirect struct)
- {
- // not going to bother at the moment
- }
-
- // ----------------------------------------------
- // Server side
- //-----------------------------------------------
- @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
+ public SessionDelegate getSessionDelegate()
{
- //set the client side locale on the server side
- _locale = struct.getLocale();
- _mechanism = struct.getMechanism();
-
- //try
- //{
- //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
- byte[] challenge = null;
- if ( challenge == null)
- {
- context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
- }
- else
- {
- try
- {
- context.connectionSecure(challenge);
- }
- catch(Exception e)
- {
-
- }
- }
-
-
- /*}
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }*/
+ return new SessionDelegate();
}
- @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
- {
- try
- {
- saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- byte[] challenge = saslServer.evaluateResponse(struct.getResponse());
- if ( challenge == null)
- {
- context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
- }
- else
- {
- try
- {
- context.connectionSecure(challenge);
- }
- catch(Exception e)
- {
-
- }
- }
-
-
- }
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }
- }
-
-
- @Override public void connectionOpen(Channel context, ConnectionOpen struct)
- {
- List<Object> hosts = new ArrayList<Object>();
- hosts.add("amqp:1223243232325");
- context.connectionOpenOk(hosts);
- }
+ public abstract void init(Channel ch, ProtocolHeader hdr);
@Override public void connectionClose(Channel ch, ConnectionClose close)
{
- ch.getConnection().closeCode(close);
+ Connection conn = ch.getConnection();
ch.connectionCloseOk();
+ conn.getSender().close();
+ conn.closeCode(close);
+ conn.setState(CLOSE_RCVD);
}
- public String getPassword()
- {
- return _password;
- }
-
- public void setPassword(String password)
- {
- _password = password;
- }
-
- public String getUsername()
- {
- return _username;
- }
-
- public void setUsername(String username)
- {
- _username = username;
- }
-
- public String getVirtualHost()
- {
- return _virtualHost;
- }
-
- public void setVirtualHost(String host)
+ @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok)
{
- _virtualHost = host;
+ Connection conn = ch.getConnection();
+ conn.getSender().close();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
index c3239ef684..1bd7d516cf 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
@@ -26,17 +26,37 @@ package org.apache.qpid.transport;
*
*/
-public class ConnectionException extends RuntimeException
+public class ConnectionException extends TransportException
{
private ConnectionClose close;
- public ConnectionException(ConnectionClose close)
+ public ConnectionException(String message, ConnectionClose close, Throwable cause)
{
- super(close.getReplyText());
+ super(message, cause);
this.close = close;
}
+ public ConnectionException(String message)
+ {
+ this(message, null, null);
+ }
+
+ public ConnectionException(String message, Throwable cause)
+ {
+ this(message, null, cause);
+ }
+
+ public ConnectionException(Throwable cause)
+ {
+ this(cause.getMessage(), null, cause);
+ }
+
+ public ConnectionException(ConnectionClose close)
+ {
+ this(close.getReplyText(), close, null);
+ }
+
public ConnectionClose getClose()
{
return close;
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
index 4cf0cab1ec..616e76825a 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,25 +16,23 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
-package org.apache.qpid.nclient;
-
-import org.apache.qpid.ErrorCode;
+package org.apache.qpid.transport;
/**
- * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it
- * informs the connection's ExceptionListener
+ * ConnectionListener
+ *
*/
-public interface ClosedListener
+
+public interface ConnectionListener
{
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ExceptionListener
- * @param errorCode TODO
- * @param reason TODO
- * @param t TODO
- * @see Connection
- */
- public void onClosed(ErrorCode errorCode, String reason, Throwable t);
-} \ No newline at end of file
+
+ void opened(Connection connection);
+
+ void exception(Connection connection, ConnectionException exception);
+
+ void closed(Connection connection);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index b2be32331a..89b59c2512 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -43,25 +43,16 @@ public class Echo extends SessionDelegate
public static final void main(String[] args) throws IOException
{
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new Echo();
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
IoAcceptor ioa = new IoAcceptor
- ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
ioa.start();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
index 2de0c169a5..0cca0227a1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
@@ -26,7 +26,7 @@ package org.apache.qpid.transport;
*
*/
-public final class ProtocolVersionException extends TransportException
+public final class ProtocolVersionException extends ConnectionException
{
private final byte major;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
new file mode 100644
index 0000000000..c27419cadc
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.Collections;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.qpid.QpidException;
+
+import org.apache.qpid.SecurityHelper;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+
+import static org.apache.qpid.transport.Connection.State.*;
+
+
+/**
+ * ServerDelegate
+ *
+ */
+
+public class ServerDelegate extends ConnectionDelegate
+{
+
+ private SaslServer saslServer;
+
+ public void init(Channel ch, ProtocolHeader hdr)
+ {
+ Connection conn = ch.getConnection();
+ conn.send(new ProtocolHeader(1, 0, 10));
+ List<Object> utf8 = new ArrayList<Object>();
+ utf8.add("utf8");
+ ch.connectionStart(null, Collections.EMPTY_LIST, utf8);
+ }
+
+ @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok)
+ {
+ Connection conn = ch.getConnection();
+ conn.setLocale(ok.getLocale());
+ String mechanism = ok.getMechanism();
+
+ if (mechanism == null || mechanism.length() == 0)
+ {
+ ch.connectionTune
+ (Integer.MAX_VALUE,
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, Integer.MAX_VALUE);
+ return;
+ }
+
+ try
+ {
+ SaslServer ss = Sasl.createSaslServer
+ (mechanism, "AMQP", "localhost", null, null);
+ if (ss == null)
+ {
+ ch.connectionClose
+ (ConnectionCloseCode.CONNECTION_FORCED,
+ "null SASL mechanism: " + mechanism);
+ return;
+ }
+ conn.setSaslServer(ss);
+ secure(ch, ok.getResponse());
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ private void secure(Channel ch, byte[] response)
+ {
+ Connection conn = ch.getConnection();
+ SaslServer ss = conn.getSaslServer();
+ try
+ {
+ byte[] challenge = ss.evaluateResponse(response);
+ if (ss.isComplete())
+ {
+ ss.dispose();
+ ch.connectionTune
+ (Integer.MAX_VALUE,
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, Integer.MAX_VALUE);
+ }
+ else
+ {
+ ch.connectionSecure(challenge);
+ }
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok)
+ {
+ secure(ch, ok.getResponse());
+ }
+
+ @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok)
+ {
+
+ }
+
+ @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ {
+ Connection conn = ch.getConnection();
+ ch.connectionOpenOk(Collections.EMPTY_LIST);
+ conn.setState(OPEN);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 10ca6cfb0a..df4313e15b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -49,6 +49,26 @@ public class Session extends Invoker
private static final Logger log = Logger.get(Session.class);
+ class DefaultSessionListener implements SessionListener
+ {
+
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ log.info("message: %s", xfr);
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ throw exc;
+ }
+
+ public void closed(Session ssn) {}
+ }
+
+ public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
+
private static boolean ENABLE_REPLAY = false;
static
@@ -65,6 +85,7 @@ public class Session extends Invoker
}
private byte[] name;
+ private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
@@ -97,6 +118,23 @@ public class Session extends Invoker
return name;
}
+ public void setSessionListener(SessionListener listener)
+ {
+ if (listener == null)
+ {
+ this.listener = new DefaultSessionListener();
+ }
+ else
+ {
+ this.listener = listener;
+ }
+ }
+
+ public SessionListener getSessionListener()
+ {
+ return listener;
+ }
+
public void setAutoSync(boolean value)
{
synchronized (commands)
@@ -270,8 +308,8 @@ public class Session extends Invoker
{
if (closed.get())
{
- List<ExecutionException> exc = getExceptions();
- if (!exc.isEmpty())
+ ExecutionException exc = getException();
+ if (exc != null)
{
throw new SessionException(exc);
}
@@ -361,7 +399,7 @@ public class Session extends Invoker
{
if (closed.get())
{
- throw new SessionException(getExceptions());
+ throw new SessionException(getException());
}
else
{
@@ -375,8 +413,7 @@ public class Session extends Invoker
private Map<Integer,ResultFuture<?>> results =
new HashMap<Integer,ResultFuture<?>>();
- private List<ExecutionException> exceptions =
- new ArrayList<ExecutionException>();
+ private ExecutionException exception = null;
void result(int command, Struct result)
{
@@ -388,11 +425,17 @@ public class Session extends Invoker
future.set(result);
}
- void addException(ExecutionException exc)
+ void setException(ExecutionException exc)
{
- synchronized (exceptions)
+ synchronized (results)
{
- exceptions.add(exc);
+ if (exception != null)
+ {
+ throw new IllegalStateException
+ (String.format
+ ("too many exceptions: %s, %s", exception, exc));
+ }
+ exception = exc;
}
}
@@ -403,11 +446,11 @@ public class Session extends Invoker
this.close = close;
}
- List<ExecutionException> getExceptions()
+ ExecutionException getException()
{
- synchronized (exceptions)
+ synchronized (results)
{
- return new ArrayList<ExecutionException>(exceptions);
+ return exception;
}
}
@@ -473,7 +516,7 @@ public class Session extends Invoker
}
else if (closed.get())
{
- throw new SessionException(getExceptions());
+ throw new SessionException(getException());
}
else
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
index d2c54cf339..354e5c1d15 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
@@ -33,7 +33,7 @@ public class SessionClosedException extends SessionException
public SessionClosedException()
{
- super(Collections.EMPTY_LIST);
+ super(null);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index b91763509c..3e6fa9d5d9 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -29,7 +29,7 @@ import org.apache.qpid.transport.network.Frame;
* @author Rafael H. Schloming
*/
-public abstract class SessionDelegate
+public class SessionDelegate
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
@@ -57,7 +57,7 @@ public abstract class SessionDelegate
@Override public void executionException(Session ssn, ExecutionException exc)
{
- ssn.addException(exc);
+ ssn.setException(exc);
}
@Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
@@ -122,4 +122,9 @@ public abstract class SessionDelegate
ssn.syncPoint();
}
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ssn.getSessionListener().message(ssn, xfr);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
index dc294b2206..ae9b4b9cdb 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
@@ -27,20 +27,20 @@ import java.util.List;
*
*/
-public class SessionException extends RuntimeException
+public class SessionException extends TransportException
{
- private List<ExecutionException> exceptions;
+ private ExecutionException exception;
- public SessionException(List<ExecutionException> exceptions)
+ public SessionException(ExecutionException exception)
{
- super(exceptions.isEmpty() ? "" : exceptions.toString());
- this.exceptions = exceptions;
+ super(String.valueOf(exception));
+ this.exception = exception;
}
- public List<ExecutionException> getExceptions()
+ public ExecutionException getException()
{
- return exceptions;
+ return exception;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
new file mode 100644
index 0000000000..63690177f9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * SessionListener
+ *
+ */
+
+public interface SessionListener
+{
+
+ void opened(Session session);
+
+ void message(Session ssn, MessageTransfer xfr);
+
+ void exception(Session session, SessionException exception);
+
+ void closed(Session session);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
index 8653acedbe..617867cae6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Sink.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
@@ -103,28 +103,17 @@ public class Sink extends SessionDelegate
public static final void main(String[] args) throws IOException
{
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new Sink();
}
-
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
-
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
IoAcceptor ioa = new IoAcceptor
- ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
System.out.println
(String.format
(FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate"));
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 6886cb3a5a..8a2aba2e6d 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -33,23 +33,46 @@ import org.apache.qpid.transport.Sender;
*
*/
-public class ConnectionBinding implements Binding<Connection,ByteBuffer>
+public abstract class ConnectionBinding
+ implements Binding<Connection,ByteBuffer>
{
- private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
- private final ConnectionDelegate delegate;
+ public static Binding<Connection,ByteBuffer> get(final Connection connection)
+ {
+ return new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ return connection;
+ }
+ };
+ }
- public ConnectionBinding(ConnectionDelegate delegate)
+ public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
{
- this.delegate = delegate;
+ return new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ Connection conn = new Connection();
+ conn.setConnectionDelegate(delegate);
+ return conn;
+ }
+ };
}
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ public abstract Connection connection();
+
public Connection endpoint(Sender<ByteBuffer> sender)
{
+ Connection conn = connection();
+
// XXX: hardcoded max-frame
- return new Connection
- (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
+ Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE);
+ conn.setSender(dis);
+ return conn;
}
public Receiver<ByteBuffer> receiver(Connection conn)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index a1fb0371fd..5efd51d5db 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -61,7 +61,7 @@ final class IoReceiver extends Thread
start();
}
- void close()
+ void close(boolean block)
{
if (!closed.getAndSet(true))
{
@@ -75,7 +75,7 @@ final class IoReceiver extends Thread
{
socket.shutdownInput();
}
- if (Thread.currentThread() != this)
+ if (block && Thread.currentThread() != this)
{
join(timeout);
if (isAlive())
@@ -121,6 +121,7 @@ final class IoReceiver extends Thread
}
}
}
+ socket.close();
}
catch (Throwable t)
{
@@ -129,7 +130,6 @@ final class IoReceiver extends Thread
finally
{
receiver.closed();
- transport.getSender().close();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index ef892744ab..f70a13ec3c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -196,17 +196,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
throw new TransportException("join timed out");
}
}
- transport.getReceiver().close();
- socket.close();
+ transport.getReceiver().close(false);
}
catch (InterruptedException e)
{
throw new TransportException(e);
}
- catch (IOException e)
- {
- throw new TransportException(e);
- }
if (reportException && exception != null)
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index 70fd8a3c06..be17766740 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -108,7 +108,7 @@ public final class IoTransport<E>
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding(delegate));
+ return connect(host, port, ConnectionBinding.get(delegate));
}
public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
index f8dbec3c3d..b89eed48b0 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
@@ -262,13 +262,13 @@ public class MinaHandler<E> implements IoHandler
ConnectionDelegate delegate)
throws IOException
{
- accept(host, port, new ConnectionBinding(delegate));
+ accept(host, port, ConnectionBinding.get(delegate));
}
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding(delegate));
+ return connect(host, port, ConnectionBinding.get(delegate));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
index 318fe0d03b..3bc6730623 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
@@ -87,8 +87,9 @@ public class NioHandler implements Runnable
}
NioSender sender = new NioSender(_ch);
- Connection con = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
+ Connection con = new Connection();
+ con.setSender(new Disassembler(sender, 64*1024 - 1));
+ con.setConnectionDelegate(delegate);
con.setConnectionId(_count.incrementAndGet());
_handlers.put(con.getConnectionId(),sender);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
new file mode 100644
index 0000000000..e034d779ca
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.util;
+
+
+/**
+ * Waiter
+ *
+ */
+
+public final class Waiter
+{
+
+ private final Object lock;
+ private final long timeout;
+ private final long start;
+ private long elapsed;
+
+ public Waiter(Object lock, long timeout)
+ {
+ this.lock = lock;
+ this.timeout = timeout;
+ this.start = System.currentTimeMillis();
+ this.elapsed = 0;
+ }
+
+ public boolean hasTime()
+ {
+ return elapsed < timeout;
+ }
+
+ public void await()
+ {
+ try
+ {
+ lock.wait(timeout - elapsed);
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ elapsed = System.currentTimeMillis() - start;
+ }
+
+}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index b9ca210483..e61ffb501b 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -50,38 +50,30 @@ public class ConnectionTest extends TestCase
port = AvailablePortFinder.getNextAvailable(12000);
- ConnectionDelegate server = new ConnectionDelegate() {
- public void init(Channel ch, ProtocolHeader hdr) {
+ ConnectionDelegate server = new ServerDelegate() {
+ @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ {
+ super.connectionOpen(ch, open);
ch.getConnection().close();
}
-
- public SessionDelegate getSessionDelegate() {
- return new SessionDelegate() {};
- }
- public void exception(Throwable t) {
- log.error(t, "exception caught");
- }
- public void closed() {}
};
IoAcceptor ioa = new IoAcceptor
- ("localhost", port, new ConnectionBinding(server));
+ ("localhost", port, ConnectionBinding.get(server));
ioa.start();
}
private Connection connect(final Condition closed)
{
- Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate()
+ Connection conn = new Connection();
+ conn.setConnectionListener(new ConnectionListener()
{
- public SessionDelegate getSessionDelegate()
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException exc)
{
- return new SessionDelegate() {};
+ exc.printStackTrace();
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed()
+ public void closed(Connection conn)
{
if (closed != null)
{
@@ -89,8 +81,7 @@ public class ConnectionTest extends TestCase
}
}
});
-
- conn.send(new ProtocolHeader(1, 0, 10));
+ conn.connect("localhost", port, null, "guest", "guest");
return conn;
}
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
index f4428cb1e2..2c36fb3d65 100644
--- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
+++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
@@ -28,140 +28,95 @@ import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.DtxSession;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.util.Logger;
/**
* Qpid datasource.
- * Basically it is a connection pool manager used for optimizing broker connections usage.
- *
+ * Basically it is a connection pool manager used for optimizing broker connections usage.
+ *
* @author Andrea Gazzarini
*/
-public final class QpidDatasource
+public final class QpidDatasource
{
private final static Logger LOGGER = Logger.get(QpidDatasource.class);
-
+
/**
* A connection decorator used for adding pool interaction behaviour to an existing connection.
- *
+ *
* @author Andrea Gazzarini
*/
- public class ConnectionDecorator implements Connection,ClosedListener
+ class PooledConnection extends Connection
{
- private final Connection _decoratee;
private final UUID _brokerId;
private boolean _valid;
-
+
/**
* Builds a new decorator with the given connection.
- *
+ *
* @param brokerId the broker identifier.
* @param decoratee the underlying connection.
*/
- private ConnectionDecorator(UUID brokerId, Connection decoratee)
+ private PooledConnection(UUID brokerId)
{
- this._decoratee = decoratee;
this._brokerId = brokerId;
- _decoratee.setClosedListener(this);
_valid = true;
}
-
+
/**
* Returns true if the underlying connection is still valid and can be used.
- *
+ *
* @return true if the underlying connection is still valid and can be used.
*/
boolean isValid()
{
return _valid;
}
-
+
+ void reallyClose()
+ {
+ super.close();
+ }
+
/**
* Returns the connection to the pool. That is, marks this connections as available.
* After that, this connection will be available for further operations.
*/
- public void close () throws QpidException
+ public void close()
{
try
{
pools.get(_brokerId).returnObject(this);
LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this);
- } catch (Exception exception)
+ }
+ catch (Exception e)
{
- throw new QpidException("Error while closing connection.",ErrorCode.CONNECTION_ERROR,exception);
- }
- }
-
- /**
- * Do nothing : underlying connection is already connected.
- */
- public void connect (String host, int port, String virtualHost, String username, String password)
- throws QpidException
- {
- // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
- }
-
- /**
- * Do nothing : underlying connection is already connected.
- */
- public void connect (String url) throws QpidException
- {
- // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
- }
-
- /**
- * @see Connection#createDTXSession(int)
- */
- public DtxSession createDTXSession (int expiryInSeconds)
- {
- return _decoratee.createDTXSession(expiryInSeconds);
+ throw new ConnectionException(e);
+ }
}
- /**
- * @see Connection#createSession(long)
- */
- public Session createSession (long expiryInSeconds)
+ public void exception(Throwable t)
{
- return _decoratee.createSession(expiryInSeconds);
+ super.exception(t);
+ _valid = false;
}
+ }
- /**
- * Do nothing : closed listener has been already injected.
- */
- public void setClosedListener (ClosedListener exceptionListner)
- {
- }
-
- /**
- * Callback method used for error notifications while underlying connection is closing.
- */
- public void onClosed (ErrorCode errorCode, String reason, Throwable t)
- {
- _valid = false;
- LOGGER.error(t,"<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s",reason,errorCode.getCode());
- }
- };
-
/**
- * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
+ * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
* the broker connection(s).
- *
+ *
* @author Andrea Gazzarini
*/
class QpidConnectionFactory extends BasePoolableObjectFactory
- {
+ {
private final BrokerConnectionData _connectionData;
private final UUID _brokerId;
-
+
/**
* Builds a new connection factory with the given parameters.
- *
+ *
* @param brokerId the broker identifier.
* @param connectionData the connecton data.
*/
@@ -170,35 +125,35 @@ public final class QpidDatasource
this._connectionData = connectionData;
this._brokerId = brokerId;
}
-
+
/**
* Creates a new underlying connection.
*/
@Override
public Connection makeObject () throws Exception
{
- Connection connection = Client.createConnection();
+ PooledConnection connection = new PooledConnection(_brokerId);
connection.connect(
- _connectionData.getHost(),
- _connectionData.getPort(),
- _connectionData.getVirtualHost(),
- _connectionData.getUsername(),
+ _connectionData.getHost(),
+ _connectionData.getPort(),
+ _connectionData.getVirtualHost(),
+ _connectionData.getUsername(),
_connectionData.getPassword());
- return new ConnectionDecorator(_brokerId,connection);
+ return connection;
}
-
+
/**
* Validates the underlying connection.
*/
@Override
public boolean validateObject (Object obj)
{
- ConnectionDecorator connection = (ConnectionDecorator) obj;
+ PooledConnection connection = (PooledConnection) obj;
boolean isValid = connection.isValid();
LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid);
return isValid;
}
-
+
/**
* Closes the underlying connection.
*/
@@ -207,8 +162,8 @@ public final class QpidDatasource
{
try
{
- ConnectionDecorator connection = (ConnectionDecorator) obj;
- connection._decoratee.close();
+ PooledConnection connection = (PooledConnection) obj;
+ connection.reallyClose();
LOGGER.debug("<QMAN-200014> : Connection has been destroyed.");
} catch (Exception e)
{
@@ -216,21 +171,21 @@ public final class QpidDatasource
}
}
}
-
+
// Singleton instance.
private static QpidDatasource instance = new QpidDatasource();
// Each entry contains a connection pool for a specific broker.
private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
-
+
// Private constructor.
private QpidDatasource()
{
}
-
+
/**
* Gets an available connection from the pool of the given broker.
- *
+ *
* @param brokerId the broker identifier.
* @return a valid connection to the broker associated with the given identifier.
*/
@@ -238,20 +193,20 @@ public final class QpidDatasource
{
return (Connection) pools.get(brokerId).borrowObject();
}
-
+
/**
* Entry point method for retrieving the singleton instance of this datasource.
- *
+ *
* @return the qpid datasource singleton instance.
*/
- public static QpidDatasource getInstance()
+ public static QpidDatasource getInstance()
{
return instance;
}
-
+
/**
* Adds a connection pool to this datasource.
- *
+ *
* @param brokerId the broker identifier that will be associated with the new connection pool.
* @param connectionData the broker connection data.
* @throws Exception when the pool cannot be created.
@@ -265,12 +220,12 @@ public final class QpidDatasource
true,
false);
ObjectPool pool = factory.createPool();
-
+
for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++)
{
pool.returnObject(pool.borrowObject());
}
-
+
pools.put(brokerId,pool);
}
} \ No newline at end of file
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
index 92689eba52..4bda450315 100644
--- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
+++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
@@ -22,42 +22,47 @@ package org.apache.qpid.management.domain.services;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.QpidException;
import org.apache.qpid.management.Constants;
import org.apache.qpid.management.Names;
import org.apache.qpid.management.configuration.Configuration;
import org.apache.qpid.management.configuration.QpidDatasource;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.transport.util.Logger;
/**
* Qpid Broker facade.
- *
+ *
* @author Andrea Gazzarini
*/
-public class QpidService
+public class QpidService implements SessionListener
{
private final static Logger LOGGER = Logger.get(QpidService.class);
// Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication.
- private static class Log
- {
+ private static class Log
+ {
/**
* Logs the content f the message.
* This will be written on log only if DEBUG level is enabled.
- *
+ *
* @param messageContent the raw content of the message.
*/
- static void logMessageContent(byte [] messageContent)
+ static void logMessageContent(byte [] messageContent)
{
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -65,56 +70,81 @@ public class QpidService
Arrays.toString(messageContent));
}
}
-
+
/**
* Logs the content f the message.
* This will be written on log only if DEBUG level is enabled.
- *
+ *
* @param messageContent the raw content of the message.
*/
- static void logMessageContent(ByteBuffer messageContent)
+ static void logMessageContent(ByteBuffer messageContent)
{
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"<QMAN-200002> : Message has been sent to management exchange.");
}
- }
+ }
}
-
+
private UUID _brokerId;
private Connection _connection;
private Session _session;
-
+ private Map<String,MessagePartListenerAdapter> _listeners;
+
/**
* Builds a new service with the given connection data.
- *
+ *
* @param connectionData the connection data of the broker.
*/
- public QpidService(UUID brokerId)
+ public QpidService(UUID brokerId)
{
this._brokerId = brokerId;
}
-
+
/**
* Estabilishes a connection with the broker.
- *
+ *
* @throws QpidException in case of connection failure.
*/
public void connect() throws Exception
{
_connection = QpidDatasource.getInstance().getConnection(_brokerId);
+ _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
_session = _connection.createSession(Constants.NO_EXPIRATION);
+ _session.setSessionListener(this);
+ }
+
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
+ if (l == null)
+ {
+ LOGGER.error("unhandled message: %s", xfr);
+ }
+ else
+ {
+ l.messageTransfer(xfr);
+ }
}
-
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ LOGGER.error(exc, "session %s exception", ssn);
+ }
+
+ public void closed(Session ssn) {}
+
/**
- * All the previously entered outstanding commands are asynchronous.
+ * All the previously entered outstanding commands are asynchronous.
* Synchronous behavior is achieved through invoking this method.
*/
- public void sync()
+ public void sync()
{
_session.sync();
}
-
+
/**
* Closes communication with broker.
*/
@@ -124,48 +154,50 @@ public class QpidService
{
_session.close();
_session = null;
+ _listeners = null;
} catch (Exception e)
{
}
try
{
- _connection.close();
+ _connection.close();
_connection = null;
} catch (Exception e)
{
}
}
-
+
/**
* Associate a message listener with a destination therefore creating a new subscription.
- *
+ *
* @param queueName The name of the queue that the subscriber is receiving messages from.
* @param destinationName the name of the destination, or delivery tag, for the subscriber.
- * @param listener the listener for this destination.
- *
+ * @param listener the listener for this destination.
+ *
* @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
*/
- public void createSubscription(String queueName, String destinationName,MessageListener listener)
+ public void createSubscription(String queueName, String destinationName, MessageListener listener)
{
- _session.messageSubscribe(
- queueName,
- destinationName,
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(listener), null);
-
- _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
- _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE);
-
+ _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
+ _session.messageSubscribe
+ (queueName,
+ destinationName,
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+
+ _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
+ _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
+
LOGGER.debug(
- "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
+ "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
queueName,
destinationName);
}
-
+
/**
* Removes a previously declared consumer from the broker.
- *
+ *
* @param destinationName the name of the destination, or delivery tag, for the subscriber.
* @see Session#messageCancel(String, Option...)
*/
@@ -173,10 +205,10 @@ public class QpidService
{
_session.messageCancel(destinationName);
LOGGER.debug(
- "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
+ "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
destinationName);
- }
-
+ }
+
/**
* Declares a queue on the broker with the given name.
*
@@ -200,27 +232,27 @@ public class QpidService
_session.queueDelete(queueName);
LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName);
}
-
+
/**
* Binds (on the broker) a queue with an exchange.
*
- * @param queueName the name of the queue to bind.
+ * @param queueName the name of the queue to bind.
* @param exchangeName the exchange name.
- * @param routingKey the routing key used for the binding.
+ * @param routingKey the routing key used for the binding.
* @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
*/
public void declareBinding(String queueName, String exchangeName, String routingKey)
{
_session.exchangeBind(queueName, exchangeName, routingKey, null);
LOGGER.debug(
- "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
+ "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
routingKey,queueName,
exchangeName);
}
-
+
/**
* Removes a previously declare binding between an exchange and a queue.
- *
+ *
* @param queueName the name of the queue.
* @param exchangeName the name of the exchange.
* @param routingKey the routing key used for binding.
@@ -229,42 +261,42 @@ public class QpidService
{
_session.exchangeUnbind(queueName, exchangeName, routingKey);
LOGGER.debug(
- "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
+ "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
routingKey,queueName,
exchangeName);
}
-
+
/**
* Sends a command message with the given data on the management queue.
- *
+ *
* @param messageData the command message content.
*/
- public void sendCommandMessage(byte [] messageData)
+ public void sendCommandMessage(byte [] messageData)
{
_session.messageTransfer(
Names.MANAGEMENT_EXCHANGE,
MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED,
Configuration.getInstance().getCommandMessageHeader(),
- messageData);
-
+ messageData);
+
Log.logMessageContent (messageData);
}
-
+
/**
* Sends a command message with the given data on the management queue.
- *
+ *
* @param messageData the command message content.
*/
- public void sendCommandMessage(ByteBuffer messageData)
+ public void sendCommandMessage(ByteBuffer messageData)
{
_session.messageTransfer(
Names.MANAGEMENT_EXCHANGE,
MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED,
Configuration.getInstance().getCommandMessageHeader(),
- messageData);
-
+ messageData);
+
Log.logMessageContent (messageData);
- }
+ }
} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
index 181cf427d1..82e05ba816 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
@@ -640,71 +640,47 @@ public class QpidBench
}
private static final org.apache.qpid.transport.Connection getConnection
- (Options opts, final SessionDelegate delegate)
+ (Options opts)
{
- final Object lock = new Object();
org.apache.qpid.transport.Connection conn =
- IoTransport.connect(opts.broker, opts.port,
- new ClientDelegate()
- {
- public SessionDelegate getSessionDelegate()
- {
- return delegate;
- }
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
- @Override public void connectionOpenOk(Channel ch,
- ConnectionOpenOk ok)
- {
- synchronized (lock)
- {
- lock.notify();
- }
- }
- });
- conn.send(new ProtocolHeader(1, 0, 10));
-
- synchronized (lock)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ new org.apache.qpid.transport.Connection();
+ conn.connect(opts.broker, opts.port, null, "guest", "guest");
+ return conn;
+ }
+
+ private static abstract class NativeListener implements SessionListener
+ {
+
+ public void opened(org.apache.qpid.transport.Session ssn) {}
+
+ public void exception(org.apache.qpid.transport.Session ssn,
+ SessionException exc)
+ {
+ exc.printStackTrace();
}
- return conn;
+ public void closed(org.apache.qpid.transport.Session ssn) {}
+
}
private static final void native_publisher(Options opts) throws Exception
{
final long[] echos = { 0 };
- org.apache.qpid.transport.Connection conn = getConnection
- (opts,
- new SessionDelegate() {
- @Override public void messageTransfer
- (org.apache.qpid.transport.Session ssn,
- MessageTransfer mt)
- {
- synchronized (echos)
- {
- echos[0]++;
- echos.notify();
- }
- ssn.processed(mt);
- }
- });
-
- Channel ch = conn.getChannel(0);
- org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ synchronized (echos)
+ {
+ echos[0]++;
+ echos.notify();
+ }
+ ssn.processed(xfr);
+ }
+ });
ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -794,6 +770,7 @@ public class QpidBench
ssn.messageCancel("echo-queue");
ssn.sync();
+ ssn.close();
conn.close();
}
@@ -805,57 +782,51 @@ public class QpidBench
dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
final MessageProperties mp = new MessageProperties();
final Object done = new Object();
- org.apache.qpid.transport.Connection conn = getConnection
- (opts,
- new SessionDelegate() {
-
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- @Override public void messageTransfer
- (org.apache.qpid.transport.Session ssn,
- MessageTransfer mt)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- ssn.messageTransfer("amq.direct",
- MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp),
- echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
- lastTime = time;
- }
- ssn.processed(mt);
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- Channel ch = conn.getChannel(0);
- org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ private long count = 0;
+ private long lastTime = 0;
+ private long start;
+
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ if (count == 0)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+ long time = sample ? System.currentTimeMillis() : 0;
+
+ if (opts.window > 0 && (count % opts.window) == 0)
+ {
+ ssn.messageTransfer("amq.direct",
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(dp, mp),
+ echo);
+ }
+
+ if (sample)
+ {
+ sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
+ lastTime = time;
+ }
+ ssn.processed(xfr);
+ count++;
+
+ if (opts.count > 0 && count >= opts.count)
+ {
+ synchronized (done)
+ {
+ done.notify();
+ }
+ }
+ }
+ });
ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -879,6 +850,7 @@ public class QpidBench
ssn.messageCancel("test-queue");
ssn.sync();
+ ssn.close();
conn.close();
}