diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid')
48 files changed, 1028 insertions, 873 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index 999b22299c..a201f7d61e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -72,17 +72,6 @@ public class AMQAnyDestination extends AMQDestination implements Queue, Topic public String getTopicName() throws JMSException { - if (getRoutingKey() != null) - { - return getRoutingKey().asString(); - } - else if (getSubject() != null) - { - return getSubject(); - } - else - { - return null; - } + return super.getRoutingKey().toString(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index c8576bf00d..ee52cd50af 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -56,7 +56,9 @@ public class AMQBrokerDetails implements BrokerDetails if (transport != null) { //todo this list of valid transports should be enumerated somewhere - if (!(transport.equalsIgnoreCase(BrokerDetails.TCP))) + if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) || + transport.equalsIgnoreCase(BrokerDetails.TCP) || + transport.equalsIgnoreCase(BrokerDetails.SOCKET)))) { if (transport.equalsIgnoreCase("localhost")) { @@ -103,21 +105,6 @@ public class AMQBrokerDetails implements BrokerDetails if (host == null) { host = ""; - - String auth = connection.getAuthority(); - if (auth != null) - { - // contains both host & port myhost:5672 - if (auth.contains(":")) - { - host = auth.substring(0,auth.indexOf(":")); - } - else - { - host = auth; - } - } - } setHost(host); @@ -180,7 +167,10 @@ public class AMQBrokerDetails implements BrokerDetails } else { - setPort(port); + if (!_transport.equalsIgnoreCase(SOCKET)) + { + setPort(port); + } } String queryString = connection.getQuery(); @@ -296,9 +286,17 @@ public class AMQBrokerDetails implements BrokerDetails sb.append(_transport); sb.append("://"); - sb.append(_host); - sb.append(':'); - sb.append(_port); + + if (!(_transport.equalsIgnoreCase(VM))) + { + sb.append(_host); + } + + if (!(_transport.equalsIgnoreCase(SOCKET))) + { + sb.append(':'); + sb.append(_port); + } sb.append(printOptionsURL()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4a62f443f1..ab59fee020 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -111,7 +111,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Maps from session id (Integer) to AMQSession instance */ private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); - private final String _clientName; + private String _clientName; /** The user name to use for authentication */ private String _username; @@ -126,7 +126,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private ConnectionListener _connectionListener; - private final ConnectionURL _connectionURL; + private ConnectionURL _connectionURL; /** * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message @@ -173,8 +173,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates the sync publish options (persistent|all) //By default it's async publish private String _syncPublish = ""; - - // Indicates whether to use the old map message format or the + + // Indicates whether to use the old map message format or the // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; @@ -257,11 +257,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { - if (connectionURL == null) - { - throw new IllegalArgumentException("Connection must be specified"); - } - // set this connection maxPrefetch if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { @@ -269,7 +264,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // use the default value set for all connections + // use the defaul value set for all connections _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, ClientProperties.MAX_PREFETCH_DEFAULT)); } @@ -283,7 +278,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // use the default value set for all connections + // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); if (_syncPersistence) { @@ -298,7 +293,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // use the default value set for all connections + // use the defaul value set for all connections _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); } @@ -311,7 +306,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish); } - + if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null) { _useLegacyMapMessageFormat = Boolean.parseBoolean( @@ -322,16 +317,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the default value set for all connections _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); } - + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); _logger.debug("AMQP version " + amqpVersion); - + _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); - if ("0-8".equals(amqpVersion)) + if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); - } + } else if ("0-9".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_0_9(this); @@ -351,6 +346,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } _sslConfiguration = sslConfig; + if (connectionURL == null) + { + throw new IllegalArgumentException("Connection must be specified"); + } + _connectionURL = connectionURL; _clientName = connectionURL.getClientName(); @@ -418,7 +418,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } - verifyClientID(); if (_logger.isDebugEnabled()) { @@ -505,7 +504,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); - //Update our session to use this new protocol version + //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); } @@ -536,6 +535,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + protected AMQConnection(String username, String password, String clientName, String virtualHost) + { + _clientName = clientName; + _username = username; + _password = password; + setVirtualHost(virtualHost); + } + private void setVirtualHost(String virtualHost) { if (virtualHost != null && virtualHost.startsWith("/")) @@ -689,6 +696,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + throws AMQException, FailoverException + { + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + } + catch (AMQException e) + { + deregisterSession(channelId); + throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); + } + } + public void setFailoverPolicy(FailoverPolicy policy) { _failoverPolicy = policy; @@ -1075,7 +1096,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _username = id; } - + public String getPassword() { return _password; @@ -1251,7 +1272,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { je.setLinkedException((Exception) cause); } - + je.initCause(cause); } @@ -1284,7 +1305,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Not a hard-error connection not closing: " + cause); } - + // deliver the exception if there is a listener if (_exceptionListener != null) { @@ -1294,7 +1315,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.error("Throwable Received but no listener set: " + cause); } - + // if we are closing the connection, close sessions first if (closer) { @@ -1351,20 +1372,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return buf.toString(); } - /** - * Returns connection url. - * @return connection url - */ - public ConnectionURL getConnectionURL() - { - return _connectionURL; - } - - /** - * Returns stringified connection url. This url is suitable only for display - * as {@link AMQConnectionURL#toString()} converts any password to asterisks. - * @return connection url - */ public String toURL() { return _connectionURL.toString(); @@ -1435,18 +1442,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _delegate.getProtocolVersion(); } - - public String getBrokerUUID() - { - if(getProtocolVersion().equals(ProtocolVersion.v0_10)) - { - return ((AMQConnectionDelegate_0_10)_delegate).getUUID(); - } - else - { - return null; - } - } + public boolean isFailingOver() { return (_protocolHandler.getFailoverLatch() != null); @@ -1489,24 +1485,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.getNextChannelId(); } - + public boolean isUseLegacyMapMessageFormat() { return _useLegacyMapMessageFormat; } - - private void verifyClientID() throws AMQException - { - if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID)) - { - try - { - _delegate.verifyClientID(); - } - catch(JMSException e) - { - throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e); - } - } - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 5acdaaa185..9560bd5c7c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -57,12 +57,10 @@ public interface AMQConnectionDelegate void closeConnection(long timeout) throws JMSException, AMQException; <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; - + int getMaxChannelID(); int getMinChannelID(); ProtocolVersion getProtocolVersion(); - - void verifyClientID() throws JMSException; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index cb531d4fca..b0bd8f8e97 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -47,7 +47,6 @@ import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.ProtocolVersionException; -import org.apache.qpid.transport.SessionDetachCode; import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,11 +57,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec * This class logger. */ private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class); - - /** - * The name of the UUID property - */ - private static final String UUID_NAME = "qpid.federation_tag"; + /** * The AMQ Connection. */ @@ -74,12 +69,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec org.apache.qpid.transport.Connection _qpidConnection; private ConnectionException exception = null; - static - { - // Register any configured SASL client factories. - org.apache.qpid.client.security.DynamicSaslRegistrar.registerSaslProviders(); - } - //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) { @@ -91,14 +80,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec /** * create a Session and start it if required. */ - public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow) - throws JMSException - { - return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null); - } - - public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow, String name) throws JMSException { _conn.checkNotClosed(); @@ -113,7 +95,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec try { session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow,name); + prefetchLow); _conn.registerSession(channelId, session); if (_conn._started) { @@ -229,8 +211,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void resubscribeSessions() throws JMSException, AMQException, FailoverException { - _logger.info("Resuming connection"); - getQpidConnection().resume(); List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size())); for (AMQSession s : sessions) @@ -347,11 +327,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return ProtocolVersion.v0_10; } - public String getUUID() - { - return (String)_qpidConnection.getServerProperties().get(UUID_NAME); - } - private void retriveConnectionSettings(ConnectionSettings conSettings, BrokerDetails brokerDetail) { @@ -466,31 +441,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec else { heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); - } + } return heartbeat; } - + protected org.apache.qpid.transport.Connection getQpidConnection() { return _qpidConnection; } - - public void verifyClientID() throws JMSException - { - int prefetch = (int)_conn.getMaxPrefetch(); - AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID()); - org.apache.qpid.transport.Session ssn_0_10 = ssn.getQpidSession(); - try - { - ssn_0_10.awaitOpen(); - } - catch(Exception e) - { - if (ssn_0_10.getDetachCode() != null && - ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY) - { - throw new JMSException("ClientID must be unique"); - } - } - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 0cd1d49224..40b332d216 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -48,11 +49,6 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,21 +89,15 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); - ConnectionSettings settings = new ConnectionSettings(); - settings.setHost(brokerDetail.getHost()); - settings.setPort(brokerDetail.getPort()); - settings.setProtocol(brokerDetail.getTransport()); - - SSLConfiguration sslConfig = _conn.getSSLConfiguration(); - SSLContextFactory sslFactory = null; - if (sslConfig != null) + // TODO: use system property thingy for this + if (System.getProperty("UseTransportIo", "false").equals("false")) { - sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + } + else + { + _conn.getProtocolHandler().createIoTransportSession(brokerDetail); } - - OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory); - _conn._protocolHandler.setNetworkConnection(network); _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up @@ -332,9 +322,4 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return ProtocolVersion.v8_0; } - - public void verifyClientID() throws JMSException - { - // NOOP - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index f9f50d9150..93b4c51a8f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -27,14 +27,18 @@ import java.util.Map; import org.apache.qpid.client.url.URLParser; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.URLHelper; import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQConnectionURL implements ConnectionURL { - + private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionURL.class); + private String _url; private String _failoverMethod; private Map<String, String> _failoverOptions; @@ -291,4 +295,17 @@ public class AMQConnectionURL implements ConnectionURL return sb.toString(); } + + public static void main(String[] args) throws URLSyntaxException + { + String url2 = + "amqp://ritchiem:bob@temp/testHost?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'"; + // "amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; + + ConnectionURL connectionurl2 = new AMQConnectionURL(url2); + + System.out.println(url2); + System.out.println(connectionurl2); + + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 3ef32fb008..eb9682a3cf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -21,6 +21,8 @@ package org.apache.qpid.client; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import javax.jms.Destination; @@ -32,6 +34,8 @@ import javax.naming.StringRefAddr; import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Node; +import org.apache.qpid.client.messaging.address.QpidExchangeOptions; +import org.apache.qpid.client.messaging.address.QpidQueueOptions; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -74,6 +78,11 @@ public abstract class AMQDestination implements Destination, Referenceable private boolean _exchangeExistsChecked; + private byte[] _byteEncoding; + private static final int IS_DURABLE_MASK = 0x1; + private static final int IS_EXCLUSIVE_MASK = 0x2; + private static final int IS_AUTODELETE_MASK = 0x4; + public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; @@ -314,11 +323,7 @@ public abstract class AMQDestination implements Destination, Referenceable { if(_urlAsShortString == null) { - if (_url == null) - { - toURL(); - } - _urlAsShortString = new AMQShortString(_url); + toURL(); } return _urlAsShortString; } @@ -365,6 +370,7 @@ public abstract class AMQDestination implements Destination, Referenceable // calculated URL now out of date _url = null; _urlAsShortString = null; + _byteEncoding = null; } public AMQShortString getRoutingKey() @@ -502,10 +508,59 @@ public abstract class AMQDestination implements Destination, Referenceable sb.deleteCharAt(sb.length() - 1); url = sb.toString(); _url = url; + _urlAsShortString = new AMQShortString(url); } return url; } + public byte[] toByteEncoding() + { + byte[] encoding = _byteEncoding; + if(encoding == null) + { + int size = _exchangeClass.length() + 1 + + _exchangeName.length() + 1 + + 0 + // in place of the destination name + (_queueName == null ? 0 : _queueName.length()) + 1 + + 1; + encoding = new byte[size]; + int pos = 0; + + pos = _exchangeClass.writeToByteArray(encoding, pos); + pos = _exchangeName.writeToByteArray(encoding, pos); + + encoding[pos++] = (byte)0; + + if(_queueName == null) + { + encoding[pos++] = (byte)0; + } + else + { + pos = _queueName.writeToByteArray(encoding,pos); + } + byte options = 0; + if(_isDurable) + { + options |= IS_DURABLE_MASK; + } + if(_isExclusive) + { + options |= IS_EXCLUSIVE_MASK; + } + if(_isAutoDelete) + { + options |= IS_AUTODELETE_MASK; + } + encoding[pos] = options; + + + _byteEncoding = encoding; + + } + return encoding; + } + public boolean equals(Object o) { if (this == o) @@ -559,6 +614,53 @@ public abstract class AMQDestination implements Destination, Referenceable null); // factory location } + + public static Destination createDestination(byte[] byteEncodedDestination) + { + AMQShortString exchangeClass; + AMQShortString exchangeName; + AMQShortString routingKey; + AMQShortString queueName; + boolean isDurable; + boolean isExclusive; + boolean isAutoDelete; + + int pos = 0; + exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= exchangeClass.length() + 1; + exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= exchangeName.length() + 1; + routingKey = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= (routingKey == null ? 0 : routingKey.length()) + 1; + queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos); + pos+= (queueName == null ? 0 : queueName.length()) + 1; + int options = byteEncodedDestination[pos]; + isDurable = (options & IS_DURABLE_MASK) != 0; + isExclusive = (options & IS_EXCLUSIVE_MASK) != 0; + isAutoDelete = (options & IS_AUTODELETE_MASK) != 0; + + if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return new AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable); + } + else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + return new AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable); + } + else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) + { + return new AMQHeadersExchange(routingKey); + } + else + { + return new AMQAnyDestination(exchangeName,exchangeClass, + routingKey,isExclusive, + isAutoDelete,queueName, + isDurable, new AMQShortString[0]); + } + + } + public static Destination createDestination(BindingURL binding) { AMQShortString type = binding.getExchangeClass(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 25562cfff7..1f940b62f0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -567,8 +567,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic close(-1); } - public abstract AMQException getLastException(); - public void checkNotClosed() throws JMSException { try @@ -577,20 +575,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (IllegalStateException ise) { - AMQException ex = getLastException(); - if (ex != null) - { - IllegalStateException ssnClosed = new IllegalStateException( - "Session has been closed", ex.getErrorCode().toString()); + // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for + AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); - ssnClosed.setLinkedException(ex); - ssnClosed.initCause(ex); - throw ssnClosed; - } - else + if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) { - throw ise; + ise.setLinkedException(manager.getLastException()); + ise.initCause(ise.getLinkedException()); } + + throw ise; } } @@ -1049,29 +1043,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException { checkNotClosed(); - Topic origTopic = checkValidTopic(topic, true); - + AMQTopic origTopic = checkValidTopic(topic, true); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - if (dest.getDestSyntax() == DestSyntax.ADDR && - !dest.isAddressResolved()) - { - try - { - handleAddressBasedDestination(dest,false,true); - if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) - { - throw new JMSException("Durable subscribers can only be created for Topics"); - } - dest.getSourceNode().setDurable(true); - } - catch(AMQException e) - { - JMSException ex = new JMSException("Error when verifying destination"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; - } - } String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1083,9 +1056,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Not subscribed to this name in the current session if (subscriber == null) { - // After the address is resolved routing key will not be null. - AMQShortString topicName = dest.getRoutingKey(); - + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName = ((AMQTopic) topic).getRoutingKey(); + } else + { + topicName = new AMQShortString(topic.getTopicName()); + } + if (_strictAMQP) { if (_strictAMQPFATAL) @@ -1246,6 +1225,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic else { AMQQueue queue = new AMQQueue(queueName); + queue.setCreate(AddressOption.ALWAYS); return queue; } @@ -1327,8 +1307,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { checkValidDestination(destination); - Queue dest = validateQueue(destination); - C consumer = (C) createConsumer(dest); + AMQQueue dest = (AMQQueue) destination; + C consumer = (C) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); } @@ -1346,8 +1326,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - Queue dest = validateQueue(destination); - C consumer = (C) createConsumer(dest, messageSelector); + AMQQueue dest = (AMQQueue) destination; + C consumer = (C) createConsumer(destination, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1364,7 +1344,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue) throws JMSException { checkNotClosed(); - Queue dest = validateQueue(queue); + AMQQueue dest = (AMQQueue) queue; C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); @@ -1383,23 +1363,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); - Queue dest = validateQueue(queue); + AMQQueue dest = (AMQQueue) queue; C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } - - private Queue validateQueue(Destination dest) throws InvalidDestinationException - { - if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue) - { - return (Queue)dest; - } - else - { - throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue"); - } - } public QueueSender createSender(Queue queue) throws JMSException { @@ -1440,7 +1408,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); @@ -1460,7 +1428,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - Topic dest = checkValidTopic(topic); + AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); @@ -2427,7 +2395,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /* * I could have combined the last 3 methods, but this way it improves readability */ - protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException + protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException { if (topic == null) { @@ -2446,17 +2414,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ("Cannot create a durable subscription with a temporary topic: " + topic); } - if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic)) + if (!(topic instanceof AMQTopic)) { throw new javax.jms.InvalidDestinationException( "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); } - return topic; + return (AMQTopic) topic; } - protected Topic checkValidTopic(Topic topic) throws JMSException + protected AMQTopic checkValidTopic(Topic topic) throws JMSException { return checkValidTopic(topic, false); } @@ -2851,7 +2819,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); } - bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); } AMQShortString queueName = amqd.getAMQQueueName(); @@ -2859,6 +2826,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // store the consumer queue name consumer.setQueuename(queueName); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); + // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) { @@ -3481,9 +3450,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return _closing.get()|| _connection.isClosing(); } - - public boolean isDeclareExchanges() - { - return DECLARE_EXCHANGES; - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 75d96d67af..517a7a5ce8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -47,8 +47,6 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -58,7 +56,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; @@ -159,20 +156,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, - int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name) + int defaultPrefetchHighMark, int defaultPrefetchLowMark) { super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); _qpidConnection = qpidConnection; - if (name == null) - { - _qpidSession = _qpidConnection.createSession(1); - } - else - { - _qpidSession = _qpidConnection.createSession(name,1); - } + _qpidSession = _qpidConnection.createSession(1); _qpidSession.setSessionListener(this); if (_transacted) { @@ -199,12 +189,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param qpidConnection The connection */ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, - boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow, - String name) + boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), - defaultPrefetchHigh, defaultPrefetchLow,name); + defaultPrefetchHigh, defaultPrefetchLow); } private void addUnacked(int id) @@ -325,7 +314,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination, final boolean nowait) - throws AMQException + throws AMQException, FailoverException { if (destination.getDestSyntax() == DestSyntax.BURL) { @@ -611,16 +600,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); } - boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - - if (consumer.getDestination().getLink() != null) - { - acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; - } getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), - acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } @@ -784,7 +767,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else { QueueNode node = (QueueNode)amqd.getSourceNode(); - getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() , + getQpidSession().queueDeclare(queueName.toString(), "" , node.getDeclareArgs(), node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, @@ -921,26 +904,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic setCurrentException(exc); } - public void closed(Session ssn) - { - try - { - super.closed(null); - if (flushTask != null) - { - flushTask.cancel(); - flushTask = null; - } - } catch (Exception e) - { - _logger.error("Error closing JMS session", e); - } - } - - public AMQException getLastException() - { - return getCurrentException(); - } + public void closed(Session ssn) {} protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, final boolean noLocal, final boolean nowait) @@ -1056,9 +1020,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic code = ee.getErrorCode().getValue(); } AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + + _connection.exceptionReceived(amqe); + _currentException = amqe; } - _connection.exceptionReceived(_currentException); } public AMQMessageDelegateFactory getMessageDelegateFactory() @@ -1102,37 +1068,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException + public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) { boolean match = true; - try + QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); + match = dest.getAddressName().equals(result.getQueue()); + + if (match && assertNode) { - QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); - match = dest.getAddressName().equals(result.getQueue()); - - if (match && assertNode) - { - match = (result.getDurable() == node.isDurable()) && - (result.getAutoDelete() == node.isAutoDelete()) && - (result.getExclusive() == node.isExclusive()) && - (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (match) - { - // should I use the queried details to update the local data structure. - } + match = (result.getDurable() == node.isDurable()) && + (result.getAutoDelete() == node.isAutoDelete()) && + (result.getExclusive() == node.isExclusive()) && + (matchProps(result.getArguments(),node.getDeclareArgs())); } - catch(SessionException e) + else if (match) { - if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) - { - match = false; - } - else - { - throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), - "Error querying queue",e); - } + // should I use the queried details to update the local data structure. } return match; @@ -1198,22 +1149,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic int type = resolveAddressType(dest); - if (type == AMQDestination.QUEUE_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.AT_LEAST_ONCE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.UNRELIABLE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE) - { - throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics"); - } - switch (type) { case AMQDestination.QUEUE_TYPE: @@ -1227,8 +1162,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { setLegacyFiledsForQueueType(dest); send0_10QueueDeclare(dest,null,false,noWait); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); break; } } @@ -1337,8 +1270,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.getQueueName(),// should have one by now dest.getSubject(), Collections.<String,Object>emptyMap())); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); } public void setLegacyFiledsForQueueType(AMQDestination dest) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index c010e4c7ed..f41b1c94fa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -38,7 +38,6 @@ import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQFrame; @@ -585,35 +584,4 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B queueName == null ? null : new AMQShortString(queueName), bindingKey == null ? null : new AMQShortString(bindingKey)); } - - - public AMQException getLastException() - { - // if the Connection has closed then we should throw any exception that - // has occurred that we were not waiting for - AMQStateManager manager = _connection.getProtocolHandler() - .getStateManager(); - - Exception e = manager.getLastException(); - if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) - && e != null) - { - if (e instanceof AMQException) - { - return (AMQException) e; - } - else - { - AMQException amqe = new AMQException(AMQConstant - .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), - e.getMessage(), e.getCause()); - return amqe; - } - } - else - { - return null; - } - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 780dbcafc2..6217cb534a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -22,7 +22,6 @@ package org.apache.qpid.client; import java.net.URISyntaxException; -import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Topic; @@ -96,47 +95,39 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys); } - public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection) + public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { - if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic) + if (topic.getDestSyntax() == DestSyntax.ADDR) { - AMQDestination qpidTopic = (AMQDestination)topic; - if (qpidTopic.getDestSyntax() == DestSyntax.ADDR) + try { - try - { - AMQTopic t = new AMQTopic(qpidTopic.getAddress()); - AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); - // link is never null if dest was created using an address string. - t.getLink().setName(queueName.asString()); - t.getSourceNode().setAutoDelete(false); - t.getSourceNode().setDurable(true); - - // The legacy fields are also populated just in case. - t.setQueueName(queueName); - t.setAutoDelete(false); - t.setDurable(true); - return t; - } - catch(Exception e) - { - JMSException ex = new JMSException("Error creating durable topic"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; - } + AMQTopic t = new AMQTopic(topic.getAddress()); + AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); + // link is never null if dest was created using an address string. + t.getLink().setName(queueName.asString()); + t.getSourceNode().setAutoDelete(false); + t.getSourceNode().setDurable(true); + + // The legacy fields are also populated just in case. + t.setQueueName(queueName); + t.setAutoDelete(false); + t.setDurable(true); + return t; } - else + catch(Exception e) { - return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false, - getDurableTopicQueueName(subscriptionName, connection), - true); + JMSException ex = new JMSException("Error creating durable topic"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; } } else { - throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic"); + return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false, + getDurableTopicQueueName(subscriptionName, connection), + true); } } @@ -147,17 +138,13 @@ public class AMQTopic extends AMQDestination implements Topic public String getTopicName() throws JMSException { - if (getRoutingKey() != null) + if (super.getRoutingKey() == null && super.getSubject() != null) { - return getRoutingKey().asString(); - } - else if (getSubject() != null) - { - return getSubject(); + return super.getSubject(); } else { - return null; + return super.getRoutingKey().toString(); } } @@ -176,18 +163,12 @@ public class AMQTopic extends AMQDestination implements Topic public AMQShortString getRoutingKey() { - if (super.getRoutingKey() != null) - { - return super.getRoutingKey(); - } - else if (getSubject() != null) + if (super.getRoutingKey() == null && super.getSubject() != null) { - return new AMQShortString(getSubject()); + return new AMQShortString(super.getSubject()); } else { - setRoutingKey(new AMQShortString("")); - setSubject(""); return super.getRoutingKey(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 5d32863f2f..0a78403268 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -571,7 +571,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa if (!_session.isClosed() || _session.isClosing()) { sendCancel(); - cleanupQueue(); } } catch (AMQException e) @@ -609,8 +608,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } abstract void sendCancel() throws AMQException, FailoverException; - - abstract void cleanupQueue() throws AMQException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 964c238946..b5f3501e5a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,11 +19,8 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -512,18 +509,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _exclusive; } } - - void cleanupQueue() throws AMQException, FailoverException - { - AMQDestination dest = this.getDestination(); - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.RECEIVER ) - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); - } - } - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 00acd5e866..cdbf57769d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -88,8 +88,4 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return receive(); } - void cleanupQueue() throws AMQException, FailoverException - { - - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 5821fee7ff..53c0457120 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -19,7 +19,6 @@ package org.apache.qpid.client; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; import java.nio.ByteBuffer; import java.util.HashMap; @@ -31,13 +30,9 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -73,15 +68,12 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { if (destination.getDestSyntax() == DestSyntax.BURL) { - if (getSession().isDeclareExchanges()) - { - String name = destination.getExchangeName().toString(); - ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare - (name, - destination.getExchangeClass().toString(), - null, null, - name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); - } + String name = destination.getExchangeName().toString(); + ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare + (name, + destination.getExchangeClass().toString(), + null, null, + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); } else { @@ -179,7 +171,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && (destination.getSubject() != null || - (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null)) + (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null)) ) { Map<String,Object> appProps = messageProps.getApplicationHeaders(); @@ -189,16 +181,16 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.setApplicationHeaders(appProps); } - if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null) + if (appProps.get("qpid.subject") == null) { // use default subject in address string - appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject()); + appProps.put("qpid.subject",destination.getSubject()); } - if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE) { deliveryProp.setRoutingKey((String) - messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); + messageProps.getApplicationHeaders().get("qpid.subject")); } } @@ -218,9 +210,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryMode == DeliveryMode.PERSISTENT) ); - boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && - (destination.getLink().getReliability() == Reliability.UNRELIABLE); - org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); @@ -228,7 +217,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProp, messageProps), - buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE); + buffer, sync ? SYNC : NONE); if (sync) { ssn.sync(); @@ -250,21 +239,5 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { return _session.isQueueBound(destination); } - - @Override - public void close() - { - super.close(); - AMQDestination dest = _destination; - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); - } - } - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java index 2fdb35de49..2b7e3d44da 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -1,23 +1,3 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.client; import java.util.ArrayList; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index e81e754da2..7cc548915c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -23,7 +23,6 @@ package org.apache.qpid.client; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; -import java.util.List; import org.apache.qpid.framing.AMQShortString; @@ -35,18 +34,6 @@ public enum CustomJMSXProperty JMSXGroupSeq, JMSXUserID; - private static List<String> _names; - - static - { - CustomJMSXProperty[] properties = values(); - _names = new ArrayList<String>(properties.length); - for(CustomJMSXProperty property : properties) - { - _names.add(property.toString()); - } - - } private final AMQShortString _nameAsShortString; @@ -60,8 +47,20 @@ public enum CustomJMSXProperty return _nameAsShortString; } - public static Enumeration asEnumeration() + private static Enumeration _names; + + public static synchronized Enumeration asEnumeration() { - return Collections.enumeration(_names); + if(_names == null) + { + CustomJMSXProperty[] properties = values(); + ArrayList<String> nameList = new ArrayList<String>(properties.length); + for(CustomJMSXProperty property : properties) + { + nameList.add(property.toString()); + } + _names = Collections.enumeration(nameList); + } + return _names; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java index 5cf767ac35..3bb5707417 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -30,11 +30,9 @@ import org.apache.qpid.common.QpidProperties; public class QpidConnectionMetaData implements ConnectionMetaData { - private AMQConnection con; QpidConnectionMetaData(AMQConnection conn) { - this.con = conn; } public int getJMSMajorVersion() throws JMSException @@ -64,12 +62,12 @@ public class QpidConnectionMetaData implements ConnectionMetaData public int getProviderMajorVersion() throws JMSException { - return con.getProtocolVersion().getMajorVersion(); + return 0; } public int getProviderMinorVersion() throws JMSException { - return con.getProtocolVersion().getMinorVersion(); + return 8; } public String getProviderVersion() throws JMSException @@ -80,7 +78,8 @@ public class QpidConnectionMetaData implements ConnectionMetaData private String getProtocolVersion() { - return con.getProtocolVersion().toString(); + // TODO - Implement based on connection negotiated protocol + return "0.8"; } public String getBrokerVersion() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index 295c6a4091..27783bcacf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -50,25 +50,25 @@ public class QueueSenderAdapter implements QueueSender public void send(Message msg) throws JMSException { - checkQueuePreConditions(_queue); + checkPreConditions(); _delegate.send(msg); } public void send(Queue queue, Message msg) throws JMSException { - checkQueuePreConditions(queue); + checkPreConditions(queue); _delegate.send(queue, msg); } public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkQueuePreConditions(_queue); + checkPreConditions(); _delegate.send(msg, deliveryMode, priority, timeToLive); } public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkQueuePreConditions(queue); + checkPreConditions(queue); _delegate.send(queue, msg, deliveryMode, priority, timeToLive); } @@ -122,19 +122,19 @@ public class QueueSenderAdapter implements QueueSender public void send(Destination dest, Message msg) throws JMSException { - checkQueuePreConditions((Queue) dest); + checkPreConditions((Queue) dest); _delegate.send(dest, msg); } public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkQueuePreConditions(_queue); + checkPreConditions(); _delegate.send(msg, deliveryMode, priority, timeToLive); } public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkQueuePreConditions((Queue) dest); + checkPreConditions((Queue) dest); _delegate.send(dest, msg, deliveryMode, priority, timeToLive); } @@ -170,6 +170,11 @@ public class QueueSenderAdapter implements QueueSender private void checkPreConditions() throws JMSException { + checkPreConditions(_queue); + } + + private void checkPreConditions(Queue queue) throws JMSException + { if (closed) { throw new javax.jms.IllegalStateException("Publisher is closed"); @@ -181,43 +186,39 @@ public class QueueSenderAdapter implements QueueSender { throw new javax.jms.IllegalStateException("Invalid Session"); } - } - private void checkQueuePreConditions(Queue queue) throws JMSException - { - checkPreConditions() ; - - if (queue == null) - { - throw new UnsupportedOperationException("Queue is null."); - } - - if (!(queue instanceof AMQDestination)) - { - throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); - } - - AMQDestination destination = (AMQDestination) queue; - if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish()) - { - if (_delegate.getSession().isStrictAMQP()) - { - _delegate._logger.warn("AMQP does not support destination validation before publish, "); - destination.setCheckedForQueueBinding(true); - } - else - { - if (_delegate.isBound(destination)) - { - destination.setCheckedForQueueBinding(true); - } - else - { - throw new InvalidDestinationException("Queue: " + queue - + " is not a valid destination (no bindings on server"); - } - } - } + if (queue == null) + { + throw new UnsupportedOperationException("Queue is null."); + } + + if (!(queue instanceof AMQDestination)) + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); + } + + AMQDestination destination = (AMQDestination) queue; + if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish()) + { + + if (_delegate.getSession().isStrictAMQP()) + { + _delegate._logger.warn("AMQP does not support destination validation before publish, "); + destination.setCheckedForQueueBinding(true); + } + else + { + if (_delegate.isBound(destination)) + { + destination.setCheckedForQueueBinding(true); + } + else + { + throw new InvalidDestinationException("Queue: " + queue + + " is not a valid destination (no bindings on server"); + } + } + } } private boolean checkQueueBeforePublish() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 5b94b342eb..8a75082202 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -21,14 +21,10 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.dtx.XidImpl; -import org.apache.qpid.transport.DtxXaStatus; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.Future; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.RecoverResult; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.XaResult; +import org.apache.qpid.transport.*; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -215,28 +211,9 @@ public class XAResourceImpl implements XAResource * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. */ public boolean isSameRM(XAResource xaResource) throws XAException - { - if(this == xaResource) - { - return true; - } - if(!(xaResource instanceof XAResourceImpl)) - { - return false; - } - - XAResourceImpl other = (XAResourceImpl)xaResource; - - String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); - String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID(); - - if(_logger.isDebugEnabled()) - { - _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID); - } - - return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); - + { + // TODO : get the server identity of xaResource and compare it with our own one + return false; } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 6b9121811d..354b67cd35 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic { super(qpidConnection, con, channelId, false, // this is not a transacted session Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted - MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null); + MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); createSession(); _xaResource = new XAResourceImpl(this); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 2b49bb8f81..c81ad6422f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -226,7 +226,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co { Object instance = mechanismClass.newInstance(); AMQCallbackHandler cbh = (AMQCallbackHandler) instance; - cbh.initialise(protocolSession.getAMQConnection().getConnectionURL()); + cbh.initialise(protocolSession); return cbh; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 182b7b65d8..92e61984d2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -22,12 +22,10 @@ package org.apache.qpid.client.message; import java.lang.ref.SoftReference; -import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -634,16 +632,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { return new String(_messageProps.getUserId()); } - else if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName) && - _messageProps.getAppId() != null) - { - return new String(_messageProps.getAppId()); - } - else if (QpidMessageProperties.AMQP_0_10_ROUTING_KEY.equals(propertyName) && - _deliveryProps.getRoutingKey() != null) - { - return _deliveryProps.getRoutingKey(); - } else { checkPropertyName(propertyName); @@ -682,19 +670,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate public Enumeration getPropertyNames() throws JMSException { - List<String> props = new ArrayList<String>(); - Map<String, Object> propertyMap = getApplicationHeaders(); - for (String prop: getApplicationHeaders().keySet()) - { - Object value = propertyMap.get(prop); - if (value instanceof Boolean || value instanceof Number - || value instanceof String) - { - props.add(prop); - } - } - - return java.util.Collections.enumeration(props); + return java.util.Collections.enumeration(getApplicationHeaders().keySet()); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException @@ -750,14 +726,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { checkPropertyName(propertyName); checkWritableProperties(); - if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName)) - { - _messageProps.setAppId(value.getBytes()); - } - else - { - setApplicationHeader(propertyName, value); - } + setApplicationHeader(propertyName, value); } private static final Set<Class> ALLOWED = new HashSet(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java index 7f735e0722..6e22292ee0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -23,7 +23,6 @@ package org.apache.qpid.client.message; import java.util.List; import java.util.Map; -import java.util.UUID; import javax.jms.JMSException; import javax.jms.MessageFormatException; @@ -66,7 +65,7 @@ public class AMQPEncodedMapMessage extends JMSMapMessage if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) - || (value instanceof List) || (value instanceof Map) || (value instanceof UUID) || (value == null)) + || (value instanceof List) || (value instanceof Map) || (value == null)) { _map.put(propName, value); } @@ -110,7 +109,7 @@ public class AMQPEncodedMapMessage extends JMSMapMessage } // for testing - public Map<String,Object> getMap() + Map<String,Object> getMap() { return _map; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 40c1df0c5d..e719c9a4b2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -99,7 +99,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr, - (BasicContentHeaderProperties) contentHeader.getProperties(), + (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey); return createMessage(delegate, data); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index cdb75fc9a9..4e4061cf4d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -104,7 +104,7 @@ public class MessageFactoryRegistry AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) throws AMQException, JMSException { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties(); + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over // AMQP. When the type is null, it can only be assumed that the message is a byte message. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java deleted file mode 100644 index b30afafa35..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -/** - * Place holder for Qpid specific message properties - */ -public class QpidMessageProperties -{ - - public static final String QPID_SUBJECT = "qpid.subject"; - - // AMQP 0-10 related properties - public static final String AMQP_0_10_APP_ID = "x-amqp-0-10.app-id"; - public static final String AMQP_0_10_ROUTING_KEY = "x-amqp-0-10.routing-key"; -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 368ec60525..00503cc650 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -27,7 +27,6 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; -import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Link.Subscription; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; @@ -55,7 +54,7 @@ public class AddressHelper public static final String EXCLUSIVE = "exclusive"; public static final String AUTO_DELETE = "auto-delete"; public static final String TYPE = "type"; - public static final String ALT_EXCHANGE = "alternate-exchange"; + public static final String ALT_EXCHANGE = "alt-exchange"; public static final String BINDINGS = "bindings"; public static final String BROWSE = "browse"; public static final String MODE = "mode"; @@ -232,9 +231,14 @@ public class AddressHelper private boolean getDurability(Map map) { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); + if (map != null && map.get(DURABLE) != null) + { + return Boolean.parseBoolean((String)map.get(DURABLE)); + } + else + { + return false; + } } /** @@ -258,7 +262,7 @@ public class AddressHelper } } - public Link getLink() throws Exception + public Link getLink() { Link link = new Link(); link.setSubscription(new Subscription()); @@ -268,25 +272,6 @@ public class AddressHelper : linkProps.getBoolean(DURABLE)); link.setName(linkProps.getString(NAME)); - String reliability = linkProps.getString(RELIABILITY); - if ( reliability != null) - { - if (reliability.equalsIgnoreCase("unreliable")) - { - link.setReliability(Reliability.UNRELIABLE); - } - else if (reliability.equalsIgnoreCase("at-least-once")) - { - link.setReliability(Reliability.AT_LEAST_ONCE); - } - else - { - throw new Exception("The reliability mode '" + - reliability + "' is not yet supported"); - } - - } - if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { MapAccessor capacityProps = new MapAccessor( diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 5f97d625b4..a7d19d1bd5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.messaging.address; -import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED; - import java.util.HashMap; import java.util.Map; @@ -31,8 +29,6 @@ public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } - public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED } - protected String name; protected String _filter; protected FilterType _filterType = FilterType.SUBJECT; @@ -42,18 +38,7 @@ public class Link protected int _producerCapacity = 0; protected Node node; protected Subscription subscription; - protected Reliability reliability = UNSPECIFIED; - public Reliability getReliability() - { - return reliability; - } - - public void setReliability(Reliability reliability) - { - this.reliability = reliability; - } - public Node getNode() { return node; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 34c6468629..eb5af119b2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -57,6 +57,7 @@ import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.Job; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; @@ -64,9 +65,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,13 +172,11 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _readJob; private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private NetworkDriver _networkDriver; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; - private NetworkTransport _transport; - private NetworkConnection _network; - private Sender<ByteBuffer> _sender; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -213,6 +211,21 @@ public class AMQProtocolHandler implements ProtocolEngine } /** + * Called when we want to create a new IoTransport session + * @param brokerDetail + */ + public void createIoTransportSession(BrokerDetails brokerDetail) + { + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager.setProtocolSession(_protocolSession); + IoTransport.connect_0_9(getProtocolSession(), + brokerDetail.getHost(), + brokerDetail.getPort(), + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); + _protocolSession.init(); + } + + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover @@ -302,7 +315,7 @@ public class AMQProtocolHandler implements ProtocolEngine // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); - _network.close(); + _networkDriver.close(); } public void writerIdle() @@ -324,7 +337,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attempt failover - _network.close(); + _networkDriver.close(); closed(); } else @@ -576,7 +589,7 @@ public class AMQProtocolHandler implements ProtocolEngine { public void run() { - _sender.send(buf); + _networkDriver.send(buf); } }); if (PROTOCOL_DEBUG) @@ -597,7 +610,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (wait) { - _sender.flush(); + _networkDriver.flush(); } } @@ -711,7 +724,7 @@ public class AMQProtocolHandler implements ProtocolEngine try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _network.close(); + _networkDriver.close(); closed(); } catch (AMQTimeoutException e) @@ -831,18 +844,17 @@ public class AMQProtocolHandler implements ProtocolEngine public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); + return _networkDriver.getLocalAddress(); } - public void setNetworkConnection(NetworkConnection network) + public void setNetworkDriver(NetworkDriver driver) { - _network = network; - _sender = network.getSender(); + _networkDriver = driver; } /** @param delay delay in seconds (not ms) */ @@ -850,15 +862,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if (delay > 0) { - _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + getNetworkDriver().setMaxWriteIdle(delay); + getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } - public NetworkConnection getNetworkConnection() + public NetworkDriver getNetworkDriver() { - return _network; + return _networkDriver; } public ProtocolVersion getSuggestedProtocolVersion() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 5b7d272506..7976760696 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -148,6 +148,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return getAMQConnection().getVirtualHost(); } + public String getUsername() + { + return getAMQConnection().getUsername(); + } + + public String getPassword() + { + return getAMQConnection().getPassword(); + } + public SaslClient getSaslClient() { return _saslClient; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java index 67dd1a58b6..fbca444208 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java @@ -22,9 +22,9 @@ package org.apache.qpid.client.security; import javax.security.auth.callback.CallbackHandler; -import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.client.protocol.AMQProtocolSession; public interface AMQCallbackHandler extends CallbackHandler { - void initialise(ConnectionURL connectionURL); + void initialise(AMQProtocolSession protocolSession); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties index b903208927..1bff43142b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties @@ -18,4 +18,3 @@ # AMQPLAIN=org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory CRAM-MD5-HASHED=org.apache.qpid.client.security.crammd5hashed.CRAMMD5HashedSaslClientFactory -ANONYMOUS=org.apache.qpid.client.security.anonymous.AnonymousSaslClientFactory diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java index 6ec83f0a23..66176dac3c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java @@ -20,29 +20,30 @@ */ package org.apache.qpid.client.security; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; +import org.apache.qpid.client.protocol.AMQProtocolSession; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.security.auth.callback.Callback; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; -import org.apache.qpid.jms.ConnectionURL; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler { - private ConnectionURL _connectionURL; + private static final Logger _logger = LoggerFactory.getLogger(UsernameHashedPasswordCallbackHandler.class); - /** - * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL) - */ - @Override - public void initialise(ConnectionURL connectionURL) + private AMQProtocolSession _protocolSession; + + public void initialise(AMQProtocolSession protocolSession) { - _connectionURL = connectionURL; + _protocolSession = protocolSession; } public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException @@ -52,13 +53,13 @@ public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler Callback cb = callbacks[i]; if (cb instanceof NameCallback) { - ((NameCallback) cb).setName(_connectionURL.getUsername()); + ((NameCallback) cb).setName(_protocolSession.getUsername()); } else if (cb instanceof PasswordCallback) { try { - ((PasswordCallback) cb).setPassword(getHash(_connectionURL.getPassword())); + ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword())); } catch (NoSuchAlgorithmException e) { @@ -98,5 +99,4 @@ public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler return hash; } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java index ad088722c8..c50c62710f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java @@ -27,19 +27,15 @@ import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; -import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.client.protocol.AMQProtocolSession; public class UsernamePasswordCallbackHandler implements AMQCallbackHandler { - private ConnectionURL _connectionURL; + private AMQProtocolSession _protocolSession; - /** - * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL) - */ - @Override - public void initialise(final ConnectionURL connectionURL) + public void initialise(AMQProtocolSession protocolSession) { - _connectionURL = connectionURL; + _protocolSession = protocolSession; } public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException @@ -49,11 +45,11 @@ public class UsernamePasswordCallbackHandler implements AMQCallbackHandler Callback cb = callbacks[i]; if (cb instanceof NameCallback) { - ((NameCallback)cb).setName(_connectionURL.getUsername()); + ((NameCallback)cb).setName(_protocolSession.getUsername()); } else if (cb instanceof PasswordCallback) { - ((PasswordCallback)cb).setPassword(_connectionURL.getPassword().toCharArray()); + ((PasswordCallback)cb).setPassword(_protocolSession.getPassword().toCharArray()); } else { @@ -61,5 +57,4 @@ public class UsernamePasswordCallbackHandler implements AMQCallbackHandler } } } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClient.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClient.java deleted file mode 100644 index 0f56b2ef6c..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClient.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.security.anonymous; - -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -public class AnonymousSaslClient implements SaslClient -{ - public String getMechanismName() { - return "ANONYMOUS"; - } - public boolean hasInitialResponse() { - return true; - } - public byte[] evaluateChallenge(byte[] challenge) throws SaslException { - return new byte[0]; - } - public boolean isComplete() { - return true; - } - public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException - { - throw new IllegalStateException("No security layer supported"); - } - public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException - { - throw new IllegalStateException("No security layer supported"); - } - public Object getNegotiatedProperty(String propName) { - return null; - } - public void dispose() throws SaslException {} -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClientFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClientFactory.java deleted file mode 100644 index de698f87c6..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/anonymous/AnonymousSaslClientFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.security.anonymous; - -import java.util.Arrays; -import java.util.Map; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslClientFactory; -import javax.security.sasl.SaslException; -import javax.security.auth.callback.CallbackHandler; - -public class AnonymousSaslClientFactory implements SaslClientFactory -{ - public SaslClient createSaslClient(String[] mechanisms, String authId, - String protocol, String server, - Map props, CallbackHandler cbh) throws SaslException - { - if (Arrays.asList(mechanisms).contains("ANONYMOUS")) { - return new AnonymousSaslClient(); - } else { - return null; - } - } - public String[] getMechanismNames(Map props) - { - if (props == null || props.isEmpty()) { - return new String[]{"ANONYMOUS"}; - } else { - return new String[0]; - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java new file mode 100644 index 0000000000..1ac8f62e32 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.transport; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.qpid.client.SSLConfiguration; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SocketTransportConnection implements ITransportConnection +{ + private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + + private SocketConnectorFactory _socketConnectorFactory; + + static interface SocketConnectorFactory + { + IoConnector newSocketConnector(); + } + + public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory) + { + _socketConnectorFactory = socketConnectorFactory; + } + + public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException + { + ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + + // the MINA default is currently to use the pooled allocator although this may change in future + // once more testing of the performance of the simple allocator has been done + if (!Boolean.getBoolean("amqj.enablePooledAllocator")) + { + _logger.info("Using SimpleByteBufferAllocator"); + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); + final InetSocketAddress address; + + if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) + { + address = null; + } + else + { + address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); + _logger.info("Attempting connection to " + address); + } + + SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) + { + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + } + + MINANetworkDriver driver = new MINANetworkDriver(ioConnector); + driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); + protocolHandler.setNetworkDriver(driver); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java new file mode 100644 index 0000000000..aef3a563af --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -0,0 +1,351 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.transport; + +import java.io.IOException; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.vmpipe.VmPipeAcceptor; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying + * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA + * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete + * class/interface combo. + */ +public class TransportConnection +{ + private static ITransportConnection _instance; + + private static final Map _inVmPipeAddress = new HashMap(); + private static VmPipeAcceptor _acceptor; + private static int _currentInstance = -1; + private static int _currentVMPort = -1; + + private static final int TCP = 0; + private static final int VM = 1; + private static final int SOCKET = 2; + + private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); + + private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; + + private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); + + public static void registerOpenSocket(String socketID, Socket openSocket) + { + _openSocketRegister.put(socketID, openSocket); + } + + public static Socket removeOpenSocket(String socketID) + { + return _openSocketRegister.remove(socketID); + } + + public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException + { + int transport = getTransport(details.getTransport()); + + if (transport == -1) + { + throw new AMQNoTransportForProtocolException(details, null, null); + } + + switch (transport) + { + case SOCKET: + return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() + { + ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); + + Socket socket = TransportConnection.removeOpenSocket(details.getHost()); + + if (socket != null) + { + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) connector).setOpenSocket(socket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://<SocketID>' transport:" + details); + } + return connector; + } + }); + case TCP: + return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() + { + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (Boolean.getBoolean("qpidnio")) + { + _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") + ? "Qpid NIO is new default" + : "Sysproperty 'qpidnio' is set")); + result = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); + } + else + { + _logger.info("Using Mina NIO"); + result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector + } + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + return result; + } + }); + case VM: + { + return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + } + default: + throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null); + } + } + + private static int getTransport(String transport) + { + if (transport.equals(BrokerDetails.SOCKET)) + { + return SOCKET; + } + + if (transport.equals(BrokerDetails.TCP)) + { + return TCP; + } + + if (transport.equals(BrokerDetails.VM)) + { + return VM; + } + + return -1; + } + + private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) + throws AMQVMBrokerCreationException + { + int port = details.getPort(); + + synchronized (_inVmPipeAddress) + { + if (!_inVmPipeAddress.containsKey(port)) + { + if (AutoCreate) + { + _logger.warn("Auto Creating InVM Broker on port:" + port); + createVMBroker(port); + } + else + { + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. Auto create disabled.", null); + } + } + } + + return new VmPipeTransportConnection(port); + } + + public static void createVMBroker(int port) throws AMQVMBrokerCreationException + { + synchronized(TransportConnection.class) + { + if (_acceptor == null) + { + _acceptor = new VmPipeAcceptor(); + + IoServiceConfig config = _acceptor.getDefaultConfig(); + } + } + synchronized (_inVmPipeAddress) + { + + if (!_inVmPipeAddress.containsKey(port)) + { + _logger.info("Creating InVM Qpid.AMQP listening on port " + port); + IoHandlerAdapter provider = null; + try + { + VmPipeAddress pipe = new VmPipeAddress(port); + + provider = createBrokerInstance(port); + + _acceptor.bind(pipe, provider); + + _inVmPipeAddress.put(port, pipe); + _logger.info("Created InVM Qpid.AMQP listening on port " + port); + } + catch (IOException e) + { + _logger.error("Got IOException.", e); + + // Try and unbind provider + try + { + VmPipeAddress pipe = new VmPipeAddress(port); + + try + { + _acceptor.unbind(pipe); + } + catch (Exception ignore) + { + // ignore + } + + if (provider == null) + { + provider = createBrokerInstance(port); + } + + _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); + _logger.info("Created InVM Qpid.AMQP listening on port " + port); + } + catch (IOException justUseFirstException) + { + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } + + throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); + } + } + + } + else + { + _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); + } + } + } + + private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException + { + String protocolProviderClass = System.getProperty("amqj.protocolprovider.class", DEFAULT_QPID_SERVER); + _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); + + // can't use introspection to get Provider as it is a server class. + // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. + + // get right constructor and pass in instancec ID - "port" + IoHandlerAdapter provider; + try + { + Class[] cnstr = {Integer.class}; + Object[] params = {port}; + + provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); + // Give the broker a second to create + _logger.info("Created VMBroker Instance:" + port); + } + catch (Exception e) + { + _logger.info("Unable to create InVM Qpid.AMQP on port " + port + ". Because: " + e.getCause()); + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } + + AMQVMBrokerCreationException amqbce = + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); + throw amqbce; + } + + return provider; + } + + public static void killAllVMBrokers() + { + _logger.info("Killing all VM Brokers"); + synchronized(TransportConnection.class) + { + if (_acceptor != null) + { + _acceptor.unbindAll(); + } + synchronized (_inVmPipeAddress) + { + _inVmPipeAddress.clear(); + } + _acceptor = null; + } + _currentInstance = -1; + _currentVMPort = -1; + } + + public static void killVMBroker(int port) + { + synchronized (_inVmPipeAddress) + { + VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); + if (pipe != null) + { + _logger.info("Killing VM Broker:" + port); + _inVmPipeAddress.remove(port); + // This does need to be sychronized as otherwise mina can hang + // if a new connection is made + _acceptor.unbind(pipe); + } + } + } + +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java new file mode 100644 index 0000000000..87cc2e7a5a --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.transport; + +import java.io.IOException; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VmPipeTransportConnection implements ITransportConnection +{ + private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); + + private int _port; + + private MINANetworkDriver _networkDriver; + + public VmPipeTransportConnection(int port) + { + _port = port; + } + + public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException + { + final VmPipeConnector ioConnector = new QpidVmPipeConnector(); + + final VmPipeAddress address = new VmPipeAddress(_port); + _logger.info("Attempting connection to " + address); + _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); + protocolHandler.setNetworkDriver(_networkDriver); + ConnectFuture future = ioConnector.connect(address, _networkDriver); + // wait for connection to complete + future.join(); + // we call getSession which throws an IOException if there has been an error connecting + future.getSession(); + _networkDriver.setProtocolEngine(protocolHandler); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java index 03167561ef..f3f74dd332 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java @@ -45,7 +45,7 @@ public class URLParser private void parseURL(String fullURL) throws URLSyntaxException { // Connection URL format - // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';tcp://host:port?option=\'value\'',failover='method?option=\'value\',option='value''" + // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''" // Options are of course optional except for requiring a single broker in the broker list. try { @@ -195,7 +195,7 @@ public class URLParser { String brokerlist = _url.getOptions().get(AMQConnectionURL.OPTIONS_BROKERLIST); - // brokerlist tcp://host:port?option='value',option='value';tcp://host:port/virtualpath?option='value' + // brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR); while (st.hasMoreTokens()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java new file mode 100644 index 0000000000..dc0d9b8c78 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.vmbroker; + +import org.apache.qpid.client.transport.AMQTransportConnectionException; +import org.apache.qpid.protocol.AMQConstant; + +/** + * AMQVMBrokerCreationException represents failure to create an in VM broker on the vm transport medium. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to create an in VM broker. + * </table> + * + * @todo Error code never used. This is not an AMQException. + */ +public class AMQVMBrokerCreationException extends AMQTransportConnectionException +{ + private int _port; + + /** + * @param port + * + * @deprecated + */ + public AMQVMBrokerCreationException(int port) + { + this(null, port, "Unable to create vm broker", null); + } + + public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) + { + super(errorCode, message, cause); + _port = port; + } + + public String toString() + { + return super.toString() + " on port " + _port; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java index 574a1b3888..b7b6bd57bc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java @@ -19,7 +19,6 @@ package org.apache.qpid.filter; import java.util.HashMap; -import javax.jms.DeliveryMode; import javax.jms.JMSException; import org.apache.qpid.AMQInternalException; @@ -33,7 +32,7 @@ import org.slf4j.LoggerFactory; public class PropertyExpression implements Expression { // Constants - defined the same as JMS - private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT } + private static final int NON_PERSISTENT = 1; private static final int DEFAULT_PRIORITY = 4; private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class); @@ -80,24 +79,22 @@ public class PropertyExpression implements Expression { public Object evaluate(AbstractJMSMessage message) { - - JMSDeliveryMode mode = JMSDeliveryMode.NON_PERSISTENT; try { - mode = message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? - JMSDeliveryMode.PERSISTENT : JMSDeliveryMode.NON_PERSISTENT; - + int mode = message.getJMSDeliveryMode(); if (_logger.isDebugEnabled()) { _logger.debug("JMSDeliveryMode is :" + mode); } + + return mode; } catch (JMSException e) { _logger.warn("Error evaluating property",e); } - return mode.toString(); + return NON_PERSISTENT; } }); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 4db6a11e4d..6d81f728c9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -52,7 +52,9 @@ public interface BrokerDetails public static final int DEFAULT_PORT = 5672; + public static final String SOCKET = "socket"; public static final String TCP = "tcp"; + public static final String VM = "vm"; public static final String DEFAULT_TRANSPORT = TCP; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 26641982d7..0e8ca60686 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -27,7 +27,7 @@ import java.util.List; /** Connection URL format - amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';tcp://host:port/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''" + amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''" Options are of course optional except for requiring a single broker in the broker list. The option seperator is defined to be either '&' or ',' */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 56abf03c81..7cdcd32306 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.jms; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.failover.FailoverExchangeMethod; import org.apache.qpid.jms.failover.FailoverMethod; import org.apache.qpid.jms.failover.FailoverRoundRobinServers; @@ -50,7 +51,7 @@ public class FailoverPolicy private long _lastMethodTime; private long _lastFailTime; - public FailoverPolicy(ConnectionURL connectionDetails, Connection conn) + public FailoverPolicy(ConnectionURL connectionDetails, AMQConnection conn) { FailoverMethod method; @@ -82,7 +83,7 @@ public class FailoverPolicy */ if (failoverMethod.equals(FailoverMethod.SINGLE_BROKER)) { - method = new FailoverSingleServer(connectionDetails); + method = new FailoverRoundRobinServers(connectionDetails); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index ef30f2adbc..9e6000c472 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -32,9 +32,9 @@ import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +58,7 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class); /** This is not safe to use until attainConnection is called */ - private Connection _conn; + private AMQConnection _conn; /** Protects the broker list when modifications happens */ private Object _brokerListLock = new Object(); @@ -80,7 +80,7 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener /** Denotes the number of failed attempts **/ private int _failedAttemps = 0; - public FailoverExchangeMethod(ConnectionURL connectionDetails, Connection conn) + public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn) { _connectionDetails = connectionDetails; _originalBrokerDetail = _connectionDetails.getBrokerDetails(0); |