diff options
Diffstat (limited to 'java/java/client/src/org/apache/qpid/client/AMQConnection.java')
-rw-r--r-- | java/java/client/src/org/apache/qpid/client/AMQConnection.java | 974 |
1 files changed, 0 insertions, 974 deletions
diff --git a/java/java/client/src/org/apache/qpid/client/AMQConnection.java b/java/java/client/src/org/apache/qpid/client/AMQConnection.java deleted file mode 100644 index 98db26d0c4..0000000000 --- a/java/java/client/src/org/apache/qpid/client/AMQConnection.java +++ /dev/null @@ -1,974 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverSupport; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.TxSelectOkBody; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ChannelLimitReachedException; -import org.apache.qpid.jms.Connection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.FailoverPolicy; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.*; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable -{ - private static final Logger _logger = Logger.getLogger(AMQConnection.class); - - private AtomicInteger _idFactory = new AtomicInteger(0); - - /** - * This is the "root" mutex that must be held when doing anything that could be impacted by failover. - * This must be held by any child objects of this connection such as the session, producers and consumers. - */ - private final Object _failoverMutex = new Object(); - - /** - * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels - * per session and we must prevent the client from opening too many. Zero means unlimited. - */ - private long _maximumChannelCount; - - /** - * The maximum size of frame supported by the server - */ - private long _maximumFrameSize; - - /** - * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped - * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate - * handler. - */ - private AMQProtocolHandler _protocolHandler; - - /** - * Maps from session id (Integer) to AMQSession instance - */ - private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap - - private String _clientName; - - /** - * The user name to use for authentication - */ - private String _username; - - /** - * The password to use for authentication - */ - private String _password; - - /** - * The virtual path to connect to on the AMQ server - */ - private String _virtualHost; - - private ExceptionListener _exceptionListener; - - private ConnectionListener _connectionListener; - - private ConnectionURL _connectionURL; - - /** - * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for - * message publication. - */ - private boolean _started; - - /** - * Policy dictating how to failover - */ - private FailoverPolicy _failoverPolicy; - - /* - * _Connected should be refactored with a suitable wait object. - */ - private boolean _connected; - - /* - * The last error code that occured on the connection. Used to return the correct exception to the client - */ - private AMQException _lastAMQException = null; - - public AMQConnection(String broker, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException - { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + - virtualHost + "?brokerlist='" + broker + "'")); - } - - public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException - { - this(host, port, false, username, password, clientName, virtualHost); - } - - public AMQConnection(String host, int port, boolean useSSL, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException - { - this(new AMQConnectionURL(useSSL ? - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='true'" : - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='false'" - )); - } - - public AMQConnection(String connection) throws AMQException, URLSyntaxException - { - this(new AMQConnectionURL(connection)); - } - - public AMQConnection(ConnectionURL connectionURL) throws AMQException - { - _logger.info("Connection:" + connectionURL); - - if (connectionURL == null) - { - throw new IllegalArgumentException("Connection must be specified"); - } - - _connectionURL = connectionURL; - - _clientName = connectionURL.getClientName(); - _username = connectionURL.getUsername(); - _password = connectionURL.getPassword(); - _virtualHost = connectionURL.getVirtualHost(); - - _failoverPolicy = new FailoverPolicy(connectionURL); - - _protocolHandler = new AMQProtocolHandler(this); - - // We are not currently connected - _connected = false; - - - Exception lastException = new Exception(); - lastException.initCause(new ConnectException()); - - while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed()) - { - try - { - makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - lastException = null; - } - catch (Exception e) - { - lastException = e; - - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); - } - } - - _logger.debug("Are we connected:" + _connected); - - // Then the Failover Thread will handle conneciton - if (_failoverPolicy.failoverAllowed()) - { - //TODO this needs to be redone so that we are not spinning. - // A suitable object should be set that is then waited on - // and only notified when a connection is made or when - // the AMQConnection gets closed. - while (!_connected && !_closed.get()) - { - try - { - _logger.debug("Sleeping."); - Thread.sleep(100); - } - catch (InterruptedException ie) - { - _logger.debug("Woken up."); - } - } - if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null) - { - if (_lastAMQException != null) - { - throw _lastAMQException; - } - } - } - else - { - String message = null; - - if (lastException != null) - { - if (lastException.getCause() != null) - { - message = lastException.getCause().getMessage(); - } - else - { - message = lastException.getMessage(); - } - } - - if (message == null || message.equals("")) - { - message = "Unable to Connect"; - } - - AMQException e = new AMQConnectionException(message); - - if (lastException != null) - { - if (lastException instanceof UnresolvedAddressException) - { - e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString()); - } - e.initCause(lastException); - } - - throw e; - } - } - - protected boolean checkException(Throwable thrown) - { - Throwable cause = thrown.getCause(); - - if (cause == null) - { - cause = thrown; - } - - return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); - } - - protected AMQConnection(String username, String password, String clientName, String virtualHost) - { - _clientName = clientName; - _username = username; - _password = password; - _virtualHost = virtualHost; - } - - private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException - { - try - { - TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); - // this blocks until the connection has been set up or when an error - // has prevented the connection being set up - _protocolHandler.attainState(AMQState.CONNECTION_OPEN); - _failoverPolicy.attainedConnection(); - - //Again this should be changed to a suitable notify - _connected = true; - } - catch (AMQException e) - { - _lastAMQException = e; - throw e; - } - } - - public boolean attemptReconnection(String host, int port, boolean useSSL) - { - BrokerDetails bd = new AMQBrokerDetails(host, port, useSSL); - - _failoverPolicy.setBroker(bd); - - try - { - makeBrokerConnection(bd); - return true; - } - catch (Exception e) - { - _logger.info("Unable to connect to broker at " + bd); - attemptReconnection(); - } - return false; - } - - public boolean attemptReconnection() - { - while (_failoverPolicy.failoverAllowed()) - { - try - { - makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - return true; - } - catch (Exception e) - { - if (!(e instanceof AMQException)) - { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); - } - else - { - _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); - } - } - } - - //connection unsuccessful - return false; - } - - /** - * Get the details of the currently active broker - * - * @return null if no broker is active (i.e. no successful connection has been made, or - * the BrokerDetail instance otherwise - */ - public BrokerDetails getActiveBrokerDetails() - { - return _failoverPolicy.getCurrentBrokerDetails(); - } - - public boolean failoverAllowed() - { - return _failoverPolicy.failoverAllowed(); - } - - public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException - { - return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); - } - - public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetch) throws JMSException - { - return createSession(transacted, acknowledgeMode, prefetch, prefetch); - } - - public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetchHigh, final int prefetchLow) throws JMSException - { - checkNotClosed(); - if (channelLimitReached()) - { - throw new ChannelLimitReachedException(_maximumChannelCount); - } - else - { - return (org.apache.qpid.jms.Session) new FailoverSupport() - { - public Object operation() throws JMSException - { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, - prefetchHigh, prefetchLow); - _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - _protocolHandler.removeSessionByChannel(channelId); - deregisterSession(channelId); - } - } - - if (_started) - { - session.start(); - } - return session; - } - }.execute(this); - } - } - - private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException - { - _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class); - - //todo send low water mark when protocol allows. - _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false), - BasicQosOkBody.class); - - if (transacted) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Issuing TxSelect for " + channelId); - } - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class); - } - } - - private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException - { - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - } - catch (AMQException e) - { - _protocolHandler.removeSessionByChannel(channelId); - deregisterSession(channelId); - throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); - } - } - - - public void setFailoverPolicy(FailoverPolicy policy) - { - _failoverPolicy = policy; - } - - public FailoverPolicy getFailoverPolicy() - { - return _failoverPolicy; - } - - /** - * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec - * @param transacted - * @param acknowledgeMode - * @return QueueSession - * @throws JMSException - */ - public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException - { - return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode)); - } - - /** - * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec - * @param transacted - * @param acknowledgeMode - * @return TopicSession - * @throws JMSException - */ - public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException - { - return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode)); - } - - private boolean channelLimitReached() - { - return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount; - } - - public String getClientID() throws JMSException - { - checkNotClosed(); - return _clientName; - } - - public void setClientID(String clientID) throws JMSException - { - checkNotClosed(); - _clientName = clientID; - } - - public ConnectionMetaData getMetaData() throws JMSException - { - checkNotClosed(); - // TODO Auto-generated method stub - return null; - } - - public ExceptionListener getExceptionListener() throws JMSException - { - checkNotClosed(); - return _exceptionListener; - } - - public void setExceptionListener(ExceptionListener listener) throws JMSException - { - checkNotClosed(); - _exceptionListener = listener; - } - - /** - * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread - * and is not thread safe (which is legal according to the JMS specification). - * - * @throws JMSException - */ - public void start() throws JMSException - { - checkNotClosed(); - if (!_started) - { - final Iterator it = _sessions.entrySet().iterator(); - while (it.hasNext()) - { - final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); - s.start(); - } - _started = true; - } - } - - public void stop() throws JMSException - { - checkNotClosed(); - - if (_started) - { - for (Iterator i = _sessions.values().iterator(); i.hasNext();) - { - ((AMQSession) i.next()).stop(); - } - _started = false; - } - } - - public void close() throws JMSException - { - synchronized(getFailoverMutex()) - { - if (!_closed.getAndSet(true)) - { - try - { - closeAllSessions(null); - _protocolHandler.closeConnection(); - } - catch (AMQException e) - { - throw new JMSException("Error closing connection: " + e); - } - } - } - } - - /** - * Marks all sessions and their children as closed without sending any protocol messages. Useful when - * you need to mark objects "visible" in userland as closed after failover or other significant event that - * impacts the connection. - * <p/> - * The caller must hold the failover mutex before calling this method. - */ - private void markAllSessionsClosed() - { - final LinkedList sessionCopy = new LinkedList(_sessions.values()); - final Iterator it = sessionCopy.iterator(); - while (it.hasNext()) - { - final AMQSession session = (AMQSession) it.next(); - - session.markClosed(); - } - _sessions.clear(); - } - - /** - * Close all the sessions, either due to normal connection closure or due to an error occurring. - * - * @param cause if not null, the error that is causing this shutdown - * <p/> - * The caller must hold the failover mutex before calling this method. - */ - private void closeAllSessions(Throwable cause) throws JMSException - { - final LinkedList sessionCopy = new LinkedList(_sessions.values()); - final Iterator it = sessionCopy.iterator(); - JMSException sessionException = null; - while (it.hasNext()) - { - final AMQSession session = (AMQSession) it.next(); - if (cause != null) - { - session.closed(cause); - } - else - { - try - { - session.close(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e); - sessionException = e; - } - } - } - _sessions.clear(); - if (sessionException != null) - { - throw sessionException; - } - } - - public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException - { - checkNotClosed(); - return null; - } - - public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException - { - checkNotClosed(); - return null; - } - - public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException - { - checkNotClosed(); - return null; - } - - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, - int maxMessages) - throws JMSException - { - // TODO Auto-generated method stub - checkNotClosed(); - return null; - } - - public long getMaximumChannelCount() throws JMSException - { - checkNotClosed(); - return _maximumChannelCount; - } - - public void setConnectionListener(ConnectionListener listener) - { - _connectionListener = listener; - } - - public ConnectionListener getConnectionListener() - { - return _connectionListener; - } - - public void setMaximumChannelCount(long maximumChannelCount) - { - _maximumChannelCount = maximumChannelCount; - } - - public void setMaximumFrameSize(long frameMax) - { - _maximumFrameSize = frameMax; - } - - public long getMaximumFrameSize() - { - return _maximumFrameSize; - } - - public Map getSessions() - { - return _sessions; - } - - public String getUsername() - { - return _username; - } - - public String getPassword() - { - return _password; - } - - public String getVirtualHost() - { - return _virtualHost; - } - - public AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - - public void bytesSent(long writtenBytes) - { - if (_connectionListener != null) - { - _connectionListener.bytesSent(writtenBytes); - } - } - - public void bytesReceived(long receivedBytes) - { - if (_connectionListener != null) - { - _connectionListener.bytesReceived(receivedBytes); - } - } - - /** - * Fire the preFailover event to the registered connection listener (if any) - * - * @param redirect true if this is the result of a redirect request rather than a connection error - * @return true if no listener or listener does not veto change - */ - public boolean firePreFailover(boolean redirect) - { - boolean proceed = true; - if (_connectionListener != null) - { - proceed = _connectionListener.preFailover(redirect); - } - return proceed; - } - - /** - * Fire the preResubscribe event to the registered connection listener (if any). If the listener - * vetoes resubscription then all the sessions are closed. - * - * @return true if no listener or listener does not veto resubscription. - * @throws JMSException - */ - public boolean firePreResubscribe() throws JMSException - { - if (_connectionListener != null) - { - boolean resubscribe = _connectionListener.preResubscribe(); - if (!resubscribe) - { - markAllSessionsClosed(); - } - return resubscribe; - } - else - { - return true; - } - } - - /** - * Fires a failover complete event to the registered connection listener (if any). - */ - public void fireFailoverComplete() - { - if (_connectionListener != null) - { - _connectionListener.failoverComplete(); - } - } - - /** - * In order to protect the consistency of the connection and its child sessions, consumers and producers, - * the "failover mutex" must be held when doing any operations that could be corrupted during failover. - * - * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. - */ - public final Object getFailoverMutex() - { - return _failoverMutex; - } - - /** - * If failover is taking place this will block until it has completed. If failover - * is not taking place it will return immediately. - * - * @throws InterruptedException - */ - public void blockUntilNotFailingOver() throws InterruptedException - { - _protocolHandler.blockUntilNotFailingOver(); - } - - /** - * Invoked by the AMQProtocolSession when a protocol session exception has occurred. - * This method sends the exception to a JMS exception listener, if configured, and - * propagates the exception to sessions, which in turn will propagate to consumers. - * This allows synchronous consumers to have exceptions thrown to them. - * - * @param cause the exception - */ - public void exceptionReceived(Throwable cause) - { - - _logger.debug("Connection Close done by:" + Thread.currentThread().getName()); - _logger.debug("exceptionReceived is ", cause); - - final JMSException je; - if (cause instanceof JMSException) - { - je = (JMSException) cause; - } - else - { - je = new JMSException("Exception thrown against " + toString() + ": " + cause); - if (cause instanceof Exception) - { - je.setLinkedException((Exception) cause); - } - } - - // in the case of an IOException, MINA has closed the protocol session so we set _closed to true - // so that any generic client code that tries to close the connection will not mess up this error - // handling sequence - if (cause instanceof IOException) - { - _closed.set(true); - } - - if (_exceptionListener != null) - { - _exceptionListener.onException(je); - } - - if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException)) - { - try - { - _logger.info("Closing AMQConnection due to :" + cause.getMessage()); - _closed.set(true); - closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor. - } - catch (JMSException e) - { - _logger.error("Error closing all sessions: " + e, e); - } - - } - else - { - _logger.info("Not a hard-error connection not closing."); - } - } - - void registerSession(int channelId, AMQSession session) - { - _sessions.put(channelId, session); - } - - void deregisterSession(int channelId) - { - _sessions.remove(channelId); - } - - /** - * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. - * The caller must hold the failover mutex before calling this method. - */ - public void resubscribeSessions() throws JMSException, AMQException - { - ArrayList sessions = new ArrayList(_sessions.values()); - _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove? - for (Iterator it = sessions.iterator(); it.hasNext();) - { - AMQSession s = (AMQSession) it.next(); - _protocolHandler.addSessionByChannel(s.getChannelId(), s); - reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); - s.resubscribe(); - } - } - - public String toString() - { - StringBuffer buf = new StringBuffer("AMQConnection:\n"); - if (_failoverPolicy.getCurrentBrokerDetails() == null) - { - buf.append("No active broker connection"); - } - else - { - BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails(); - buf.append("Host: ").append(String.valueOf(bd.getHost())); - buf.append("\nPort: ").append(String.valueOf(bd.getPort())); - } - buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); - buf.append("\nClient ID: ").append(String.valueOf(_clientName)); - buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size()); - return buf.toString(); - } - - public String toURL() - { - return _connectionURL.toString(); - } - - public Reference getReference() throws NamingException - { - return new Reference( - AMQConnection.class.getName(), - new StringRefAddr(AMQConnection.class.getName(), toURL()), - AMQConnectionFactory.class.getName(), - null); // factory location - } -} |