diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-17 12:46:01 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-17 12:46:01 +0000 |
commit | 3e009cb10063457115db198b8faea1b395365b89 (patch) | |
tree | ae70e022fbdf3f511fe0931d4dbd2c1da7f2e6e8 | |
parent | ec0dbb9127997b060fd0a631d96a3b27c5653d33 (diff) | |
download | qpid-python-3e009cb10063457115db198b8faea1b395365b89.tar.gz |
added jndi
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@567048 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 1126 insertions, 121 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index 25bebb4ae5..5af685071d 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -14,6 +14,7 @@ import org.apache.qpidity.ErrorCode; import org.apache.qpidity.MinaHandler; import org.apache.qpidity.QpidException; import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.url.QpidURL; import org.apache.qpidity.client.impl.ClientSession; import org.apache.qpidity.client.impl.ClientSessionDelegate; @@ -83,7 +84,7 @@ public class Client implements org.apache.qpidity.client.Connection * Until the dust settles with the URL disucssion * I am not going to implement this. */ - public void connect(URL url) throws QpidException + public void connect(QpidURL url) throws QpidException { throw new UnsupportedOperationException(); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index 9bc17b14a6..c7624eec6a 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -18,11 +18,8 @@ */ package org.apache.qpidity.client; - -import java.net.URL; -import java.util.UUID; - import org.apache.qpidity.QpidException; +import org.apache.qpidity.url.QpidURL; /** * This represents a physical connection to a broker. @@ -46,7 +43,7 @@ public interface Connection * @param url The URL of the broker. * @throws QpidException If the communication layer fails to connect with the broker. */ - public void connect(URL url) throws QpidException; + public void connect(QpidURL url) throws QpidException; /** * Close this connection. diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java index 2c8ce94e27..301c93c5ac 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java @@ -1,18 +1,35 @@ package org.apache.qpidity.jms; import javax.jms.*; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; +import javax.naming.*; +import javax.naming.spi.ObjectFactory; import org.apache.qpidity.QpidException; +import org.apache.qpidity.url.QpidURLImpl; +import org.apache.qpidity.url.QpidURL; +import org.apache.qpidity.url.BindingURLImpl; +import org.apache.qpidity.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Hashtable; + +/** + * Implements all the JMS connection factories. + * <p> In all the implementations in our code base + * when we create a Reference we pass in <code>ConnectionFactoryImpl</code> as the + * factory for creating the objects. This is the factory (or + * {@link ObjectFactory}) that is used to turn the description in to a real object. + * <p>In our construction of the Reference the last param. is null, + * we could put a url to a jar that contains our {@link ObjectFactory} so that + * any of our objects stored in JNDI can be recreated without even having + * the classes locally. As it is the <code>ConnectionFactoryImpl</code> must be on the + * classpath when you do a lookup in a JNDI context.. else you'll get a + * ClassNotFoundEx. + */ public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, XATopicConnectionFactory, XAQueueConnectionFactory, XAConnectionFactory, - Referenceable + ObjectFactory, Referenceable { /** * this ConnectionFactoryImpl's logger @@ -42,16 +59,21 @@ public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnection /** * The URL used to build this factory, (not yet supported) */ - private String _url; + private QpidURL _qpidURL; // Undefined at the moment - public ConnectionFactoryImpl(String url) + public ConnectionFactoryImpl(QpidURL url) + { + _qpidURL = url; + } + + public ConnectionFactoryImpl(String url) throws URLSyntaxException { - _url = url; + // todo } /** - * Create a connection. + * Create a connection Factory * * @param host The broker host name. * @param port The port on which the broker is listening for connection. @@ -409,10 +431,74 @@ public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnection // Support for JNDI // ---------------------------------------- + /** + * Creates an object using the location or reference information + * specified. + * + * @param obj The possibly null object containing location or reference + * information that can be used in creating an object. + * @param name The name of this object relative to <code>nameCtx</code>, + * or null if no name is specified. + * @param nameCtx The context relative to which the <code>name</code> + * parameter is specified, or null if <code>name</code> is + * relative to the default initial context. + * @param environment The possibly null environment that is used in + * creating the object. + * @return The object created; null if an object cannot be created. + * @throws Exception if this object factory encountered an exception + * while attempting to create an object, and no other object factories are + * to be tried. + */ + public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable environment) throws Exception + { + if (obj instanceof Reference) + { + Reference ref = (Reference) obj; + + if (ref.getClassName().equals(QueueImpl.class.getName())) + { + RefAddr addr = ref.get(QueueImpl.class.getName()); + + if (addr != null) + { + return new QueueImpl(new BindingURLImpl((String) addr.getContent())); + } + } + + if (ref.getClassName().equals(TopicImpl.class.getName())) + { + RefAddr addr = ref.get(TopicImpl.class.getName()); + + if (addr != null) + { + return new TopicImpl(new BindingURLImpl((String) addr.getContent())); + } + } + + if (ref.getClassName().equals(ConnectionFactoryImpl.class.getName())) + { + RefAddr addr = ref.get(ConnectionFactoryImpl.class.getName()); + if (addr != null) + { + return new ConnectionFactoryImpl(new QpidURLImpl((String) addr.getContent())); + } + } + + } + return null; + } + + //-- interface Reference + /** + * Retrieves the Reference of this object. + * + * @return The non-null Reference of this object. + * @throws NamingException If a naming exception was encountered while retrieving the reference. + */ public Reference getReference() throws NamingException { return new Reference(ConnectionFactoryImpl.class.getName(), - new StringRefAddr(ConnectionFactoryImpl.class.getName(), _url)); + new StringRefAddr(ConnectionFactoryImpl.class.getName(), _qpidURL.getURL()), + ConnectionFactoryImpl.class.getName(), null); } - } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index 60dc126dcf..1a7d5f7402 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -27,19 +27,14 @@ import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Queue; -import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.Topic; -import javax.jms.TopicConnection; import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; import org.apache.qpidity.QpidException; +import org.apache.qpidity.url.QpidURL; import org.apache.qpidity.client.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +43,7 @@ import org.slf4j.LoggerFactory; /** * Implements javax.jms.Connection, javax.jms.QueueConnection and javax.jms.TopicConnection */ -public class ConnectionImpl implements Connection, Referenceable +public class ConnectionImpl implements Connection { /** * This class's logger @@ -66,16 +61,6 @@ public class ConnectionImpl implements Connection, Referenceable private String _clientID; /** - * The user name to use for authentication - */ - private String _username; - - /** - * The password to use for authentication - */ - private String _password; - - /** * The Exception listenr get informed when a serious problem is detected */ private ExceptionListener _exceptionListener; @@ -128,8 +113,19 @@ public class ConnectionImpl implements Connection, Referenceable _qpidConnection.connect(host, port, virtualHost, username, password); } - //---- Interface javax.jms.Connection ---// + /** + * Create a connection from a QpidURL + * + * @param qpidURL The url used to create this connection + * @throws QpidException If creating a connection fails due to some internal error. + */ + protected ConnectionImpl(QpidURL qpidURL) throws QpidException + { + _qpidConnection = Client.createConnection(); + _qpidConnection.connect(qpidURL); + } + //---- Interface javax.jms.Connection ---// /** * Creates a Session * @@ -142,7 +138,7 @@ public class ConnectionImpl implements Connection, Referenceable public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - SessionImpl session = null; + SessionImpl session; try { session = new SessionImpl(this, transacted, acknowledgeMode, false); @@ -174,12 +170,11 @@ public class ConnectionImpl implements Connection, Referenceable /** * Sets the client identifier for this connection. - * <p/> * <P>The preferred way to assign a JMS client's client identifier is for * it to be configured in a client-specific <CODE>ConnectionFactory</CODE> * object and transparently assigned to the <CODE>Connection</CODE> object * it creates. - * <p> In AMQP it is not possible to change the client ID. If one is not specified + * <p> In Qpid it is not possible to change the client ID. If one is not specified * upon connection construction, an id is generated automatically. Therefore * we can always throw an exception. * TODO: Make sure that the client identifier can be set on the <CODE>ConnectionFactory</CODE> @@ -396,7 +391,7 @@ public class ConnectionImpl implements Connection, Referenceable public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - QueueSessionImpl queueSession = null; + QueueSessionImpl queueSession; try { queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode); @@ -437,12 +432,11 @@ public class ConnectionImpl implements Connection, Referenceable * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. * @return a newly created topic session * @throws JMSException If creating the session fails due to some internal error. - * @throws QpidException */ public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - TopicSessionImpl session = null; + TopicSessionImpl session; try { session = new TopicSessionImpl(this, transacted, acknowledgeMode); @@ -476,7 +470,6 @@ public class ConnectionImpl implements Connection, Referenceable } //-------------- protected and private methods - /** * Validate that the Connection is not closed. * <p/> @@ -507,9 +500,4 @@ public class ConnectionImpl implements Connection, Referenceable { return _qpidConnection; } - - public Reference getReference() throws NamingException - { - return new Reference(ConnectionImpl.class.getName(), new StringRefAddr(ConnectionImpl.class.getName(), "")); - } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java index df585ab50e..75d0aaad05 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java @@ -19,13 +19,20 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; import org.apache.qpidity.url.BindingURL; +import org.apache.qpidity.url.BindingURLImpl; +import org.apache.qpidity.url.URLSyntaxException; +import org.apache.qpid.url.URLHelper; import javax.jms.Destination; +import javax.naming.Reference; +import javax.naming.NamingException; +import javax.naming.StringRefAddr; +import javax.naming.Referenceable; /** * Implementation of the JMS Destination interface */ -public class DestinationImpl implements Destination +public class DestinationImpl implements Destination, Referenceable { /** * The destination's name @@ -33,11 +40,6 @@ public class DestinationImpl implements Destination protected String _destinationName = null; /** - * The session used to create this destination - */ - protected SessionImpl _session; - - /** * The excahnge name */ protected String _exchangeName; @@ -67,28 +69,26 @@ public class DestinationImpl implements Destination */ protected boolean _isDurable; - //--- Constructor /** - * Create a new DestinationImpl. - * - * @param session The session used to create this DestinationImpl. + * The biding URL used to create this destiantion */ - protected DestinationImpl(SessionImpl session) + protected BindingURL _url; + + //--- Constructor + + protected DestinationImpl(String name) throws QpidException { - _session = session; + _queueName = name; } - /** * Create a destiantion from a binding URL * - * @param session The session used to create this queue. * @param binding The URL * @throws QpidException If the URL is not valid */ - protected DestinationImpl(SessionImpl session, BindingURL binding) throws QpidException + public DestinationImpl(BindingURL binding) throws QpidException { - _session = session; _exchangeName = binding.getExchangeName(); _exchangeType = binding.getExchangeClass(); _destinationName = binding.getDestinationName(); @@ -96,6 +96,7 @@ public class DestinationImpl implements Destination _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName(); + _url = binding; } //---- Getters and Setters @@ -119,15 +120,6 @@ public class DestinationImpl implements Destination return _destinationName; } - /** - * Get the session of this destination - * - * @return The session of this destination - */ - public SessionImpl getSession() - { - return _session; - } /** * The exchange name @@ -188,5 +180,71 @@ public class DestinationImpl implements Destination { return _isDurable; } + + //----- Interface Referenceable + public Reference getReference() throws NamingException + { + return new Reference(this.getClass().getName(), new StringRefAddr(this.getClass().getName(), toURL()), + ConnectionFactoryImpl.class.getName(), // factory + null); // factory location + } + + //--- non public method s + + /** + * Get the URL used to create this destiantion + * + * @return The URL used to create this destiantion + */ + public String toURL() + { + if (_url == null) + { + StringBuffer sb = new StringBuffer(); + sb.append(_exchangeType); + sb.append("://"); + sb.append(_exchangeName); + sb.append('/'); + if (_destinationName != null) + { + sb.append(_destinationName); + } + sb.append('/'); + if (_queueName != null) + { + sb.append(_queueName); + } + sb.append('?'); + if (_isDurable) + { + sb.append(org.apache.qpid.url.BindingURL.OPTION_DURABLE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + if (_isExclusive) + { + sb.append(org.apache.qpid.url.BindingURL.OPTION_EXCLUSIVE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + if (_isAutoDelete) + { + sb.append(org.apache.qpid.url.BindingURL.OPTION_AUTODELETE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + //removeKey the last char '?' if there is no options , ',' if there are. + sb.deleteCharAt(sb.length() - 1); + try + { + _url = new BindingURLImpl(sb.toString()); + } + catch (URLSyntaxException e) + { + // this should not happen. + } + } + return _url.getURL(); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java index b9e1d5a9db..9ed74e1cd0 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java @@ -30,19 +30,7 @@ import javax.jms.JMSException; */ public class QueueImpl extends DestinationImpl implements Queue { - - //--- Constructor - - /** - * Create a new QueueImpl. - * - * @param session The session used to create this QueueImpl. - */ - protected QueueImpl(SessionImpl session) - { - super(session); - } - + //--- Constructor /** * Create a new QueueImpl with a given name. * @@ -52,7 +40,7 @@ public class QueueImpl extends DestinationImpl implements Queue */ protected QueueImpl(SessionImpl session, String name) throws QpidException { - super(session); + super(name); _queueName = name; _destinationName = name; _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; @@ -60,7 +48,7 @@ public class QueueImpl extends DestinationImpl implements Queue _isAutoDelete = false; _isDurable = true; _isExclusive = false; - registerQueue(false); + registerQueue(session, false); } /** @@ -72,8 +60,37 @@ public class QueueImpl extends DestinationImpl implements Queue */ protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException { - super(session, binding); - registerQueue(false); + super(binding); + registerQueue(session, false); + } + + /** + * Create a destiantion from a binding URL + * + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + public QueueImpl(BindingURL binding) throws QpidException + { + super(binding); + } + + /** + * Create a new QueueImpl with a given name. + * + * @param name The name of this queue. + * @throws QpidException If the queue name is not valid + */ + public QueueImpl(String name) throws QpidException + { + super(name); + _queueName = name; + _destinationName = name; + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + _isAutoDelete = false; + _isDurable = true; + _isExclusive = false; } //---- Interface javax.jms.Queue @@ -91,27 +108,28 @@ public class QueueImpl extends DestinationImpl implements Queue /** * Check that this queue exists and declare it if required. * + * @param session The session used to create this destination * @param declare Specify whether the queue should be declared * @throws QpidException If this queue does not exists on the broker. */ - protected void registerQueue(boolean declare) throws QpidException + protected void registerQueue(SessionImpl session, boolean declare) throws QpidException { // test if this exchange exist on the broker //todo we can also specify if the excahnge is autodlete and durable - _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); + session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); // wait for the broker response - _session.getQpidSession().sync(); + session.getQpidSession().sync(); // If this exchange does not exist then we will get an Expection 404 does notexist //todo check for an execption // now check if the queue exists - _session.getQpidSession().queueDeclare(_queueName, null, null, _isDurable ? Option.DURABLE : Option.NO_OPTION, - _isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, - _isExclusive ? Option.EXCLUSIVE : Option.NO_OPTION, - declare ? Option.PASSIVE : Option.NO_OPTION); + session.getQpidSession().queueDeclare(_queueName, null, null, _isDurable ? Option.DURABLE : Option.NO_OPTION, + _isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, + _isExclusive ? Option.EXCLUSIVE : Option.NO_OPTION, + declare ? Option.PASSIVE : Option.NO_OPTION); // wait for the broker response - _session.getQpidSession().sync(); + session.getQpidSession().sync(); // If this queue does not exist then we will get an Expection 404 does notexist - _session.getQpidSession().queueBind(_queueName, _exchangeName, _destinationName, null); + session.getQpidSession().queueBind(_queueName, _exchangeName, _destinationName, null); // we don't have to sync as we don't expect an error } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java index 22e28f5c42..a152f66da7 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java @@ -33,6 +33,11 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem */ private boolean _isDeleted; + /** + * The session used to create this destination + */ + private SessionImpl _session; + //--- constructor /** * Create a new TemporaryQueueImpl. @@ -42,16 +47,15 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem */ protected TemporaryQueueImpl(SessionImpl session) throws QpidException { + super("TempQueue-" + UUID.randomUUID()); // temporary destinations do not have names - super(session); - _queueName = "TempQueue-" + UUID.randomUUID(); - _destinationName = _queueName; _isAutoDelete = false; _isDurable = false; _isExclusive = false; _isDeleted = false; + _session = session; // we must create this queue - registerQueue(true); + registerQueue(session, true); } //-- TemporaryDestination Interface diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java index 092ee15b2f..8fa139919d 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java @@ -34,8 +34,13 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem */ private boolean _isDeleted = false; + /** + * The session used to create this destination + */ + private SessionImpl _session; + //--- constructor - /** + /** * Create a new TemporaryTopicImpl with a given name. * * @param session The session used to create this TemporaryTopicImpl. @@ -45,6 +50,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem { // temporary destinations do not have names. super(session, "TemporayTopic-" + UUID.randomUUID()); + _session = session; } //-- TemporaryDestination Interface @@ -56,7 +62,10 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem //-- TemporaryTopic Interface public void delete() throws JMSException { - // todo: delete this destinaiton + if (!_isDeleted) + { + _session.getQpidSession().queueDelete(_queueName); + } _isDeleted = true; } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java index 4fc0a28ecb..199e882234 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java @@ -38,9 +38,28 @@ public class TopicImpl extends DestinationImpl implements Topic * @param session The session used to create this queue. * @throws QpidException If the topic name is not valid */ - public TopicImpl(SessionImpl session, String name) throws QpidException + protected TopicImpl(SessionImpl session, String name) throws QpidException { - super(session); + super(name); + _queueName = "Topic-" + UUID.randomUUID(); + _destinationName = name; + _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + _isAutoDelete = true; + _isDurable = false; + _isExclusive = true; + checkTopicExists(session); + } + + /** + * Create a new TopicImpl with a given name. + * + * @param name The name of this topic + * @throws QpidException If the topic name is not valid + */ + public TopicImpl(String name) throws QpidException + { + super(name); _queueName = "Topic-" + UUID.randomUUID(); _destinationName = name; _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; @@ -48,7 +67,6 @@ public class TopicImpl extends DestinationImpl implements Topic _isAutoDelete = true; _isDurable = false; _isExclusive = true; - checkTopicExists(); } /** @@ -60,8 +78,20 @@ public class TopicImpl extends DestinationImpl implements Topic */ protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException { - super(session, binding); - checkTopicExists(); + super(binding); + checkTopicExists(session); + } + + + /** + * Create a TopicImpl from a binding URL + * + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + public TopicImpl(BindingURL binding) throws QpidException + { + super(binding); } //--- javax.jsm.Topic Interface @@ -74,18 +104,20 @@ public class TopicImpl extends DestinationImpl implements Topic { return _destinationName; } + /** - * Check that this topic exchange - * - * @throws QpidException If this queue does not exists on the broker. - */ - protected void checkTopicExists() throws QpidException - { - // test if this exchange exist on the broker - _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); - // wait for the broker response - _session.getQpidSession().sync(); - // todo get the exception - } + * Check that this exchange exists + * + * @param session The session used to create this Topic. + * @throws QpidException If this exchange does not exists on the broker. + */ + private void checkTopicExists(SessionImpl session) throws QpidException + { + // test if this exchange exist on the broker + session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); + // wait for the broker response + session.getQpidSession().sync(); + // todo get the exception + } } diff --git a/java/client/src/main/java/org/apache/qpidity/naming/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpidity/naming/PropertiesFileInitialContextFactory.java new file mode 100644 index 0000000000..7536a7ec4a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/naming/PropertiesFileInitialContextFactory.java @@ -0,0 +1,264 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.naming; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.jms.ConnectionFactoryImpl; +import org.apache.qpidity.jms.DestinationImpl; +import org.apache.qpidity.jms.QueueImpl; +import org.apache.qpidity.jms.TopicImpl; +import org.apache.qpidity.url.BindingURLImpl; +import org.apache.qpidity.url.URLSyntaxException; +import org.apache.qpidity.url.BindingURL; +import org.apache.qpidity.QpidException; + +import javax.naming.spi.InitialContextFactory; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import java.util.Hashtable; +import java.util.Map; +import java.util.Properties; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; + +public class PropertiesFileInitialContextFactory implements InitialContextFactory +{ + protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class); + + private String CONNECTION_FACTORY_PREFIX = "connectionfactory."; + private String DESTINATION_PREFIX = "destination."; + private String QUEUE_PREFIX = "queue."; + private String TOPIC_PREFIX = "topic."; + + public Context getInitialContext(Hashtable environment) throws NamingException + { + Map data = new ConcurrentHashMap(); + try + { + String file; + if (environment.containsKey(Context.PROVIDER_URL)) + { + file = (String) environment.get(Context.PROVIDER_URL); + } + else + { + file = System.getProperty(Context.PROVIDER_URL); + } + if (file != null) + { + _logger.info("Loading Properties from:" + file); + // Load the properties specified + Properties p = new Properties(); + p.load(new BufferedInputStream(new FileInputStream(file))); + environment.putAll(p); + _logger.info("Loaded Context Properties:" + environment.toString()); + } + else + { + _logger.info("No Provider URL specified."); + } + } + catch (IOException ioe) + { + _logger.warn( + "Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL)); + } + createConnectionFactories(data, environment); + createDestinations(data, environment); + createQueues(data, environment); + createTopics(data, environment); + return createContext(data, environment); + } + + // Implementation methods + // ------------------------------------------------------------------------- + protected ReadOnlyContext createContext(Map data, Hashtable environment) + { + return new ReadOnlyContext(environment, data); + } + + protected void createConnectionFactories(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(CONNECTION_FACTORY_PREFIX)) + { + String jndiName = key.substring(CONNECTION_FACTORY_PREFIX.length()); + ConnectionFactory cf = createFactory(entry.getValue().toString()); + if (cf != null) + { + data.put(jndiName, cf); + } + } + } + } + + protected void createDestinations(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(DESTINATION_PREFIX)) + { + String jndiName = key.substring(DESTINATION_PREFIX.length()); + Destination dest = createDestination(entry.getValue().toString()); + if (dest != null) + { + data.put(jndiName, dest); + } + } + } + } + + protected void createQueues(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(QUEUE_PREFIX)) + { + String jndiName = key.substring(QUEUE_PREFIX.length()); + Queue q = createQueue(entry.getValue().toString()); + if (q != null) + { + data.put(jndiName, q); + } + } + } + } + + protected void createTopics(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(TOPIC_PREFIX)) + { + String jndiName = key.substring(TOPIC_PREFIX.length()); + Topic t = createTopic(entry.getValue().toString()); + if (t != null) + { + data.put(jndiName, t); + } + } + } + } + + /** + * Factory method to create new Connection Factory instances + */ + protected ConnectionFactory createFactory(String url) + { + try + { + return new ConnectionFactoryImpl(url); + } + catch (URLSyntaxException urlse) + { + _logger.warn("Unable to createFactories:" + urlse); + } + return null; + } + + /** + * Factory method to create new Destination instances from an AMQP BindingURL + */ + protected Destination createDestination(String bindingURL) + { + BindingURL binding; + try + { + binding = new BindingURLImpl(bindingURL); + } + catch (URLSyntaxException urlse) + { + _logger.warn("Unable to destination:" + urlse); + return null; + } + try + { + return new DestinationImpl(binding); + } + catch (QpidException iaw) + { + _logger.warn("Binding: '" + binding + "' not supported"); + return null; + } + } + + /** + * Factory method to create new Queue instances + */ + protected Queue createQueue(Object value) + { + Queue result = null; + try + { + if (value instanceof String) + { + result = new QueueImpl((String) value); + } + else if (value instanceof BindingURL) + { + result = new QueueImpl((BindingURL) value); + } + } + catch (QpidException e) + { + _logger.warn("Binding: '" + value + "' not supported"); + } + return result; + } + + /** + * Factory method to create new Topic instances + */ + protected Topic createTopic(Object value) + { + Topic result = null; + try + { + if (value instanceof String) + { + return new TopicImpl((String) value); + } + else if (value instanceof BindingURL) + { + return new TopicImpl((BindingURL) value); + } + } + catch (QpidException e) + { + _logger.warn("Binding: '" + value + "' not supported"); + } + return result; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/naming/ReadOnlyContext.java b/java/client/src/main/java/org/apache/qpidity/naming/ReadOnlyContext.java new file mode 100644 index 0000000000..c73d6e4b35 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/naming/ReadOnlyContext.java @@ -0,0 +1,509 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.naming; + +import org.apache.qpid.jndi.NameParserImpl; + +import javax.naming.*; +import javax.naming.spi.NamingManager; +import java.io.Serializable; +import java.util.*; + +/** + * Based on class from ActiveMQ. + * A read-only Context + * <p/> + * This version assumes it and all its subcontext are read-only and any attempt + * to modify (e.g. through bind) will result in an OperationNotSupportedException. + * Each Context in the tree builds a cache of the entries in all sub-contexts + * to optimise the performance of lookup. + * </p> + * <p>This implementation is intended to optimise the performance of lookup(String) + * to about the level of a HashMap get. It has been observed that the scheme + * resolution phase performed by the JVM takes considerably longer, so for + * optimum performance lookups should be coded like:</p> + * <code> + * Context componentContext = (Context)new InitialContext().lookup("java:comp"); + * String envEntry = (String) componentContext.lookup("env/myEntry"); + * String envEntry2 = (String) componentContext.lookup("env/myEntry2"); + * </code> + */ +public class ReadOnlyContext implements Context, Serializable +{ + private static final long serialVersionUID = -5754338187296859149L; + protected static final NameParser nameParser = new NameParserImpl(); + + protected final Hashtable environment; // environment for this context + protected final Map bindings; // bindings at my level + protected final Map treeBindings; // all bindings under me + + private boolean frozen = false; + private String nameInNamespace = ""; + public static final String SEPARATOR = "/"; + + public ReadOnlyContext() + { + environment = new Hashtable(); + bindings = new HashMap(); + treeBindings = new HashMap(); + } + + public ReadOnlyContext(Hashtable env) + { + if (env == null) + { + this.environment = new Hashtable(); + } + else + { + this.environment = new Hashtable(env); + } + + this.bindings = Collections.EMPTY_MAP; + this.treeBindings = Collections.EMPTY_MAP; + } + + public ReadOnlyContext(Hashtable environment, Map bindings) + { + if (environment == null) + { + this.environment = new Hashtable(); + } + else + { + this.environment = new Hashtable(environment); + } + + this.bindings = bindings; + treeBindings = new HashMap(); + frozen = true; + } + + public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace) + { + this(environment, bindings); + this.nameInNamespace = nameInNamespace; + } + + protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env) + { + this.bindings = clone.bindings; + this.treeBindings = clone.treeBindings; + this.environment = new Hashtable(env); + } + + protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace) + { + this(clone, env); + this.nameInNamespace = nameInNamespace; + } + + public void freeze() + { + frozen = true; + } + + boolean isFrozen() + { + return frozen; + } + + /** + * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses. + * It binds every possible lookup into a map in each context. To do this, each context + * strips off one name segment and if necessary creates a new context for it. Then it asks that context + * to bind the remaining name. It returns a map containing all the bindings from the next context, plus + * the context it just created (if it in fact created it). (the names are suitably extended by the segment + * originally lopped off). + * + * @param name + * @param value + * @return + * @throws javax.naming.NamingException + */ + protected Map internalBind(String name, Object value) throws NamingException + { + assert (name != null) && (name.length() > 0); + assert !frozen; + + Map newBindings = new HashMap(); + int pos = name.indexOf('/'); + if (pos == -1) + { + if (treeBindings.put(name, value) != null) + { + throw new NamingException("Something already bound at " + name); + } + + bindings.put(name, value); + newBindings.put(name, value); + } + else + { + String segment = name.substring(0, pos); + assert segment != null; + assert !segment.equals(""); + Object o = treeBindings.get(segment); + if (o == null) + { + o = newContext(); + treeBindings.put(segment, o); + bindings.put(segment, o); + newBindings.put(segment, o); + } + else if (!(o instanceof ReadOnlyContext)) + { + throw new NamingException("Something already bound where a subcontext should go"); + } + + ReadOnlyContext readOnlyContext = (ReadOnlyContext) o; + String remainder = name.substring(pos + 1); + Map subBindings = readOnlyContext.internalBind(remainder, value); + for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();) + { + Map.Entry entry = (Map.Entry) iterator.next(); + String subName = segment + "/" + (String) entry.getKey(); + Object bound = entry.getValue(); + treeBindings.put(subName, bound); + newBindings.put(subName, bound); + } + } + + return newBindings; + } + + protected ReadOnlyContext newContext() + { + return new ReadOnlyContext(); + } + + public Object addToEnvironment(String propName, Object propVal) throws NamingException + { + return environment.put(propName, propVal); + } + + public Hashtable getEnvironment() throws NamingException + { + return (Hashtable) environment.clone(); + } + + public Object removeFromEnvironment(String propName) throws NamingException + { + return environment.remove(propName); + } + + public Object lookup(String name) throws NamingException + { + if (name.length() == 0) + { + return this; + } + + Object result = treeBindings.get(name); + if (result == null) + { + result = bindings.get(name); + } + + if (result == null) + { + int pos = name.indexOf(':'); + if (pos > 0) + { + String scheme = name.substring(0, pos); + Context ctx = NamingManager.getURLContext(scheme, environment); + if (ctx == null) + { + throw new NamingException("scheme " + scheme + " not recognized"); + } + + return ctx.lookup(name); + } + else + { + // Split out the first name of the path + // and look for it in the bindings map. + CompositeName path = new CompositeName(name); + + if (path.size() == 0) + { + return this; + } + else + { + String first = path.get(0); + Object obj = bindings.get(first); + if (obj == null) + { + throw new NameNotFoundException(name); + } + else if ((obj instanceof Context) && (path.size() > 1)) + { + Context subContext = (Context) obj; + obj = subContext.lookup(path.getSuffix(1)); + } + + return obj; + } + } + } + + if (result instanceof LinkRef) + { + LinkRef ref = (LinkRef) result; + result = lookup(ref.getLinkName()); + } + + if (result instanceof Reference) + { + try + { + result = NamingManager.getObjectInstance(result, null, null, this.environment); + } + catch (NamingException e) + { + throw e; + } + catch (Exception e) + { + throw (NamingException) new NamingException("could not look up : " + name).initCause(e); + } + } + + if (result instanceof ReadOnlyContext) + { + String prefix = getNameInNamespace(); + if (prefix.length() > 0) + { + prefix = prefix + SEPARATOR; + } + + result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name); + } + + return result; + } + + public Object lookup(Name name) throws NamingException + { + return lookup(name.toString()); + } + + public Object lookupLink(String name) throws NamingException + { + return lookup(name); + } + + public Name composeName(Name name, Name prefix) throws NamingException + { + Name result = (Name) prefix.clone(); + result.addAll(name); + + return result; + } + + public String composeName(String name, String prefix) throws NamingException + { + CompositeName result = new CompositeName(prefix); + result.addAll(new CompositeName(name)); + + return result.toString(); + } + + public NamingEnumeration list(String name) throws NamingException + { + Object o = lookup(name); + if (o == this) + { + return new ReadOnlyContext.ListEnumeration(); + } + else if (o instanceof Context) + { + return ((Context) o).list(""); + } + else + { + throw new NotContextException(); + } + } + + public NamingEnumeration listBindings(String name) throws NamingException + { + Object o = lookup(name); + if (o == this) + { + return new ReadOnlyContext.ListBindingEnumeration(); + } + else if (o instanceof Context) + { + return ((Context) o).listBindings(""); + } + else + { + throw new NotContextException(); + } + } + + public Object lookupLink(Name name) throws NamingException + { + return lookupLink(name.toString()); + } + + public NamingEnumeration list(Name name) throws NamingException + { + return list(name.toString()); + } + + public NamingEnumeration listBindings(Name name) throws NamingException + { + return listBindings(name.toString()); + } + + public void bind(Name name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void bind(String name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void close() throws NamingException + { + // ignore + } + + public Context createSubcontext(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public Context createSubcontext(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void destroySubcontext(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void destroySubcontext(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public String getNameInNamespace() throws NamingException + { + return nameInNamespace; + } + + public NameParser getNameParser(Name name) throws NamingException + { + return nameParser; + } + + public NameParser getNameParser(String name) throws NamingException + { + return nameParser; + } + + public void rebind(Name name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rebind(String name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rename(Name oldName, Name newName) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rename(String oldName, String newName) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void unbind(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void unbind(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + private abstract class LocalNamingEnumeration implements NamingEnumeration + { + private Iterator i = bindings.entrySet().iterator(); + + public boolean hasMore() throws NamingException + { + return i.hasNext(); + } + + public boolean hasMoreElements() + { + return i.hasNext(); + } + + protected Map.Entry getNext() + { + return (Map.Entry) i.next(); + } + + public void close() throws NamingException + { } + } + + private class ListEnumeration extends ReadOnlyContext.LocalNamingEnumeration + { + public Object next() throws NamingException + { + return nextElement(); + } + + public Object nextElement() + { + Map.Entry entry = getNext(); + + return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName()); + } + } + + private class ListBindingEnumeration extends ReadOnlyContext.LocalNamingEnumeration + { + public Object next() throws NamingException + { + return nextElement(); + } + + public Object nextElement() + { + Map.Entry entry = getNext(); + + return new Binding((String) entry.getKey(), entry.getValue()); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties b/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties new file mode 100644 index 0000000000..c379ab7cfd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpidity.naming.PropertiesFileInitialConextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue |