diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 147 |
1 files changed, 104 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ebaa22ce44..2030876952 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,37 +20,6 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - -import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -59,6 +28,8 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -73,6 +44,25 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -157,12 +147,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * The connection meta data */ private QpidConnectionMetaData _connectionMetaData; - + /** * Configuration info for SSL */ private SSLConfiguration _sslConfiguration; + private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + /** * @param broker brokerdetails * @param username username @@ -180,7 +175,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null); } - + /** * @param broker brokerdetails * @param username username @@ -198,20 +193,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } - + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, null); } - + public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(host, port, false, username, password, clientName, virtualHost, sslConfig); + this(host, port, false, username, password, clientName, virtualHost, sslConfig); } - + public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException @@ -234,12 +229,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { this(new AMQConnectionURL(connection), null); } - + public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(connection), sslConfig); } - + public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { @@ -257,6 +252,28 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _password = connectionURL.getPassword(); setVirtualHost(connectionURL.getVirtualHost()); + + if (connectionURL.getDefaultQueueExchangeName() != null) + { + _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); + } + + if (connectionURL.getDefaultTopicExchangeName() != null) + { + _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName(); + } + + if (connectionURL.getTemporaryQueueExchangeName() != null) + { + _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName(); + } + + if (connectionURL.getTemporaryTopicExchangeName() != null) + { + _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); + } + + _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); @@ -440,7 +457,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); } @@ -1070,9 +1087,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQConnectionFactory.class.getName(), null); // factory location } - + public SSLConfiguration getSSLConfiguration() { - return _sslConfiguration; + return _sslConfiguration; + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _defaultTopicExchangeName; + } + + + public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) + { + _defaultTopicExchangeName = defaultTopicExchangeName; + } + + + public AMQShortString getDefaultQueueExchangeName() + { + return _defaultQueueExchangeName; + } + + + public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) + { + _defaultQueueExchangeName = defaultQueueExchangeName; + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _temporaryTopicExchangeName; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates. + } + + + public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) + { + _temporaryTopicExchangeName = temporaryTopicExchangeName; + } + + public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName) + { + _temporaryQueueExchangeName = temporaryQueueExchangeName; } } |