diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-06-01 20:40:34 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-06-01 20:40:34 +0000 |
commit | 42238d6f0a49bd9311229752c07278329b90e05c (patch) | |
tree | 3b6cc2bdc6f7b815cef5391bbb333e68025dcda0 | |
parent | c21fe3e31285fe01b0f14f3d5b2c920edd6b2546 (diff) | |
download | qpid-python-42238d6f0a49bd9311229752c07278329b90e05c.tar.gz |
more enchancements for the Qpid java client. Also I have checked in a sample client(QpidTestClient) on how to use the qpid java client.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@543602 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 65 insertions, 29 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java index 16e79ca0de..bde2a7bfea 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java @@ -1,12 +1,16 @@ package org.apache.qpid.nclient.amqp.sample; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.nclient.api.QpidConnection; import org.apache.qpid.nclient.api.QpidConstants; import org.apache.qpid.nclient.api.QpidExchangeHelper; +import org.apache.qpid.nclient.api.QpidMessageConsumer; import org.apache.qpid.nclient.api.QpidMessageProducer; import org.apache.qpid.nclient.api.QpidQueueHelper; import org.apache.qpid.nclient.api.QpidSession; import org.apache.qpid.nclient.impl.QpidConnectionImpl; +import org.apache.qpid.nclient.message.AMQPApplicationMessage; +import org.apache.qpid.nclient.message.MessageHeaders; public class QpidTestClient { @@ -21,20 +25,32 @@ public class QpidTestClient session.open(); QpidExchangeHelper exchangeHelper = session.getExchangeHelper(); - exchangeHelper.open(); exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS); QpidQueueHelper queueHelper = session.getQueueHelper(); - queueHelper.open(); queueHelper.declareQueue(false, false, false, false, false, "myQueue"); queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH"); + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setRoutingKey(new AMQShortString("RH")); + msgHeaders.setExchange(new AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME)); + AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,"test".getBytes()); + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + messageProducer.send(false, true, msg); + + QpidMessageConsumer messageConsumer = session.createConsumer("myQueue", false, false); + messageConsumer.open(); + + AMQPApplicationMessage msg2 = messageConsumer.receive(); + System.out.println(msg.toString()); } catch(Exception e) { - + e.printStackTrace(); } } + } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java index d0a4ab79a5..b2afe1f35c 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/AbstractResource.java @@ -7,12 +7,18 @@ import org.apache.qpid.nclient.core.AMQPException; * This abstracts the error handling for open * and close methods for a resource. This class * eliminates the duplication of error handling - * code + * code. + * + * This is not thread safe and is only to be used + * by a single thread at a time. Session and Connection + * have overriden key methods to provide thread safety. + * */ public abstract class AbstractResource { private String _resourceName; private boolean _closed = true; + private boolean _opened = false; public AbstractResource(String resourceName) { @@ -21,31 +27,36 @@ public abstract class AbstractResource public void open() throws QpidException { - _closed = false; - try - { - openResource(); - - } - catch(Exception e) - { - throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e); + if(!_opened) + { + try + { + openResource(); + _opened = true; + _closed = false; + } + catch(Exception e) + { + throw new QpidException("Error creating " + _resourceName + " due to " + e.getMessage(),e); + } } } public void close() throws QpidException { - _closed = true; - try - { - closeResource(); - - } - catch(Exception e) - { - throw new QpidException("Error destroying " + _resourceName + " due to " + e.getMessage(),e); + if(!_closed) + { + try + { + closeResource(); + _closed = true; + _opened = false; + } + catch(Exception e) + { + throw new QpidException("Error destroying " + _resourceName + " due to " + e.getMessage(),e); + } } - } protected abstract void openResource() throws AMQPException, QpidException; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java index e9809e4c83..9bbc306f36 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidConnectionImpl.java @@ -57,6 +57,7 @@ import org.apache.qpid.nclient.api.QpidConnection; import org.apache.qpid.nclient.api.QpidException; import org.apache.qpid.nclient.api.QpidSession; import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.transport.AMQPConnectionURL; import org.apache.qpid.nclient.transport.ConnectionURL; import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; @@ -89,9 +90,9 @@ public class QpidConnectionImpl extends AbstractResource implements QpidConnecti private Lock _lock = new ReentrantLock(); - private AtomicBoolean _closed; + private AtomicBoolean _closed = new AtomicBoolean(true); - private AtomicBoolean _opened; + private AtomicBoolean _opened = new AtomicBoolean(false); public QpidConnectionImpl() { @@ -164,8 +165,8 @@ public class QpidConnectionImpl extends AbstractResource implements QpidConnecti try { - //_url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); - _amqpConnection = _classFactory.createConnectionClass(url, ConnectionType.TCP); + _url = new AMQPConnectionURL(url); + _amqpConnection = _classFactory.createConnectionClass(_url, ConnectionType.TCP); } catch(Exception e) { @@ -181,6 +182,7 @@ public class QpidConnectionImpl extends AbstractResource implements QpidConnecti throw new QpidException("Connection negotiation failed due to " + e.getMessage(),e); } + _closed.set(false); _opened.set(true); } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java index c7c62c24d8..e4d4bce029 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidSessionImpl.java @@ -66,8 +66,8 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession private QpidQueueHelperImpl _qpidQueueHelper; private QpidMessageHelperImpl _qpidMessageHelper; private List<QpidMessageProducerImpl> _producers = new ArrayList<QpidMessageProducerImpl>(); - private AtomicBoolean _closed; - private AtomicInteger _consumerTag; + private AtomicBoolean _closed = new AtomicBoolean(true); + private AtomicInteger _consumerTag = new AtomicInteger(); private Lock _sessionCloseLock = new ReentrantLock(); // this will be used as soon as Session class is finalized @@ -92,6 +92,7 @@ public class QpidSessionImpl extends AbstractResource implements QpidSession */ protected void openResource() throws AMQPException, QpidException { + _closed.set(false); // These methods will be changed to session methods openChannel(); diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java index 29e61f623f..2851288d3f 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java @@ -35,6 +35,12 @@ public class AMQPApplicationMessage { private boolean redeliveredFlag; private MessageHeaders messageHeaders; + public AMQPApplicationMessage(MessageHeaders messageHeaders,byte[] content) + { + this.messageHeaders = messageHeaders; + addContent(content); + } + public AMQPApplicationMessage(int channelId, byte[] referenceId) { this.channelId = channelId; |