summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-06-01 20:40:34 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-06-01 20:40:34 +0000
commit42238d6f0a49bd9311229752c07278329b90e05c (patch)
tree3b6cc2bdc6f7b815cef5391bbb333e68025dcda0
parentc21fe3e31285fe01b0f14f3d5b2c920edd6b2546 (diff)
downloadqpid-python-42238d6f0a49bd9311229752c07278329b90e05c.tar.gz
more enchancements for the Qpid java client. Also I have checked in a sample client(QpidTestClient) on how to use the qpid java client.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@543602 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java22
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java51
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java10
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java5
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java6
5 files changed, 65 insertions, 29 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
index 16e79ca0de..bde2a7bfea 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
@@ -1,12 +1,16 @@
package org.apache.qpid.nclient.amqp.sample;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.nclient.api.QpidConnection;
import org.apache.qpid.nclient.api.QpidConstants;
import org.apache.qpid.nclient.api.QpidExchangeHelper;
+import org.apache.qpid.nclient.api.QpidMessageConsumer;
import org.apache.qpid.nclient.api.QpidMessageProducer;
import org.apache.qpid.nclient.api.QpidQueueHelper;
import org.apache.qpid.nclient.api.QpidSession;
import org.apache.qpid.nclient.impl.QpidConnectionImpl;
+import org.apache.qpid.nclient.message.AMQPApplicationMessage;
+import org.apache.qpid.nclient.message.MessageHeaders;
public class QpidTestClient
{
@@ -21,20 +25,32 @@ public class QpidTestClient
session.open();
QpidExchangeHelper exchangeHelper = session.getExchangeHelper();
- exchangeHelper.open();
exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS);
QpidQueueHelper queueHelper = session.getQueueHelper();
- queueHelper.open();
queueHelper.declareQueue(false, false, false, false, false, "myQueue");
queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH");
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setRoutingKey(new AMQShortString("RH"));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME));
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,"test".getBytes());
+
QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+ messageProducer.send(false, true, msg);
+
+ QpidMessageConsumer messageConsumer = session.createConsumer("myQueue", false, false);
+ messageConsumer.open();
+
+ AMQPApplicationMessage msg2 = messageConsumer.receive();
+ System.out.println(msg.toString());
}
catch(Exception e)
{
-
+ e.printStackTrace();
}
}
+
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
index d0a4ab79a5..b2afe1f35c 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java
@@ -7,12 +7,18 @@ import org.apache.qpid.nclient.core.AMQPException;
* This abstracts the error handling for open
* and close methods for a resource. This class
* eliminates the duplication of error handling
- * code
+ * code.
+ *
+ * This is not thread safe and is only to be used
+ * by a single thread at a time. Session and Connection
+ * have overriden key methods to provide thread safety.
+ *
*/
public abstract class AbstractResource
{
private String _resourceName;
private boolean _closed = true;
+ private boolean _opened = false;
public AbstractResource(String resourceName)
{
@@ -21,31 +27,36 @@ public abstract class AbstractResource
public void open() throws QpidException
{
- _closed = false;
- try
- {
- openResource();
-
- }
- catch(Exception e)
- {
- throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e);
+ if(!_opened)
+ {
+ try
+ {
+ openResource();
+ _opened = true;
+ _closed = false;
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e);
+ }
}
}
public void close() throws QpidException
{
- _closed = true;
- try
- {
- closeResource();
-
- }
- catch(Exception e)
- {
- throw new QpidException("Error destroying " + _resourceName + " due to " + e.getMessage(),e);
+ if(!_closed)
+ {
+ try
+ {
+ closeResource();
+ _closed = true;
+ _opened = false;
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error destroying " + _resourceName + " due to " + e.getMessage(),e);
+ }
}
-
}
protected abstract void openResource() throws AMQPException, QpidException;
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
index e9809e4c83..9bbc306f36 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java
@@ -57,6 +57,7 @@ import org.apache.qpid.nclient.api.QpidConnection;
import org.apache.qpid.nclient.api.QpidException;
import org.apache.qpid.nclient.api.QpidSession;
import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
@@ -89,9 +90,9 @@ public class QpidConnectionImpl extends AbstractResource implements QpidConnecti
private Lock _lock = new ReentrantLock();
- private AtomicBoolean _closed;
+ private AtomicBoolean _closed = new AtomicBoolean(true);
- private AtomicBoolean _opened;
+ private AtomicBoolean _opened = new AtomicBoolean(false);
public QpidConnectionImpl()
{
@@ -164,8 +165,8 @@ public class QpidConnectionImpl extends AbstractResource implements QpidConnecti
try
{
- //_url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
- _amqpConnection = _classFactory.createConnectionClass(url, ConnectionType.TCP);
+ _url = new AMQPConnectionURL(url);
+ _amqpConnection = _classFactory.createConnectionClass(_url, ConnectionType.TCP);
}
catch(Exception e)
{
@@ -181,6 +182,7 @@ public class QpidConnectionImpl extends AbstractResource implements QpidConnecti
throw new QpidException("Connection negotiation failed due to " + e.getMessage(),e);
}
+ _closed.set(false);
_opened.set(true);
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
index c7c62c24d8..e4d4bce029 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java
@@ -66,8 +66,8 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
private QpidQueueHelperImpl _qpidQueueHelper;
private QpidMessageHelperImpl _qpidMessageHelper;
private List<QpidMessageProducerImpl> _producers = new ArrayList<QpidMessageProducerImpl>();
- private AtomicBoolean _closed;
- private AtomicInteger _consumerTag;
+ private AtomicBoolean _closed = new AtomicBoolean(true);
+ private AtomicInteger _consumerTag = new AtomicInteger();
private Lock _sessionCloseLock = new ReentrantLock();
// this will be used as soon as Session class is finalized
@@ -92,6 +92,7 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession
*/
protected void openResource() throws AMQPException, QpidException
{
+ _closed.set(false);
// These methods will be changed to session methods
openChannel();
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
index 29e61f623f..2851288d3f 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
@@ -35,6 +35,12 @@ public class AMQPApplicationMessage {
private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
+ public AMQPApplicationMessage(MessageHeaders messageHeaders,byte[] content)
+ {
+ this.messageHeaders = messageHeaders;
+ addContent(content);
+ }
+
public AMQPApplicationMessage(int channelId, byte[] referenceId)
{
this.channelId = channelId;