From 4eaa4e42093e5524d9552d8fa312c214524b6bb4 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 10 Mar 2012 19:22:10 +0000 Subject: NO-JIRA : AMQP-1-0 sandbox updates - merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1299257 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/client/src/main/java/client.bnd | 2 +- .../org/apache/qpid/client/AMQAnyDestination.java | 9 +- .../org/apache/qpid/client/AMQBrokerDetails.java | 50 +- .../java/org/apache/qpid/client/AMQConnection.java | 153 ++-- .../apache/qpid/client/AMQConnectionDelegate.java | 9 +- .../qpid/client/AMQConnectionDelegate_0_10.java | 42 +- .../qpid/client/AMQConnectionDelegate_8_0.java | 83 +- .../apache/qpid/client/AMQConnectionFactory.java | 112 ++- .../org/apache/qpid/client/AMQConnectionURL.java | 18 +- .../org/apache/qpid/client/AMQDestination.java | 132 ++- .../main/java/org/apache/qpid/client/AMQQueue.java | 14 +- .../org/apache/qpid/client/AMQQueueBrowser.java | 74 +- .../apache/qpid/client/AMQQueueSessionAdaptor.java | 167 +--- .../java/org/apache/qpid/client/AMQSession.java | 796 ++++++++--------- .../org/apache/qpid/client/AMQSessionAdapter.java | 169 +++- .../org/apache/qpid/client/AMQSession_0_10.java | 166 ++-- .../org/apache/qpid/client/AMQSession_0_8.java | 181 ++-- .../org/apache/qpid/client/AMQTemporaryQueue.java | 5 +- .../main/java/org/apache/qpid/client/AMQTopic.java | 13 +- .../apache/qpid/client/AMQTopicSessionAdaptor.java | 170 +--- .../apache/qpid/client/BasicMessageConsumer.java | 137 +-- .../qpid/client/BasicMessageConsumer_0_10.java | 74 +- .../qpid/client/BasicMessageConsumer_0_8.java | 35 +- .../apache/qpid/client/BasicMessageProducer.java | 179 +++- .../qpid/client/BasicMessageProducer_0_10.java | 50 +- .../qpid/client/BasicMessageProducer_0_8.java | 60 +- .../java/org/apache/qpid/client/Closeable.java | 14 +- .../org/apache/qpid/client/CustomJMSXProperty.java | 4 +- .../org/apache/qpid/client/DispatcherCallback.java | 36 - .../apache/qpid/client/MessageConsumerPair.java | 4 +- .../apache/qpid/client/QpidConnectionMetaData.java | 5 +- .../apache/qpid/client/QueueReceiverAdaptor.java | 4 +- .../org/apache/qpid/client/QueueSenderAdapter.java | 2 +- .../apache/qpid/client/TemporaryDestination.java | 4 +- .../org/apache/qpid/client/XAConnectionImpl.java | 14 +- .../org/apache/qpid/client/XAResourceImpl.java | 36 +- .../java/org/apache/qpid/client/XASessionImpl.java | 27 +- .../qpid/client/failover/FailoverHandler.java | 15 +- .../qpid/client/failover/FailoverNoopSupport.java | 4 +- .../qpid/client/failover/FailoverRetrySupport.java | 8 +- .../qpid/client/filter/JMSSelectorFilter.java | 4 + .../handler/AccessRequestOkMethodHandler.java | 6 +- .../client/handler/BasicCancelOkMethodHandler.java | 6 +- .../client/handler/BasicDeliverMethodHandler.java | 5 +- .../client/handler/BasicReturnMethodHandler.java | 6 +- .../client/handler/ChannelCloseMethodHandler.java | 5 +- .../handler/ChannelCloseOkMethodHandler.java | 5 +- .../client/handler/ChannelFlowMethodHandler.java | 9 +- .../client/handler/ChannelFlowOkMethodHandler.java | 6 +- .../client/handler/ClientMethodDispatcherImpl.java | 21 +- .../handler/ClientMethodDispatcherImpl_0_9.java | 8 +- .../handler/ClientMethodDispatcherImpl_0_91.java | 8 +- .../handler/ClientMethodDispatcherImpl_8_0.java | 14 +- .../handler/ConnectionCloseMethodHandler.java | 10 +- .../handler/ConnectionOpenOkMethodHandler.java | 3 +- .../handler/ConnectionRedirectMethodHandler.java | 6 +- .../handler/ConnectionSecureMethodHandler.java | 6 +- .../handler/ConnectionStartMethodHandler.java | 53 +- .../handler/ConnectionTuneMethodHandler.java | 13 +- .../handler/ExchangeBoundOkMethodHandler.java | 6 +- .../client/handler/QueueDeleteOkMethodHandler.java | 6 +- .../qpid/client/message/AMQMessageDelegate.java | 1 - .../client/message/AMQMessageDelegate_0_10.java | 100 ++- .../client/message/AMQMessageDelegate_0_8.java | 48 +- .../qpid/client/message/AMQPEncodedMapMessage.java | 31 +- .../message/AMQPEncodedMapMessageFactory.java | 5 +- .../client/message/AbstractAMQMessageDelegate.java | 23 +- .../client/message/AbstractBytesTypedMessage.java | 19 +- .../qpid/client/message/AbstractJMSMessage.java | 13 +- .../client/message/AbstractJMSMessageFactory.java | 24 +- .../qpid/client/message/FieldTableSupport.java | 52 +- .../qpid/client/message/JMSBytesMessage.java | 14 +- .../client/message/JMSBytesMessageFactory.java | 5 +- .../qpid/client/message/JMSHeaderAdapter.java | 27 +- .../apache/qpid/client/message/JMSMapMessage.java | 18 +- .../qpid/client/message/JMSMapMessageFactory.java | 4 +- .../qpid/client/message/JMSObjectMessage.java | 14 +- .../client/message/JMSObjectMessageFactory.java | 4 +- .../qpid/client/message/JMSStreamMessage.java | 9 +- .../client/message/JMSStreamMessageFactory.java | 5 +- .../apache/qpid/client/message/JMSTextMessage.java | 17 +- .../qpid/client/message/JMSTextMessageFactory.java | 6 +- .../qpid/client/message/MessageConverter.java | 13 +- .../apache/qpid/client/message/MessageFactory.java | 7 +- .../client/message/MessageFactoryRegistry.java | 20 +- .../qpid/client/message/QpidMessageProperties.java | 3 + .../client/message/TypedBytesContentReader.java | 2 +- .../qpid/client/message/UnprocessedMessage.java | 1 - .../client/message/UnprocessedMessage_0_8.java | 18 +- .../client/messaging/address/AddressHelper.java | 11 +- .../apache/qpid/client/messaging/address/Link.java | 20 +- .../apache/qpid/client/messaging/address/Node.java | 43 +- .../qpid/client/protocol/AMQProtocolHandler.java | 184 +--- .../qpid/client/protocol/AMQProtocolSession.java | 102 ++- .../protocol/BlockingMethodFrameListener.java | 7 +- .../qpid/client/protocol/HeartbeatDiagnostics.java | 4 + .../qpid/client/security/AMQCallbackHandler.java | 4 +- .../client/security/CallbackHandlerRegistry.java | 9 +- .../qpid/client/security/DynamicSaslRegistrar.java | 50 +- .../apache/qpid/client/security/JCAProvider.java | 3 - .../UsernameHashedPasswordCallbackHandler.java | 11 +- .../security/UsernamePasswordCallbackHandler.java | 5 +- .../security/amqplain/AmqPlainSaslClient.java | 6 +- .../amqplain/AmqPlainSaslClientFactory.java | 3 +- .../anonymous/AnonymousSaslClientFactory.java | 8 +- .../CRAMMD5HashedSaslClientFactory.java | 9 +- .../state/AMQMethodNotImplementedException.java | 2 +- .../apache/qpid/client/state/AMQStateManager.java | 12 +- .../client/state/StateAwareMethodListener.java | 3 +- .../org/apache/qpid/client/state/StateWaiter.java | 9 +- .../AMQNoTransportForProtocolException.java | 59 -- .../transport/AMQTransportConnectionException.java | 43 - .../client/transport/ClientConnectionDelegate.java | 32 +- .../client/transport/ITransportConnection.java | 32 - .../java/org/apache/qpid/client/url/URLParser.java | 8 +- .../org/apache/qpid/client/url/URLParser_0_10.java | 7 +- .../apache/qpid/client/util/BlockingWaiter.java | 21 +- .../client/util/FlowControllingBlockingQueue.java | 6 +- .../java/org/apache/qpid/collections/KeyValue.java | 46 - .../org/apache/qpid/collections/ReferenceMap.java | 957 --------------------- .../collections/keyvalue/AbstractKeyValue.java | 83 -- .../collections/keyvalue/AbstractMapEntry.java | 96 --- .../qpid/collections/keyvalue/DefaultMapEntry.java | 67 -- .../java/org/apache/qpid/jms/BrokerDetails.java | 4 +- .../java/org/apache/qpid/jms/ConnectionURL.java | 1 - .../java/org/apache/qpid/jms/FailoverPolicy.java | 12 +- .../java/org/apache/qpid/jms/MessageProducer.java | 3 +- .../src/main/java/org/apache/qpid/jms/Session.java | 4 +- .../qpid/jms/failover/FailoverExchangeMethod.java | 26 +- .../jms/failover/FailoverRoundRobinServers.java | 8 +- .../qpid/jms/failover/FailoverSingleServer.java | 19 +- .../org/apache/qpid/jms/failover/NoFailover.java | 2 +- .../jndi/PropertiesFileInitialContextFactory.java | 42 +- .../java/org/apache/qpid/jndi/ReadOnlyContext.java | 19 +- .../org/apache/qpid/naming/ReadOnlyContext.java | 509 ----------- .../java/org/apache/qpid/naming/jndi.properties | 40 - .../apache/qpid/nclient/MessagePartListener.java | 3 - .../qpid/nclient/util/ByteBufferMessage.java | 17 +- .../nclient/util/MessagePartListenerAdapter.java | 11 +- 139 files changed, 2307 insertions(+), 4161 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/collections/KeyValue.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/DefaultMapEntry.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/naming/jndi.properties (limited to 'qpid/java/client/src/main/java') diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd index d92d582ec8..495ea6793f 100755 --- a/qpid/java/client/src/main/java/client.bnd +++ b/qpid/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.15.0 +ver: 0.17.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index 8311aa80ce..c324e22ab1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -20,15 +20,14 @@ */ package org.apache.qpid.client; -import java.net.URISyntaxException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.url.BindingURL; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.Topic; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; -import org.apache.qpid.url.BindingURL; +import java.net.URISyntaxException; /** * In order to support JMS 1.0 the Qpid implementation maps the diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index b343820d80..09cb9428fe 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -20,16 +20,16 @@ */ package org.apache.qpid.client; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; - import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.url.URLHelper; import org.apache.qpid.url.URLSyntaxException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + public class AMQBrokerDetails implements BrokerDetails { private String _host; @@ -264,30 +264,12 @@ public class AMQBrokerDetails implements BrokerDetails public boolean getBooleanProperty(String propName) { - return getBooleanProperty(propName, false); - } - - public boolean getBooleanProperty(String propName, boolean defaultValue) - { - if (_options.containsKey(propName)) - { - if (_options.get(propName).equalsIgnoreCase("false")) - { - return false; - } - else if (_options.get(propName).equalsIgnoreCase("true")) - { - return true; - } - else - { - return defaultValue; - } - } - else - { - return defaultValue; - } + if (_options.containsKey(propName)) + { + return Boolean.parseBoolean(_options.get(propName)); + } + + return false; } public void setTimeout(long timeout) @@ -319,18 +301,18 @@ public class AMQBrokerDetails implements BrokerDetails BrokerDetails bd = (BrokerDetails) o; - return _host.equalsIgnoreCase(bd.getHost()) && + return _host.toLowerCase().equals(bd.getHost() == null ? null : bd.getHost().toLowerCase()) && (_port == bd.getPort()) && - _transport.equalsIgnoreCase(bd.getTransport()); + _transport.toLowerCase().equals(bd.getTransport() == null ? null : bd.getTransport().toLowerCase()); //TODO do we need to compare all the options as well? } @Override public int hashCode() { - int result = _host != null ? _host.hashCode() : 0; + int result = _host != null ? _host.toLowerCase().hashCode() : 0; result = 31 * result + _port; - result = 31 * result + (_transport != null ? _transport.hashCode() : 0); + result = 31 * result + (_transport != null ? _transport.toLowerCase().hashCode() : 0); return result; } @@ -457,7 +439,7 @@ public class AMQBrokerDetails implements BrokerDetails if (getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY) != null) { conSettings.setTcpNodelay( - getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY,true)); + getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)); } return conSettings; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6879fe0cfd..1f61e0d218 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,42 +20,14 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.net.UnknownHostException; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -76,8 +48,36 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { @@ -106,7 +106,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate * handler. */ - protected AMQProtocolHandler _protocolHandler; + private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); @@ -122,7 +122,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** The virtual path to connect to on the AMQ server */ private String _virtualHost; - protected ExceptionListener _exceptionListener; + private ExceptionListener _exceptionListener; private ConnectionListener _connectionListener; @@ -132,15 +132,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message * publication. */ - protected volatile boolean _started; + private volatile boolean _started; /** Policy dictating how to failover */ - protected FailoverPolicy _failoverPolicy; + private FailoverPolicy _failoverPolicy; /* * _Connected should be refactored with a suitable wait object. */ - protected boolean _connected; + private boolean _connected; /* * The connection meta data @@ -156,7 +156,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; - protected AMQConnectionDelegate _delegate; + private AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages private int _maxPrefetch; @@ -308,9 +308,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Connection:" + connectionURL); + _logger.debug("Connection:" + connectionURL); } _connectionURL = connectionURL; @@ -343,14 +343,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler = new AMQProtocolHandler(this); - _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } // We are not currently connected - _connected = false; + setConnected(false); boolean retryAllowed = true; Exception connectionException = null; - while (!_connected && retryAllowed && brokerDetails != null) + while (!isConnected() && retryAllowed && brokerDetails != null) { ProtocolVersion pe = null; try @@ -374,7 +377,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // broker initDelegate(pe); } - else if (!_connected) + else if (!isConnected()) { retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); @@ -384,10 +387,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_logger.isDebugEnabled()) { - _logger.debug("Are we connected:" + _connected); + _logger.debug("Are we connected:" + isConnected()); } - if (!_connected) + if (!isConnected()) { if (_logger.isDebugEnabled()) { @@ -435,7 +438,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } - _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } _sessions.setMaxChannelID(_delegate.getMaxChannelID()); _sessions.setMinChannelID(_delegate.getMinChannelID()); @@ -462,7 +468,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect String delegateClassName = String.format ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", pe.getMajorVersion(), pe.getMinorVersion()); - _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); + if (_logger.isDebugEnabled()) + { + _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); + } Class c = Class.forName(delegateClassName); Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; @@ -590,7 +599,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean failoverAllowed() { - if (!_connected) + if (!isConnected()) { return false; } @@ -729,6 +738,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } + protected final ExceptionListener getExceptionListenerNoCheck() + { + return _exceptionListener; + } + public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); @@ -804,13 +818,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close(List sessions, long timeout) throws JMSException { - if (!_closed.getAndSet(true)) + if (!setClosed()) { - _closing.set(true); + setClosing(true); try{ doClose(sessions, timeout); }finally{ - _closing.set(false); + setClosing(false); } } } @@ -963,7 +977,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); + } public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, @@ -971,7 +986,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, @@ -979,7 +994,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, @@ -988,7 +1003,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO Auto-generated method stub checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public long getMaximumChannelCount() throws JMSException @@ -1048,16 +1063,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _virtualHost; } - public AMQProtocolHandler getProtocolHandler() + public final AMQProtocolHandler getProtocolHandler() { return _protocolHandler; } - public boolean started() + public final boolean started() { return _started; } + protected final boolean isConnected() + { + return _connected; + } + + protected final void setConnected(boolean connected) + { + _connected = connected; + } + public void bytesSent(long writtenBytes) { if (_connectionListener != null) @@ -1226,8 +1251,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (cause instanceof IOException || cause instanceof AMQDisconnectedException) { // If we have an IOE/AMQDisconnect there is no connection to close on. - _closing.set(false); - closer = !_closed.getAndSet(true); + setClosing(false); + closer = !setClosed(); _protocolHandler.getProtocolSession().notifyError(je); } @@ -1238,7 +1263,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // decide if we are going to close the session if (hardError(cause)) { - closer = (!_closed.getAndSet(true)) || closer; + closer = (!setClosed()) || closer; { _logger.info("Closing AMQConnection due to :" + cause); } @@ -1489,4 +1514,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _lastFailoverTime; } + protected AMQConnectionDelegate getDelegate() + { + return _delegate; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 7fc1d25c18..b6f25a2cef 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client; -import java.io.IOException; - -import javax.jms.JMSException; -import javax.jms.XASession; - import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -32,6 +27,10 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; +import javax.jms.JMSException; +import javax.jms.XASession; +import java.io.IOException; + public interface AMQConnectionDelegate { ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 0ded689ea6..56ee56d178 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -21,16 +21,8 @@ package org.apache.qpid.client; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.XASession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; @@ -42,6 +34,7 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Session; +import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; @@ -53,8 +46,16 @@ import org.apache.qpid.transport.ProtocolVersionException; import org.apache.qpid.transport.SessionDetachCode; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.XASession; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { @@ -71,7 +72,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec /** * The QpidConeection instance that is mapped with this JMS connection. */ - org.apache.qpid.transport.Connection _qpidConnection; + private org.apache.qpid.transport.Connection _qpidConnection; private ConnectionException exception = null; //--- constructor @@ -109,7 +110,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow,name); _conn.registerSession(channelId, session); - if (_conn._started) + if (_conn.started()) { session.start(); } @@ -152,7 +153,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { session = new XASessionImpl(_qpidConnection, _conn, channelId, prefetchHigh, prefetchLow); _conn.registerSession(channelId, session); - if (_conn._started) + if (_conn.started()) { session.start(); } @@ -164,7 +165,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return session; } - @Override public XASession createXASession(int ackMode) throws JMSException { @@ -182,7 +182,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { session = new XASessionImpl(_qpidConnection, _conn, channelId, ackMode, (int)_conn.getMaxPrefetch(), (int)_conn.getMaxPrefetch() / 2); _conn.registerSession(channelId, session); - if (_conn._started) + if (_conn.started()) { session.start(); } @@ -218,10 +218,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL())); _qpidConnection.connect(conSettings); - _conn._connected = true; + _conn.setConnected(true); _conn.setUsername(_qpidConnection.getUserID()); _conn.setMaximumChannelCount(_qpidConnection.getChannelMax()); - _conn._failoverPolicy.attainedConnection(); + _conn.getFailoverPolicy().attainedConnection(); } catch (ProtocolVersionException pe) { @@ -327,7 +327,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } - ExceptionListener listener = _conn._exceptionListener; + ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { _logger.error("connection exception: " + conn, exc); @@ -429,7 +429,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec Map clientProps = new HashMap(); try { - clientProps.put("clientName", _conn.getClientID()); + clientProps.put(ConnectionStartProperties.CLIENT_ID_0_10, _conn.getClientID()); conSettings.setClientProperties(clientProps); } catch (JMSException e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 74a0956933..08ee7c3705 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -20,19 +20,8 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.security.GeneralSecurityException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.Set; - -import javax.jms.JMSException; -import javax.jms.XASession; -import javax.net.ssl.SSLContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; @@ -60,8 +49,19 @@ import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.XASession; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.security.GeneralSecurityException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Set; public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { @@ -71,30 +71,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public void closeConnection(long timeout) throws JMSException, AMQException { - final AMQStateManager stateManager = _conn.getProtocolHandler().getStateManager(); - final AMQState currentState = stateManager.getCurrentState(); - - if (currentState.equals(AMQState.CONNECTION_CLOSED)) - { - _logger.debug("Connection already closed."); - } - else if (currentState.equals(AMQState.CONNECTION_CLOSING)) - { - _logger.debug("Connection already closing, awaiting closed state."); - final StateWaiter closeWaiter = new StateWaiter(stateManager, currentState, EnumSet.of(AMQState.CONNECTION_CLOSED)); - try - { - closeWaiter.await(timeout); - } - catch (AMQTimeoutException te) - { - throw new AMQTimeoutException("Close did not complete in timely fashion", te); - } - } - else - { - _conn.getProtocolHandler().closeConnection(timeout); - } + _conn.getProtocolHandler().closeConnection(timeout); } public AMQConnectionDelegate_8_0(AMQConnection conn) @@ -120,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); - StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); + StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); ConnectionSettings settings = brokerDetail.buildConnectionSettings(); settings.setProtocol(brokerDetail.getTransport()); @@ -133,10 +110,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate sslContext = SSLContextFactory.buildClientContext( settings.getTrustStorePath(), settings.getTrustStorePassword(), - settings.getTrustStoreCertType(), + settings.getTrustManagerFactoryAlgorithm(), settings.getKeyStorePath(), settings.getKeyStorePassword(), - settings.getKeyStoreCertType(), + settings.getKeyManagerFactoryAlgorithm(), settings.getCertAlias()); } catch (GeneralSecurityException e) @@ -148,9 +125,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn._protocolHandler), sslContext); - _conn._protocolHandler.setNetworkConnection(network, securityLayer.sender(network.getSender())); - _conn._protocolHandler.getProtocolSession().init(); + NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext); + _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); + _conn.getProtocolHandler().getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up @@ -158,13 +135,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate if(state == AMQState.CONNECTION_OPEN) { - _conn._failoverPolicy.attainedConnection(); - _conn._connected = true; + _conn.getFailoverPolicy().attainedConnection(); + _conn.setConnected(true); return null; } else { - return _conn._protocolHandler.getSuggestedProtocolVersion(); + return _conn.getProtocolHandler().getSuggestedProtocolVersion(); } } @@ -213,7 +190,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate AMQSession session = new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); - // _protocolHandler.addSessionByChannel(channelId, session); _conn.registerSession(channelId, session); boolean success = false; @@ -237,7 +213,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - if (_conn._started) + if (_conn.started()) { try { @@ -271,12 +247,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); // TODO: Be aware of possible changes to parameter order as versions change. - _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); + _conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); - _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); + _conn.getProtocolHandler().syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); if (transacted) { @@ -287,7 +263,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody(); // TODO: Be aware of possible changes to parameter order as versions change. - _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); + _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); } } @@ -307,7 +283,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); - // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted()); s.resubscribe(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index 700073488e..e684cf9074 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -20,12 +20,23 @@ */ package org.apache.qpid.client; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Hashtable; -import java.util.UUID; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XAQueueConnection; +import javax.jms.XAQueueConnectionFactory; +import javax.jms.XATopicConnection; +import javax.jms.XATopicConnectionFactory; import javax.naming.Context; import javax.naming.Name; import javax.naming.NamingException; @@ -34,10 +45,10 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; import javax.naming.spi.ObjectFactory; - -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Hashtable; +import java.util.UUID; public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, @@ -46,12 +57,6 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF { private final ConnectionURL _connectionDetails; - // The default constructor is necessary to allow AMQConnectionFactory to be deserialised from JNDI - public AMQConnectionFactory() - { - _connectionDetails = null; - } - public AMQConnectionFactory(final String url) throws URLSyntaxException { if (url == null) @@ -119,35 +124,27 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF public Connection createConnection(String userName, String password, String id) throws JMSException { - if (_connectionDetails != null) + try { - try + _connectionDetails.setUsername(userName); + _connectionDetails.setPassword(password); + + if (id != null && !id.equals("")) { - ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.toString()); - connectionDetails.setUsername(userName); - connectionDetails.setPassword(password); - - if (id != null && !id.equals("")) - { - connectionDetails.setClientName(id); - } - else if (connectionDetails.getClientName() == null || connectionDetails.getClientName().equals("")) - { - connectionDetails.setClientName(getUniqueClientID()); - } - return new AMQConnection(connectionDetails); - } - catch (Exception e) + _connectionDetails.setClientName(id); + } + else if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals("")) { - JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + _connectionDetails.setClientName(getUniqueClientID()); } + return new AMQConnection(_connectionDetails); } - else + catch (Exception e) { - throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty"); + JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; } } @@ -266,7 +263,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * * @return A newly created XAConnection * @throws JMSException If creating the XAConnection fails due to some internal error. - * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + * @throws javax.jms.JMSSecurityException If client authentication fails due to an invalid user name or password. */ public XAConnection createXAConnection() throws JMSException { @@ -293,36 +290,25 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * @param password the caller's password * @return A newly created XAConnection. * @throws JMSException If creating the XAConnection fails due to some internal error. - * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + * @throws javax.jms.JMSSecurityException If client authentication fails due to an invalid user name or password. */ public XAConnection createXAConnection(String username, String password) throws JMSException { if (_connectionDetails != null) { - try - { - ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.toString()); - connectionDetails.setUsername(username); - connectionDetails.setPassword(password); - - if (connectionDetails.getClientName() == null || connectionDetails.getClientName().equals("")) - { - connectionDetails.setClientName(getUniqueClientID()); - } - return new XAConnectionImpl(connectionDetails); - } - catch (Exception e) + _connectionDetails.setUsername(username); + _connectionDetails.setPassword(password); + + if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals("")) { - JMSException jmse = new JMSException("Error creating XA Connection: " + e.getMessage()); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + _connectionDetails.setClientName(getUniqueClientID()); } } else { - throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty"); - } + throw new JMSException("A URL must be specified to access XA connections"); + } + return createXAConnection(); } @@ -334,7 +320,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * * @return A newly created XATopicConnection * @throws JMSException If creating the XATopicConnection fails due to some internal error. - * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + * @throws javax.jms.JMSSecurityException If client authentication fails due to an invalid user name or password. */ public XATopicConnection createXATopicConnection() throws JMSException { @@ -351,7 +337,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * @param password the caller's password * @return A newly created XATopicConnection. * @throws JMSException If creating the XATopicConnection fails due to some internal error. - * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + * @throws javax.jms.JMSSecurityException If client authentication fails due to an invalid user name or password. */ public XATopicConnection createXATopicConnection(String username, String password) throws JMSException { @@ -366,7 +352,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * * @return A newly created XAQueueConnection * @throws JMSException If creating the XAQueueConnection fails due to some internal error. - * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + * @throws javax.jms.JMSSecurityException If client authentication fails due to an invalid user name or password. */ public XAQueueConnection createXAQueueConnection() throws JMSException { @@ -383,7 +369,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * @param password the caller's password * @return A newly created XAQueueConnection. * @throws JMSException If creating the XAQueueConnection fails due to some internal error. - * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + * @throws javax.jms.JMSSecurityException If client authentication fails due to an invalid user name or password. */ public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index ee55eb9ce9..d59f48542f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - import org.apache.qpid.client.url.URLParser; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; @@ -32,6 +27,11 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.URLHelper; import org.apache.qpid.url.URLSyntaxException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + public class AMQConnectionURL implements ConnectionURL { private String _url; @@ -50,7 +50,10 @@ public class AMQConnectionURL implements ConnectionURL public AMQConnectionURL(String fullURL) throws URLSyntaxException { - if (fullURL == null) throw new IllegalArgumentException("URL cannot be null"); + if (fullURL == null) + { + throw new IllegalArgumentException("URL cannot be null"); + } _url = fullURL; _options = new HashMap(); _brokers = new LinkedList(); @@ -273,7 +276,8 @@ public class AMQConnectionURL implements ConnectionURL if (_failoverMethod != null) { sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); - sb.append(OPTIONS_FAILOVER + "='"); + sb.append(OPTIONS_FAILOVER); + sb.append("='"); sb.append(_failoverMethod); sb.append(URLHelper.printOptions(_failoverOptions)); sb.append("'"); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 61fe722423..530186b1f9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,15 +20,8 @@ */ package org.apache.qpid.client; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.Destination; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; @@ -40,23 +33,30 @@ import org.apache.qpid.messaging.Address; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.Destination; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public abstract class AMQDestination implements Destination, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class); - protected AMQShortString _exchangeName; + private AMQShortString _exchangeName; - protected AMQShortString _exchangeClass; + private AMQShortString _exchangeClass; - protected boolean _isDurable; + private boolean _isDurable; - protected boolean _isExclusive; + private boolean _isExclusive; - protected boolean _isAutoDelete; + private boolean _isAutoDelete; private boolean _browseOnly; @@ -81,6 +81,41 @@ public abstract class AMQDestination implements Destination, Referenceable public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; + protected void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + protected AddressHelper getAddrHelper() + { + return _addrHelper; + } + + protected void setAddrHelper(AddressHelper addrHelper) + { + _addrHelper = addrHelper; + } + + protected String getName() + { + return _name; + } + + protected void setName(String name) + { + _name = name; + } + + protected Link getTargetLink() + { + return _targetLink; + } + + protected void setTargetLink(Link targetLink) + { + _targetLink = targetLink; + } + // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -109,31 +144,46 @@ public abstract class AMQDestination implements Destination, Referenceable public static AddressOption getOption(String str) { - if ("always".equals(str)) return ALWAYS; - else if ("never".equals(str)) return NEVER; - else if ("sender".equals(str)) return SENDER; - else if ("receiver".equals(str)) return RECEIVER; - else throw new IllegalArgumentException(str + " is not an allowed value"); + if ("always".equals(str)) + { + return ALWAYS; + } + else if ("never".equals(str)) + { + return NEVER; + } + else if ("sender".equals(str)) + { + return SENDER; + } + else if ("receiver".equals(str)) + { + return RECEIVER; + } + else + { + throw new IllegalArgumentException(str + " is not an allowed value"); + } } } - protected final static DestSyntax defaultDestSyntax; + private final static DestSyntax defaultDestSyntax; - protected DestSyntax _destSyntax = DestSyntax.ADDR; + private DestSyntax _destSyntax = DestSyntax.ADDR; - protected AddressHelper _addrHelper; - protected Address _address; - protected int _addressType = AMQDestination.UNKNOWN_TYPE; - protected String _name; - protected String _subject; - protected AddressOption _create = AddressOption.NEVER; - protected AddressOption _assert = AddressOption.NEVER; - protected AddressOption _delete = AddressOption.NEVER; + private AddressHelper _addrHelper; + private Address _address; + private int _addressType = AMQDestination.UNKNOWN_TYPE; + private String _name; + private String _subject; + private AddressOption _create = AddressOption.NEVER; + private AddressOption _assert = AddressOption.NEVER; + private AddressOption _delete = AddressOption.NEVER; - protected Node _targetNode; - protected Node _sourceNode; - protected Link _targetLink; - protected Link _link; + private Node _targetNode; + private Node _sourceNode; + private Link _targetLink; + private Link _link; // ----- / Fields required to support new address syntax ------- @@ -543,7 +593,7 @@ public abstract class AMQDestination implements Destination, Referenceable { return true; } - if (o == null || getClass() != o.getClass()) + if (!(o instanceof AMQDestination)) { return false; } @@ -572,7 +622,6 @@ public abstract class AMQDestination implements Destination, Referenceable int result; result = _exchangeName == null ? "".hashCode() : _exchangeName.hashCode(); result = 29 * result + (_exchangeClass == null ? "".hashCode() :_exchangeClass.hashCode()); - //result = 29 * result + _destinationName.hashCode(); if (_queueName != null) { result = 29 * result + _queueName.hashCode(); @@ -631,10 +680,10 @@ public abstract class AMQDestination implements Destination, Referenceable public static class Binding { - String exchange; - String bindingKey; - String queue; - Map args; + private String exchange; + private String bindingKey; + private String queue; + private Map args; public Binding(String exchange, String queue, @@ -887,4 +936,5 @@ public abstract class AMQDestination implements Destination, Referenceable return _rejectBehaviour; } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 5ecb5d5913..4e9b53c814 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.client; -import java.net.URISyntaxException; - -import javax.jms.Queue; - import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.BindingURL; +import javax.jms.Queue; +import java.net.URISyntaxException; + public class AMQQueue extends AMQDestination implements Queue { protected AMQQueue() @@ -156,7 +155,6 @@ public class AMQQueue extends AMQDestination implements Queue public AMQShortString getRoutingKey() { - //return getAMQQueueName(); if (getAMQQueueName() != null && getAMQQueueName().equals(super.getRoutingKey())) { return getAMQQueueName(); @@ -173,4 +171,10 @@ public class AMQQueue extends AMQDestination implements Queue //remain valid if we failover (see BLZ-24) return getQueueName() == null; } + + @Override + public boolean equals(Object o) + { + return super.equals(o) && o instanceof Queue; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 3f9eadeef3..0c6031ea91 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.filter.JMSSelectorFilter; +import org.apache.qpid.protocol.AMQConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,21 +45,59 @@ public class AMQQueueBrowser implements QueueBrowser private AtomicBoolean _isClosed = new AtomicBoolean(); private final AMQSession _session; - private final AMQQueue _queue; + private final Queue _queue; private final ArrayList _consumers = new ArrayList(); private final String _messageSelector; - AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException + AMQQueueBrowser(AMQSession session, Queue queue, String messageSelector) throws JMSException { _session = session; _queue = queue; _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector; - // Create Consumer to verify message selector. - BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); - // Close this consumer as we are not looking to consume only to establish that, at least for now, - // the QB can be created - consumer.close(); + + + validateQueue((AMQDestination) queue); + + if(_messageSelector != null) + { + validateSelector(_messageSelector); + } + } + + private void validateSelector(String messageSelector) throws InvalidSelectorException + { + try + { + new JMSSelectorFilter(messageSelector); + } + catch (AMQInternalException e) + { + throw new InvalidSelectorException(e.getMessage()); + } + } + + private void validateQueue(AMQDestination queue) throws JMSException + { + try + { + // Essentially just test the connection/session is still active + _session.sync(); + // TODO - should really validate queue exists, but we often rely on creating the consumer to create the queue :( + // _session.declareQueuePassive( queue ); + } + catch (AMQException e) + { + if(e.getErrorCode() == AMQConstant.NOT_FOUND) + { + throw new InvalidDestinationException(e.getMessage()); + } + else + { + final JMSException jmsException = new JMSException(e.getMessage(), String.valueOf(e.getErrorCode().getCode())); + jmsException.setLinkedException(e); + throw jmsException; + } + } } public Queue getQueue() throws JMSException @@ -88,6 +132,10 @@ public class AMQQueueBrowser implements QueueBrowser public Enumeration getEnumeration() throws JMSException { checkState(); + if(!_session.getAMQConnection().started()) + { + throw new IllegalStateException("Cannot enumerate message on the queue while the Connection is stopped"); + } final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); @@ -108,7 +156,7 @@ public class AMQQueueBrowser implements QueueBrowser private class QueueBrowserEnumeration implements Enumeration { - Message _nextMessage; + private Message _nextMessage; private BasicMessageConsumer _consumer; public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException @@ -118,12 +166,12 @@ public class AMQQueueBrowser implements QueueBrowser _consumer = consumer; prefetchMessage(); } - _logger.info("QB:created with first element:" + _nextMessage); + _logger.debug("QB:created with first element:" + _nextMessage); } public boolean hasMoreElements() { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + _logger.debug("QB:hasMoreElements:" + (_nextMessage != null)); return (_nextMessage != null); } @@ -136,9 +184,9 @@ public class AMQQueueBrowser implements QueueBrowser } try { - _logger.info("QB:nextElement about to receive"); + _logger.debug("QB:nextElement about to receive"); prefetchMessage(); - _logger.info("QB:nextElement received:" + _nextMessage); + _logger.debug("QB:nextElement received:" + _nextMessage); } catch (JMSException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java index a8c83d8868..c8cb49b53e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java @@ -20,185 +20,64 @@ */ package org.apache.qpid.client; -import java.io.Serializable; - -import javax.jms.BytesMessage; -import javax.jms.Destination; +import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; /** * Need this adaptor class to conform to JMS spec and throw IllegalStateException * from createDurableSubscriber, unsubscribe, createTopic & createTemporaryTopic */ -public class AMQQueueSessionAdaptor implements QueueSession, AMQSessionAdapter +class AMQQueueSessionAdaptor extends AMQSessionAdapter implements QueueSession { - //holds a session for delegation - protected final AMQSession _session; - /** * Construct an adaptor with a session to wrap * @param session */ - public AMQQueueSessionAdaptor(Session session) + protected AMQQueueSessionAdaptor(QueueSession session) { - _session = (AMQSession) session; - } - - public TemporaryQueue createTemporaryQueue() throws JMSException { - return _session.createTemporaryQueue(); - } - - public Queue createQueue(String string) throws JMSException { - return _session.createQueue(string); - } - - public QueueReceiver createReceiver(Queue queue) throws JMSException { - return _session.createReceiver(queue); - } - - public QueueReceiver createReceiver(Queue queue, String string) throws JMSException { - return _session.createReceiver(queue, string); - } - - public QueueSender createSender(Queue queue) throws JMSException { - return _session.createSender(queue); - } - - public QueueBrowser createBrowser(Queue queue) throws JMSException { - return _session.createBrowser(queue); - } - - public QueueBrowser createBrowser(Queue queue, String string) throws JMSException { - return _session.createBrowser(queue, string); - } - - public BytesMessage createBytesMessage() throws JMSException { - return _session.createBytesMessage(); - } - - public MapMessage createMapMessage() throws JMSException { - return _session.createMapMessage(); - } - - public Message createMessage() throws JMSException { - return _session.createMessage(); - } - - public ObjectMessage createObjectMessage() throws JMSException { - return _session.createObjectMessage(); - } - - public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { - return _session.createObjectMessage(serializable); - } - - public StreamMessage createStreamMessage() throws JMSException { - return _session.createStreamMessage(); - } - - public TextMessage createTextMessage() throws JMSException { - return _session.createTextMessage(); - } - - public TextMessage createTextMessage(String string) throws JMSException { - return _session.createTextMessage(string); - } - - public boolean getTransacted() throws JMSException { - return _session.getTransacted(); - } - - public int getAcknowledgeMode() throws JMSException { - return _session.getAcknowledgeMode(); - } - - public void commit() throws JMSException { - _session.commit(); - } - - public void rollback() throws JMSException { - _session.rollback(); - } - - public void close() throws JMSException { - _session.close(); - } - - public void recover() throws JMSException { - _session.recover(); - } - - public MessageListener getMessageListener() throws JMSException { - return _session.getMessageListener(); + super(session); } - public void setMessageListener(MessageListener messageListener) throws JMSException { - _session.setMessageListener(messageListener); - } - - public void run() { - _session.run(); - } - - public MessageProducer createProducer(Destination destination) throws JMSException { - return _session.createProducer(destination); - } - - public MessageConsumer createConsumer(Destination destination) throws JMSException { - return _session.createConsumer(destination); + public QueueReceiver createReceiver(Queue queue) throws JMSException + { + return getSession().createReceiver(queue); } - public MessageConsumer createConsumer(Destination destination, String string) throws JMSException { - return _session.createConsumer(destination,string); + public QueueReceiver createReceiver(Queue queue, String string) throws JMSException + { + return getSession().createReceiver(queue, string); } - public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException { - return _session.createConsumer(destination,string,b); + public QueueSender createSender(Queue queue) throws JMSException + { + return getSession().createSender(queue); } //The following methods cannot be called from a QueueSession as per JMS spec - public Topic createTopic(String string) throws JMSException { + public Topic createTopic(String string) throws JMSException + { throw new IllegalStateException("Cannot call createTopic from QueueSession"); } - public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException { + public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException + { throw new IllegalStateException("Cannot call createDurableSubscriber from QueueSession"); } - public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws JMSException { + public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws JMSException + { throw new IllegalStateException("Cannot call createDurableSubscriber from QueueSession"); } - public TemporaryTopic createTemporaryTopic() throws JMSException { + public TemporaryTopic createTemporaryTopic() throws JMSException + { throw new IllegalStateException("Cannot call createTemporaryTopic from QueueSession"); } - public void unsubscribe(String string) throws JMSException { + public void unsubscribe(String string) throws JMSException + { throw new IllegalStateException("Cannot call unsubscribe from QueueSession"); } - public AMQSession getSession() - { - return _session; - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 48c4e3e3e6..efc5982dac 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,50 +20,8 @@ */ package org.apache.qpid.client; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.jms.TransactionRolledBackException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -89,7 +47,7 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.filter.MessageFilter; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; @@ -98,8 +56,27 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** *

@@ -119,150 +96,65 @@ import org.slf4j.LoggerFactory; */ public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentHashMap _slowAccessConsumers = new ConcurrentHashMap(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } + /** Used for debugging. */ + private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - public Collection values() - { - ArrayList values = new ArrayList(); + /** System property to enable strict AMQP compliance. */ + public static final String STRICT_AMQP = "STRICT_AMQP"; - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); + /** Strict AMQP default setting. */ + public static final String STRICT_AMQP_DEFAULT = "false"; - return values; - } + /** System property to enable failure if strict AMQP compliance is violated. */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } + /** Strickt AMQP failure default. */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - final AMQSession _thisSession = this; - - /** Used for debugging. */ - private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + /** System property to enable immediate message prefetching. */ + public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; - /** - * The default value for immediate flag used by producers created by this session is false. That is, a consumer does - * not need to be attached to a queue. - */ - protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + /** Immediate message prefetch default. */ + public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - /** - * The default value for mandatory flag used by producers created by this session is true. That is, server will not - * silently drop messages where no queue is connected to the exchange for the message. - */ - protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ - protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); /** * The period to wait while flow controlled before declaring a failure */ - public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; - protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure", + private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - protected final boolean DECLARE_QUEUES = + private final boolean _delareQueues = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); - protected final boolean DECLARE_EXCHANGES = + private final boolean _declareExchanges = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); - - protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE; - - /** System property to enable strict AMQP compliance. */ - public static final String STRICT_AMQP = "STRICT_AMQP"; - - /** Strict AMQP default setting. */ - public static final String STRICT_AMQP_DEFAULT = "false"; - /** System property to enable failure if strict AMQP compliance is violated. */ - public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - - /** Strickt AMQP failure default. */ - public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - - /** System property to enable immediate message prefetching. */ - public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + private final boolean _useAMQPEncodedMapMessage; - /** Immediate message prefetch default. */ - public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + /** + * Flag indicating to start dispatcher as a daemon thread + */ + protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); /** The connection to which this session belongs. */ - protected AMQConnection _connection; + private AMQConnection _connection; /** Used to indicate whether or not this is a transactional session. */ - protected final boolean _transacted; + private final boolean _transacted; /** Holds the sessions acknowledgement mode. */ - protected final int _acknowledgeMode; + private final int _acknowledgeMode; /** Holds this session unique identifier, used to distinguish it from other sessions. */ - protected int _channelId; + private int _channelId; private int _ticket; @@ -278,55 +170,30 @@ public abstract class AMQSession> _subscriptions = + private final ConcurrentHashMap> _subscriptions = new ConcurrentHashMap>(); - /** - * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked - * up in the {@link #_subscriptions} map. - */ - protected final ConcurrentHashMap _reverseSubscriptionMap = new ConcurrentHashMap(); + private final ConcurrentHashMap _reverseSubscriptionMap = new ConcurrentHashMap(); - /** - * Locks to keep access to subscriber details atomic. - *

- * Added for QPID2418 - */ - protected final Lock _subscriberDetails = new ReentrantLock(true); - protected final Lock _subscriberAccess = new ReentrantLock(true); + private final Lock _subscriberDetails = new ReentrantLock(true); + private final Lock _subscriberAccess = new ReentrantLock(true); - /** - * Used to hold incoming messages. - * - * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. - */ - protected final FlowControllingBlockingQueue _queue; + private final FlowControllingBlockingQueue _queue; - /** Holds the highest received delivery tag. */ - protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); - /** Pre-fetched message tags */ - protected ConcurrentLinkedQueue _prefetchedMessageTags = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue _prefetchedMessageTags = new ConcurrentLinkedQueue(); - /** All the not yet acknowledged message tags */ - protected ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); - /** All the delivered message tags */ - protected ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue(); - /** Holds the dispatcher thread for this session. */ - protected Dispatcher _dispatcher; + private volatile Dispatcher _dispatcher; - protected Thread _dispatcherThread; + private volatile Thread _dispatcherThread; - /** Holds the message factory factory for this session. */ - protected MessageFactoryRegistry _messageFactoryRegistry; + private MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ private Map _producers = new ConcurrentHashMap(); @@ -337,11 +204,7 @@ public abstract class AMQSession _consumers = new IdToConsumerMap(); + private final IdToConsumerMap _consumers = new IdToConsumerMap(); /** * Contains a list of consumers which have been removed but which might still have @@ -367,10 +230,6 @@ public abstract class AMQSession getPrefetchedMessageTags() + { + return _prefetchedMessageTags; + } + + /** All the not yet acknowledged message tags */ + protected ConcurrentLinkedQueue getUnacknowledgedMessageTags() + { + return _unacknowledgedMessageTags; + } + + /** All the delivered message tags */ + protected ConcurrentLinkedQueue getDeliveredMessageTags() + { + return _deliveredMessageTags; + } + + /** Holds the dispatcher thread for this session. */ + protected Dispatcher getDispatcher() + { + return _dispatcher; + } + + protected Thread getDispatcherThread() + { + return _dispatcherThread; + } + + /** Holds the message factory factory for this session. */ + protected MessageFactoryRegistry getMessageFactoryRegistry() + { + return _messageFactoryRegistry; + } + + /** + * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right + * consumer. + */ + protected IdToConsumerMap getConsumers() + { + return _consumers; + } + + protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) + { + _usingDispatcherForCleanup = usingDispatcherForCleanup; + } + + /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ + protected boolean isImmediatePrefetch() + { + return _immediatePrefetch; + } + + public static final class IdToConsumerMap + { + private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; + private final ConcurrentHashMap _slowAccessConsumers = new ConcurrentHashMap(); + + public C get(int id) + { + if ((id & 0xFFFFFFF0) == 0) + { + return (C) _fastAccessConsumers[id]; + } + else + { + return _slowAccessConsumers.get(id); + } + } + + public C put(int id, C consumer) + { + C oldVal; + if ((id & 0xFFFFFFF0) == 0) + { + oldVal = (C) _fastAccessConsumers[id]; + _fastAccessConsumers[id] = consumer; + } + else + { + oldVal = _slowAccessConsumers.put(id, consumer); + } + + return oldVal; + + } + + public C remove(int id) + { + C consumer; + if ((id & 0xFFFFFFF0) == 0) + { + consumer = (C) _fastAccessConsumers[id]; + _fastAccessConsumers[id] = null; + } + else + { + consumer = _slowAccessConsumers.remove(id); + } + + return consumer; + + } + + public Collection values() + { + ArrayList values = new ArrayList(); + + for (int i = 0; i < 16; i++) + { + if (_fastAccessConsumers[i] != null) + { + values.add((C) _fastAccessConsumers[i]); + } + } + values.addAll(_slowAccessConsumers.values()); + + return values; + } + + public void clear() + { + _slowAccessConsumers.clear(); + for (int i = 0; i < 16; i++) + { + _fastAccessConsumers[i] = null; + } + } + } + private static final class FlowControlIndicator { private volatile boolean _flowControl = true; @@ -426,9 +420,6 @@ public abstract class AMQSession i = _consumers.values().iterator(); i.hasNext();) - // { - // BasicMessageConsumer consumer = i.next(); - // - // if (consumer.isReceiving()) - // { - // throw new javax.njms.IllegalStateException("Another thread is already receiving synchronously."); - // } - // } - // - // _messageListener = listener; - // - // for (Iterator i = _consumers.values().iterator(); i.hasNext();) - // { - // i.next().setMessageListener(_messageListener); - // } - } /** @@ -2184,7 +2153,7 @@ public abstract class AMQSessionNote that this operation automatically retries in the event of fail-over. - * - * @param queueName The name of the queue to bind. - * @param routingKey The routing key to bind the queue with. - * @param arguments Additional arguments. - * @param exchangeName The exchange to bind the queue on. - * - * @throws AMQException If the queue cannot be bound for any reason. - */ - /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) - throws AMQException, FailoverException - { - AMQFrame queueBind = - QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket - - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); - }*/ - private void checkNotTransacted() throws JMSException { if (getTransacted()) @@ -2580,7 +2575,7 @@ public abstract class AMQSession( @@ -2639,8 +2634,8 @@ public abstract class AMQSession( - new FailoverProtectedOperation() - { - public Long execute() throws AMQException, FailoverException - { - return requestQueueDepth(amqd); - } - }, _connection).execute(); + return getQueueDepth(amqd, false); + } + /** + * Returns the number of messages currently queued by the given + * destination. Syncs session before receiving the queue depth if sync is + * set to true. + * + * @param amqd AMQ destination to get the depth value + * @param sync flag to sync session before receiving the queue depth + * @return queue depth + * @throws AMQException + */ + public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException + { + return new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Long execute() throws AMQException, FailoverException + { + try + { + return requestQueueDepth(amqd, sync); + } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } + } + }, _connection).execute(); } - protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException; /** * Declares the named exchange and type of exchange. @@ -2703,6 +2718,12 @@ public abstract class AMQSessionNote that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to declare as a queue. - * @param protocolHandler The protocol handler to communicate through. * + * @param amqd The destination to declare as a queue. * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of * the client. * + * + * * @throws AMQException If the queue cannot be declared for any reason. * @todo Verify the destiation is valid or throw an exception. * @todo Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal) throws AMQException { - return declareQueue(amqd, protocolHandler, noLocal, false); + return declareQueue(amqd, noLocal, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait) + throws AMQException + { + return declareQueue(amqd, noLocal, nowait, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new FailoverNoopSupport( new FailoverProtectedOperation() { @@ -2744,7 +2773,7 @@ public abstract class AMQSession= System.currentTimeMillis() ) { - _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); - _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + _flowControl.wait(_flowControlWaitPeriod); + if (_logger.isInfoEnabled()) + { + _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + } } if(!_flowControl.getFlowControl()) { _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); } } @@ -3154,7 +3190,7 @@ public abstract class AMQSession implements Session { - public AMQSession getSession(); + private final T _session; + + protected AMQSessionAdapter(final T session) + { + _session = session; + } + + public T getSession() + { + return _session; + } + + public BytesMessage createBytesMessage() throws JMSException + { + return _session.createBytesMessage(); + } + + public MapMessage createMapMessage() throws JMSException + { + return _session.createMapMessage(); + } + + public Message createMessage() throws JMSException + { + return _session.createMessage(); + } + + public ObjectMessage createObjectMessage() throws JMSException + { + return _session.createObjectMessage(); + } + + public ObjectMessage createObjectMessage(final Serializable serializable) throws JMSException + { + return _session.createObjectMessage(serializable); + } + + public StreamMessage createStreamMessage() throws JMSException + { + return _session.createStreamMessage(); + } + + public TextMessage createTextMessage() throws JMSException + { + return _session.createTextMessage(); + } + + public TextMessage createTextMessage(final String s) throws JMSException + { + return _session.createTextMessage(s); + } + + public boolean getTransacted() throws JMSException + { + return _session.getTransacted(); + } + + public int getAcknowledgeMode() throws JMSException + { + return _session.getAcknowledgeMode(); + } + + public void commit() throws JMSException + { + _session.commit(); + } + + public void rollback() throws JMSException + { + _session.rollback(); + } + + public void close() throws JMSException + { + _session.close(); + } + + public void recover() throws JMSException + { + _session.recover(); + } + + public MessageListener getMessageListener() throws JMSException + { + return _session.getMessageListener(); + } + + public void setMessageListener(final MessageListener messageListener) throws JMSException + { + _session.setMessageListener(messageListener); + } + + public void run() + { + _session.run(); + } + + public MessageProducer createProducer(final Destination destination) throws JMSException + { + return _session.createProducer(destination); + } + + public MessageConsumer createConsumer(final Destination destination) throws JMSException + { + return _session.createConsumer(destination); + } + + public MessageConsumer createConsumer(final Destination destination, final String s) throws JMSException + { + return _session.createConsumer(destination, s); + } + + public MessageConsumer createConsumer(final Destination destination, final String s, final boolean b) + throws JMSException + { + return _session.createConsumer(destination, s, b); + } + + public Queue createQueue(final String s) throws JMSException + { + return _session.createQueue(s); + } + + public Topic createTopic(final String s) throws JMSException + { + return _session.createTopic(s); + } + + public TopicSubscriber createDurableSubscriber(final Topic topic, final String s) throws JMSException + { + return _session.createDurableSubscriber(topic, s); + } + + public TopicSubscriber createDurableSubscriber(final Topic topic, final String s, final String s1, final boolean b) + throws JMSException + { + return _session.createDurableSubscriber(topic, s, s1, b); + } + + public QueueBrowser createBrowser(final Queue queue) throws JMSException + { + return _session.createBrowser(queue); + } + + public QueueBrowser createBrowser(final Queue queue, final String s) throws JMSException + { + return _session.createBrowser(queue, s); + } + + public TemporaryQueue createTemporaryQueue() throws JMSException + { + return _session.createTemporaryQueue(); + } + + public TemporaryTopic createTemporaryTopic() throws JMSException + { + return _session.createTemporaryTopic(); + } + + public void unsubscribe(final String s) throws JMSException + { + _session.unsubscribe(s); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8395c8f4b7..3902c726f3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,11 +17,6 @@ */ package org.apache.qpid.client; -import static org.apache.qpid.transport.Option.BATCH; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; - import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; @@ -34,10 +29,8 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; - import javax.jms.Destination; import javax.jms.JMSException; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -55,11 +48,14 @@ import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; +import static org.apache.qpid.transport.Option.BATCH; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; import org.slf4j.Logger; @@ -78,6 +74,7 @@ public class AMQSession_0_10 extends AMQSession= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE) + if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE) { flushAcknowledgments(); } @@ -276,7 +278,7 @@ public class AMQSession_0_10 extends AMQSession 0) { messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); + (unacked, getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); clearUnacked(); } } @@ -444,8 +446,8 @@ public class AMQSession_0_10 extends AMQSession deliveredIter = delivered.iterator(); deliveredIter.hasNext();) { Range range = deliveredIter.next(); @@ -526,9 +528,9 @@ public class AMQSession_0_10 extends AMQSession 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) + if(capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch())) { // set the flow getQpidSession().messageFlow(consumerTag, @@ -648,12 +650,12 @@ public class AMQSession_0_10 extends AMQSession arguments = new HashMap(); + arguments.putAll((Map) node.getDeclareArgs()); + if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() , - node.getDeclareArgs(), + arguments, node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); @@ -795,15 +804,16 @@ public class AMQSession_0_10 extends AMQSession(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport( new FailoverProtectedOperation() { @@ -939,14 +950,18 @@ public class AMQSession_0_10 extends AMQSession 0 && (_connection.getMaxPrefetch() == 1 || - _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)) + if (_txSize > 0 && (getAMQConnection().getMaxPrefetch() == 1 || + getAMQConnection().getMaxPrefetch() != 0 && _txSize % (getAMQConnection().getMaxPrefetch() / 2) == 0)) { // send completed so consumer credits don't dry up messageAcknowledge(_txRangeSet, false); @@ -1039,7 +1054,7 @@ public class AMQSession_0_10 extends AMQSessionemptyMap())); + send0_10QueueDeclare(dest,null,noLocal,true, false); + getQpidSession().exchangeBind(dest.getQueueName(), + dest.getAddressName(), + dest.getSubject(), + Collections.emptyMap()); sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), null,dest.getExchangeName(),dest, false); } @@ -1328,7 +1344,7 @@ public class AMQSession_0_10 extends AMQSession 0 ) { @@ -1344,15 +1360,53 @@ public class AMQSession_0_10 extends AMQSession // messages sent by the brokers following the first rollback // after failover - _highestDeliveryTag.set(-1); + getHighestDeliveryTag().set(-1); // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to //messages that came from the old broker. _txRangeSet.clear(); _txSize = 0; - _unacknowledgedMessageTags.clear(); - _prefetchedMessageTags.clear(); + getUnacknowledgedMessageTags().clear(); + getPrefetchedMessageTags().clear(); super.resubscribe(); getQpidSession().sync(); } + + @Override + void stop() throws AMQException + { + super.stop(); + setUsingDispatcherForCleanup(true); + drainDispatchQueue(); + setUsingDispatcherForCleanup(false); + + for (BasicMessageConsumer consumer : getConsumers().values()) + { + List tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + getPrefetchedMessageTags().addAll(tags); + } + + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); + RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 7daebbff04..8ab23a240e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,13 +21,8 @@ package org.apache.qpid.client; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.ArrayList; -import java.util.Map; - -import javax.jms.Destination; -import javax.jms.JMSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -43,44 +38,20 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRecoverSyncBody; -import org.apache.qpid.framing.BasicRecoverSyncOkBody; -import org.apache.qpid.framing.BasicRejectBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.TxCommitOkBody; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.Destination; +import javax.jms.JMSException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; public class AMQSession_0_8 extends AMQSession { @@ -131,7 +102,7 @@ public class AMQSession_0_8 extends AMQSession arguments) throws AMQException, @@ -218,22 +189,22 @@ public class AMQSession_0_8 extends AMQSession consumersToCheck = new ArrayList(_consumers.values()); + ArrayList consumersToCheck = new ArrayList(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; for(BasicMessageConsumer_0_8 consumer : consumersToCheck) @@ -287,7 +258,7 @@ public class AMQSession_0_8 extends AMQSession getQueueDestinationCache() @@ -607,9 +582,18 @@ public class AMQSession_0_8 extends AMQSession implements TopicSession { - protected final AMQSession _session; - - public AMQTopicSessionAdaptor(Session session) - { - _session = (AMQSession) session; - } - public Topic createTopic(String string) throws JMSException + public AMQTopicSessionAdaptor(TopicSession session) { - return _session.createTopic(string); + super(session); } public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return _session.createSubscriber(topic); + return getSession().createSubscriber(topic); } public TopicSubscriber createSubscriber(Topic topic, String string, boolean b) throws JMSException { - return _session.createSubscriber(topic, string, b); - } - - public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException - { - return _session.createDurableSubscriber(topic, string); - } - - public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws JMSException - { - return _session.createDurableSubscriber(topic, string, string1, b); + return getSession().createSubscriber(topic, string, b); } public TopicPublisher createPublisher(Topic topic) throws JMSException { - return _session.createPublisher(topic); - } - - public TemporaryTopic createTemporaryTopic() throws JMSException - { - return _session.createTemporaryTopic(); - } - - public void unsubscribe(String string) throws JMSException - { - _session.unsubscribe(string); - } - - public BytesMessage createBytesMessage() throws JMSException - { - return _session.createBytesMessage(); - } - - public MapMessage createMapMessage() throws JMSException - { - return _session.createMapMessage(); - } - - public Message createMessage() throws JMSException - { - return _session.createMessage(); - } - - public ObjectMessage createObjectMessage() throws JMSException - { - return _session.createObjectMessage(); - } - - public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException - { - return _session.createObjectMessage(serializable); - } - - public StreamMessage createStreamMessage() throws JMSException - { - return _session.createStreamMessage(); - } - - public TextMessage createTextMessage() throws JMSException - { - return _session.createTextMessage(); - } - - public TextMessage createTextMessage(String string) throws JMSException - { - return _session.createTextMessage(string); - } - - public boolean getTransacted() throws JMSException - { - return _session.getTransacted(); - } - - public int getAcknowledgeMode() throws JMSException - { - return _session.getAcknowledgeMode(); - } - - public void commit() throws JMSException - { - _session.commit(); - } - - public void rollback() throws JMSException - { - _session.rollback(); - } - - public void close() throws JMSException - { - _session.close(); - } - - public void recover() throws JMSException - { - _session.recover(); - } - - public MessageListener getMessageListener() throws JMSException - { - return _session.getMessageListener(); - } - - public void setMessageListener(MessageListener messageListener) throws JMSException - { - _session.setMessageListener(messageListener); - } - - public void run() - { - _session.run(); - } - - public MessageProducer createProducer(Destination destination) throws JMSException - { - return _session.createProducer(destination); - } - - public MessageConsumer createConsumer(Destination destination) throws JMSException - { - return _session.createConsumer(destination); - } - - public MessageConsumer createConsumer(Destination destination, String string) throws JMSException - { - return _session.createConsumer(destination, string); - } - - public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException - { - return _session.createConsumer(destination, string, b); + return getSession().createPublisher(topic); } //The following methods cannot be called from a TopicSession as per JMS spec @@ -219,8 +67,4 @@ public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter throw new IllegalStateException("Cannot call createTemporaryQueue from TopicSession"); } - public AMQSession getSession() - { - return _session; - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c6e5fbb019..0d717a3216 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,29 +20,35 @@ */ package org.apache.qpid.client; -import org.apache.qpid.AMQInternalException; -import org.apache.qpid.filter.JMSSelectorFilter; -import org.apache.qpid.filter.MessageFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.filter.MessageFilter; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.CloseConsumerMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.*; +import org.apache.qpid.client.filter.JMSSelectorFilter; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.ArrayList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -54,14 +60,13 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); - /** The connection being used by this consumer */ - protected final AMQConnection _connection; + private final AMQConnection _connection; - protected final MessageFilter _messageSelectorFilter; + private final MessageFilter _messageSelectorFilter; private final boolean _noLocal; - protected AMQDestination _destination; + private AMQDestination _destination; /** * When true indicates that a blocking receive call is in progress @@ -72,23 +77,17 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ private final AtomicReference _messageListener = new AtomicReference(); - /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ - protected int _consumerTag; + private int _consumerTag; - /** We need to know the channel id when constructing frames */ - protected final int _channelId; + private final int _channelId; - /** - * Used in the blocking receive methods to receive a message from the Session thread.

Or to notify of errors - *

Argument true indicates we want strict FIFO semantics - */ - protected final BlockingQueue _synchronousQueue; + private final BlockingQueue _synchronousQueue; - protected final MessageFactoryRegistry _messageFactory; + private final MessageFactoryRegistry _messageFactory; - protected final AMQSession _session; + private final AMQSession _session; - protected final AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required @@ -107,17 +106,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ private final int _prefetchLow; - /** - * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover - */ - protected boolean _exclusive; + private boolean _exclusive; - /** - * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per - * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our - * implementation. - */ - protected final int _acknowledgeMode; + private final int _acknowledgeMode; /** * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. @@ -208,6 +199,10 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); + if(noLocal) + { + ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal); + } _arguments = ft; @@ -232,6 +227,11 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _messageListener.get(); } + /** + * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per + * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our + * implementation. + */ public int getAcknowledgeMode() { return _acknowledgeMode; @@ -279,7 +279,10 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); } - _logger.debug("Message listener set for destination " + _destination); + if (_logger.isDebugEnabled()) + { + _logger.debug("Message listener set for destination " + _destination); + } if (messageListener != null) { @@ -371,6 +374,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _noLocal; } + /** + * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover + */ public boolean isExclusive() { return _exclusive; @@ -537,7 +543,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } else if (o instanceof CloseConsumerMessage) { - _closed.set(true); + setClosed(); deregisterConsumer(); return null; } @@ -554,14 +560,14 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa public void close(boolean sendClose) throws JMSException { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Closing consumer:" + debugIdentity()); + _logger.debug("Closing consumer:" + debugIdentity()); } - if (!_closed.getAndSet(true)) + if (!setClosed()) { - _closing.set(true); + setClosing(true); if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -607,12 +613,8 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } else { - // FIXME: wow this is ugly - // //fixme this probably is not right - // if (!isNoConsume()) - { // done in BasicCancelOK Handler but not sending one so just deregister. - deregisterConsumer(); - } + // FIXME? + deregisterConsumer(); } // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive @@ -641,7 +643,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { // synchronized (_closed) { - _closed.set(true); + setClosed(); if (_logger.isDebugEnabled()) { @@ -818,7 +820,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { // synchronized (_closed) { - _closed.set(true); + setClosed(); if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -859,6 +861,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa _session.deregisterConsumer(this); } + /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ public int getConsumerTag() { return _consumerTag; @@ -1002,10 +1005,44 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa public void failedOverPre() { clearReceiveQueue(); - // TGM FIXME: think this should just be removed - // clearUnackedMessages(); } public void failedOverPost() {} + /** The connection being used by this consumer */ + protected AMQConnection getConnection() + { + return _connection; + } + + protected void setDestination(AMQDestination destination) + { + _destination = destination; + } + + /** We need to know the channel id when constructing frames */ + protected int getChannelId() + { + return _channelId; + } + + /** + * Used in the blocking receive methods to receive a message from the Session thread.

Or to notify of errors + *

Argument true indicates we want strict FIFO semantics + */ + protected BlockingQueue getSynchronousQueue() + { + return _synchronousQueue; + } + + protected MessageFactoryRegistry getMessageFactory() + { + return _messageFactory; + } + + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 3b6179dd07..26bb51b821 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,21 +19,32 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AMQMessageDelegate_0_10; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Acquired; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.TransportException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,7 +57,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) + if (getSynchronousQueue().size() > 0) { RangeSet ranges = RangeSetFactory.createRangeSet(); - Iterator iterator = _synchronousQueue.iterator(); + Iterator iterator = getSynchronousQueue().iterator(); while (iterator.hasNext()) { @@ -486,7 +496,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer { - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQSession_0_8.DestinationCache _topicDestinationCache; private AMQSession_0_8.DestinationCache _queueDestinationCache; @@ -88,11 +95,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer 0 || !_disableTimestamps) + if (timeToLive > 0 || !isDisableTimestamps()) { currentTime = System.currentTimeMillis(); } @@ -136,7 +136,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer message.setJMSExpiration(currentTime + timeToLive); } - if (!_disableTimestamps) + if (!isDisableTimestamps()) { deliveryProp.setTimestamp(currentTime); @@ -213,8 +213,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer // if true, we need to sync the delivery of this message boolean sync = false; - sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || - (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && + sync = ( (getPublishMode() == PublishMode.SYNC_PUBLISH_ALL) || + (getPublishMode() == PublishMode.SYNC_PUBLISH_PERSISTENT && deliveryMode == DeliveryMode.PERSISTENT) ); @@ -248,14 +248,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer @Override public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination); + return getSession().isQueueBound(destination); } @Override public void close() throws JMSException { super.close(); - AMQDestination dest = _destination; + AMQDestination dest = getAMQDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { if (dest.getDelete() == AddressOption.ALWAYS || @@ -264,7 +264,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); + getAMQDestination().getQueueName()); } catch(TransportException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index b2f998cb2c..21ff6c877a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.client; -import java.util.UUID; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Topic; -import javax.jms.Queue; - -import java.nio.ByteBuffer; - import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -42,13 +33,24 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Topic; +import java.nio.ByteBuffer; +import java.util.UUID; + public class BasicMessageProducer_0_8 extends BasicMessageProducer { + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class); BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); + super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); } void declareDestination(AMQDestination destination) @@ -56,7 +58,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer final MethodRegistry methodRegistry = getSession().getMethodRegistry(); ExchangeDeclareBody body = - methodRegistry.createExchangeDeclareBody(_session.getTicket(), + methodRegistry.createExchangeDeclareBody(getSession().getTicket(), destination.getExchangeName(), destination.getExchangeClass(), destination.getExchangeName().toString().startsWith("amq."), @@ -68,29 +70,29 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false - AMQFrame declare = body.generateFrame(_channelId); + AMQFrame declare = body.generateFrame(getChannelId()); - _protocolHandler.writeFrame(declare); + getProtocolHandler().writeFrame(declare); } void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); - AMQFrame publishFrame = body.generateFrame(_channelId); + AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); - contentHeaderProperties.setUserId(_userID); + contentHeaderProperties.setUserId(getUserID()); //Set the JMS_QPID_DESTTYPE for 0-8/9 messages int type; @@ -110,7 +112,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer //Set JMS_QPID_DESTTYPE delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - if (!_disableTimestamps) + if (!isDisableTimestamps()) { final long currentTime = System.currentTimeMillis(); contentHeaderProperties.setTimestamp(currentTime); @@ -134,12 +136,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (payload != null) { - createContentBodies(payload, frames, 2, _channelId); + createContentBodies(payload, frames, 2, getChannelId()); } - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) + if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled()) { - _logger.debug("Sending content body frames to " + destination); + getLogger().debug("Sending content body frames to " + destination); } @@ -147,11 +149,11 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, + ContentHeaderBody.createAMQFrame(getChannelId(), classIfForBasic, 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) + if (getLogger().isDebugEnabled()) { - _logger.debug("Sending content header frame to " + destination); + getLogger().debug("Sending content header frame to " + destination); } frames[0] = publishFrame; @@ -160,7 +162,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer try { - _session.checkFlowControl(); + getSession().checkFlowControl(); } catch (InterruptedException e) { @@ -170,7 +172,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer throw jmse; } - _protocolHandler.writeFrame(compositeFrame); + getProtocolHandler().writeFrame(compositeFrame); } /** @@ -194,7 +196,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; long remaining = payload.remaining(); for (int i = offset; i < frames.length; i++) { @@ -224,7 +226,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; frameCount = (int) (dataLength / framePayloadMax) + lastFrame; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java index e6771e122c..ba26bfc485 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -22,7 +22,6 @@ package org.apache.qpid.client; import javax.jms.IllegalStateException; import javax.jms.JMSException; - import java.util.concurrent.atomic.AtomicBoolean; /** @@ -49,14 +48,14 @@ public abstract class Closeable * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing access to this * flag would mean have a synchronized block in every method. */ - protected final AtomicBoolean _closed = new AtomicBoolean(false); + private final AtomicBoolean _closed = new AtomicBoolean(false); /** * Are we in the process of closing. We have this distinction so we can * still signal we are in the process of closing so other objects can tell * the difference and tidy up. */ - protected final AtomicBoolean _closing = new AtomicBoolean(false); + private final AtomicBoolean _closing = new AtomicBoolean(false); /** * Checks if this is closed, and raises a JMSException if it is. @@ -91,6 +90,15 @@ public abstract class Closeable return _closing.get(); } + protected boolean setClosed() + { + return _closed.getAndSet(true); + } + + protected void setClosing(boolean closing) + { + _closing.set(closing); + } /** * Closes this object. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index e81e754da2..b2d1072e2b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; + import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.List; -import org.apache.qpid.framing.AMQShortString; - public enum CustomJMSXProperty { JMS_AMQP_NULL, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java deleted file mode 100644 index 81a55006ed..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import java.util.Queue; - -public abstract class DispatcherCallback -{ - BasicMessageConsumer _consumer; - - public DispatcherCallback(BasicMessageConsumer mc) - { - _consumer = mc; - } - - abstract public void whilePaused(Queue reprocessQueue); - -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java index 585d6db3fd..134159afe1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java @@ -22,8 +22,8 @@ package org.apache.qpid.client; public class MessageConsumerPair { - BasicMessageConsumer _consumer; - Object _item; + private BasicMessageConsumer _consumer; + private Object _item; public MessageConsumerPair(BasicMessageConsumer consumer, Object item) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java index 5cf767ac35..03574ceab3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -20,12 +20,11 @@ */ package org.apache.qpid.client; -import java.util.Enumeration; +import org.apache.qpid.common.QpidProperties; import javax.jms.ConnectionMetaData; import javax.jms.JMSException; - -import org.apache.qpid.common.QpidProperties; +import java.util.Enumeration; public class QpidConnectionMetaData implements ConnectionMetaData { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java index 7059588367..b778ee22d6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java @@ -33,8 +33,8 @@ import javax.jms.QueueReceiver; */ public class QueueReceiverAdaptor implements QueueReceiver { - protected MessageConsumer _consumer; - protected Queue _queue; + private MessageConsumer _consumer; + private Queue _queue; protected QueueReceiverAdaptor(Queue queue, MessageConsumer consumer) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index 295c6a4091..0b797df9dd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -202,7 +202,7 @@ public class QueueSenderAdapter implements QueueSender { if (_delegate.getSession().isStrictAMQP()) { - _delegate._logger.warn("AMQP does not support destination validation before publish, "); + _delegate.getLogger().warn("AMQP does not support destination validation before publish, "); destination.setCheckedForQueueBinding(true); } else diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java index ca137f5a51..6da3825359 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java @@ -21,11 +21,11 @@ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; + import javax.jms.Destination; import javax.jms.JMSException; -import org.apache.qpid.framing.AMQShortString; - /** * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue * so that operations related to their "temporary-ness" can be abstracted out. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index 509aa25bd5..d9514338ce 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -17,10 +17,16 @@ */ package org.apache.qpid.client; -import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.AMQException; +import org.apache.qpid.jms.ConnectionURL; -import javax.jms.*; +import javax.jms.JMSException; +import javax.jms.XAConnection; +import javax.jms.XAQueueConnection; +import javax.jms.XAQueueSession; +import javax.jms.XASession; +import javax.jms.XATopicConnection; +import javax.jms.XATopicSession; /** * This class implements the javax.njms.XAConnection interface @@ -47,7 +53,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(); + return getDelegate().createXASession(); } //-- Interface XAQueueConnection @@ -80,6 +86,6 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public XASession createXASession(int ackMode) throws JMSException { checkNotClosed(); - return _delegate.createXASession(ackMode); + return getDelegate().createXASession(ackMode); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 5b94b342eb..af9048f1f5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -17,9 +17,8 @@ */ package org.apache.qpid.client; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.dtx.XidImpl; import org.apache.qpid.transport.DtxXaStatus; @@ -29,8 +28,10 @@ import org.apache.qpid.transport.Option; import org.apache.qpid.transport.RecoverResult; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.XaResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; /** * This is an implementation of javax.njms.XAResource. @@ -307,13 +308,16 @@ public class XAResourceImpl implements XAResource _xaSession.createSession(); convertExecutionErrorToXAErr( e.getException().getErrorCode()); } - Xid[] result = new Xid[res.getInDoubt().size()]; - int i = 0; - for (Object obj : res.getInDoubt()) + Xid[] result = new Xid[res.getInDoubt() != null ? res.getInDoubt().size() : 0]; + if(result.length != 0) { - org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj; - result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId()); - i++; + int i = 0; + for (Object obj : res.getInDoubt()) + { + org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj; + result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId()); + i++; + } } return result; } @@ -435,6 +439,16 @@ public class XAResourceImpl implements XAResource } } + /** + * Is this resource currently enlisted in a transaction? + * + * @return true if the resource is associated with a transaction, false otherwise. + */ + public boolean isEnlisted() + { + return (_xid != null) ; + } + //------------------------------------------------------------------------ // Private methods //------------------------------------------------------------------------ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index aaabf613fc..6d5bf9ad67 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -18,8 +18,16 @@ package org.apache.qpid.client; import org.apache.qpid.client.message.MessageFactoryRegistry; - -import javax.jms.*; +import org.apache.qpid.transport.RangeSet; + +import javax.jms.JMSException; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TopicSession; +import javax.jms.TransactionInProgressException; +import javax.jms.XAQueueSession; +import javax.jms.XASession; +import javax.jms.XATopicSession; import javax.transaction.xa.XAResource; /** @@ -79,7 +87,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public void createSession() { - _qpidDtxSession = _qpidConnection.createSession(0); + _qpidDtxSession = getQpidConnection().createSession(0); _qpidDtxSession.setSessionListener(this); _qpidDtxSession.dtxSelect(); } @@ -171,4 +179,17 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic { return (TopicSession) getSession(); } + + @Override + protected void acknowledgeImpl() + { + if (_xaResource.isEnlisted()) + { + acknowledgeMessage(Long.MAX_VALUE, true) ; + } + else + { + super.acknowledgeImpl() ; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index f74dbba939..4099da18d2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.failover; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQState; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.client.state.AMQStateManager; import java.util.concurrent.CountDownLatch; @@ -231,14 +231,7 @@ public class FailoverHandler implements Runnable { _logger.info("Failover process failed - exception being propagated by protocol handler"); _amqProtocolHandler.setFailoverState(FailoverState.FAILED); - /*try - {*/ _amqProtocolHandler.exception(e); - /*} - catch (Exception ex) - { - _logger.error("Error notifying protocol session of error: " + ex, ex); - }*/ } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java index 51cc94965a..a69e808880 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -38,10 +38,10 @@ import org.apache.qpid.client.AMQConnection; public class FailoverNoopSupport implements FailoverSupport { /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation operation; + private FailoverProtectedOperation operation; /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; + private AMQConnection connection; /** * Creates an automatic retrying fail-over handler for the specified operation. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index 28d19ce817..d3d33d3c75 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.client.failover; -import org.apache.qpid.client.AMQConnection; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; + /** * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due @@ -73,10 +73,10 @@ public class FailoverRetrySupport implements FailoverSup private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class); /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation operation; + private FailoverProtectedOperation operation; /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; + private AMQConnection connection; /** * Creates an automatic retrying fail-over handler for the specified operation. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/filter/JMSSelectorFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/filter/JMSSelectorFilter.java index 14cce0aa59..bab518b0ec 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/filter/JMSSelectorFilter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/filter/JMSSelectorFilter.java @@ -55,6 +55,10 @@ public class JMSSelectorFilter implements MessageFilter { _matcher = new SelectorParser().parse(selector); } + catch (ParseException e) + { + throw new AMQInternalException("Unable to parse selector \""+selector+"\"", e); + } catch (SelectorParsingException e) { throw new AMQInternalException("Unable to parse selector \""+selector+"\"", e); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java index af47673a43..3a3ddae52f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java @@ -24,10 +24,10 @@ package org.apache.qpid.client.handler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.framing.*; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AccessRequestOkBody; public class AccessRequestOkMethodHandler implements StateAwareMethodListener { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java index 5cb9412d51..9b5eea3bba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicCancelOkBody; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BasicCancelOkMethodHandler implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(BasicCancelOkMethodHandler.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 33ca584b34..3f57e180e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicDeliverBody; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class BasicDeliverMethodHandler implements StateAwareMethodListener { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 3bbc9209c5..a09d298ae8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicReturnBody; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BasicReturnMethodHandler implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(BasicReturnMethodHandler.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index b9d4d6fa95..85328d78ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidRoutingKeyException; @@ -32,8 +35,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ChannelCloseMethodHandler implements StateAwareMethodListener { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java index 72936779c2..058484c339 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ChannelCloseOkMethodHandler implements StateAwareMethodListener { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java index 2153b9cc8c..919c5f6d67 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java @@ -1,12 +1,13 @@ package org.apache.qpid.client.handler; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.AMQException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ChannelFlowBody; + /* * * Licensed to the Apache Software Foundation (ASF) under one diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java index 6f66a972d5..c15404ea08 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ChannelFlowOkBody; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class ChannelFlowOkMethodHandler implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowOkMethodHandler.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index ec98783a8a..e1a0e18262 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.client.handler; -import java.util.Map; -import java.util.HashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.qpid.framing.*; import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.AMQMethodNotImplementedException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.framing.*; + +import java.util.HashMap; +import java.util.Map; public class ClientMethodDispatcherImpl implements MethodDispatcher { @@ -94,16 +95,16 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session) { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("New Method Dispatcher:" + session); + _logger.debug("New Method Dispatcher:" + session); } DispatcherFactory factory = _dispatcherFactories.get(version); return factory.createMethodDispatcher(session); } - AMQProtocolSession _session; + private AMQProtocolSession _session; public ClientMethodDispatcherImpl(AMQProtocolSession session) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java index d3e9fba8ed..f4fc3a4715 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.client.handler; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; - import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.AMQMethodNotImplementedException; import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java index f15340ae00..5f33561a8f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.client.handler; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91; - import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.AMQMethodNotImplementedException; import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91; public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_91 { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index 19f758817d..28ad6037d4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -20,11 +20,19 @@ */ package org.apache.qpid.client.handler; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; - import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.ChannelAlertBody; +import org.apache.qpid.framing.TestContentBody; +import org.apache.qpid.framing.TestContentOkBody; +import org.apache.qpid.framing.TestIntegerBody; +import org.apache.qpid.framing.TestIntegerOkBody; +import org.apache.qpid.framing.TestStringBody; +import org.apache.qpid.framing.TestStringOkBody; +import org.apache.qpid.framing.TestTableBody; +import org.apache.qpid.framing.TestTableOkBody; +import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index b392604822..2cf7b089eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -20,21 +20,20 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class ConnectionCloseMethodHandler implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(ConnectionCloseMethodHandler.class); @@ -55,9 +54,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java index 472c471fd6..0ccb9b72b1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionRedirectBody; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class ConnectionRedirectMethodHandler implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(ConnectionRedirectMethodHandler.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index 9a9bee757b..8afb6ffcb4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.client.handler; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionSecureOkBody; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + public class ConnectionSecureMethodHandler implements StateAwareMethodListener { private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 939bd181a3..66c4821f60 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -20,12 +20,8 @@ */ package org.apache.qpid.client.handler; -import java.io.UnsupportedEncodingException; -import java.util.StringTokenizer; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; @@ -33,16 +29,21 @@ import org.apache.qpid.client.security.AMQCallbackHandler; import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.common.ClientProperties; import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.properties.ConnectionStartProperties; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.io.UnsupportedEncodingException; +import java.util.StringTokenizer; public class ConnectionStartMethodHandler implements StateAwareMethodListener { @@ -148,14 +149,18 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java index 690d782b40..b60127cf93 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * @author Apache Software Foundation */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java index 01d82c9b55..3c76a8ac38 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.QueueDeleteOkBody; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * @author Apache Software Foundation */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java index a9434edf49..d01c4ac33d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java @@ -25,7 +25,6 @@ import org.apache.qpid.client.AMQSession; import javax.jms.Destination; import javax.jms.JMSException; - import java.util.Enumeration; import java.util.UUID; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 179ebd66d1..a0c3914127 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -21,22 +21,8 @@ package org.apache.qpid.client.message; -import java.lang.ref.SoftReference; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; @@ -54,8 +40,22 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.ReplyTo; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotWriteableException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; /** * This extends AbstractAMQMessageDelegate which contains common code between @@ -65,7 +65,22 @@ import org.slf4j.LoggerFactory; public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate_0_10.class); - private static final Map> _destinationCache = Collections.synchronizedMap(new HashMap>()); + + private static final float DESTINATION_CACHE_LOAD_FACTOR = 0.75f; + private static final int DESTINATION_CACHE_SIZE = 500; + private static final int DESTINATION_CACHE_CAPACITY = (int) (DESTINATION_CACHE_SIZE / DESTINATION_CACHE_LOAD_FACTOR); + + private static final Map _destinationCache = + Collections.synchronizedMap(new LinkedHashMap(DESTINATION_CACHE_CAPACITY, + DESTINATION_CACHE_LOAD_FACTOR, + true) + { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) + { + return size() >= DESTINATION_CACHE_SIZE; + } + }); public static final String JMS_TYPE = "x-jms-type"; @@ -241,12 +256,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } else { - Destination dest = null; - SoftReference ref = _destinationCache.get(replyTo); - if (ref != null) - { - dest = ref.get(); - } + Destination dest = _destinationCache.get(replyTo); + if (dest == null) { String exchange = replyTo.getExchange(); @@ -254,14 +265,13 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL) { - dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); } else { dest = convertToAddressBasedDestination(exchange,routingKey,null); } - _destinationCache.put(replyTo, new SoftReference(dest)); + _destinationCache.put(replyTo, dest); } return dest; @@ -271,6 +281,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject) { String addr; + boolean isQueue = true; if ("".equals(exchange)) // type Queue { subject = (subject == null) ? "" : "/" + subject; @@ -279,11 +290,24 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate else { addr = exchange + "/" + routingKey; + isQueue = false; } try { - return AMQDestination.createDestination("ADDR:" + addr); + AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr); + if (isQueue) + { + dest.setQueueName(new AMQShortString(routingKey)); + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString("")); + } + else + { + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString(exchange)); + } + return dest; } catch(Exception e) { @@ -341,13 +365,11 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate jmse.setLinkedException(e); throw jmse; } - } - + final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); - _destinationCache.put(replyTo, new SoftReference(destination)); + _destinationCache.put(replyTo, destination); _messageProps.setReplyTo(replyTo); - } public Destination getJMSDestination() throws JMSException @@ -560,6 +582,10 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { return ((Short)o).shortValue(); } + else if(o instanceof String) + { + return Short.valueOf((String) o); + } else { try @@ -587,6 +613,10 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { return ((Integer)o).intValue(); } + else if(o instanceof String) + { + return Integer.valueOf((String) o); + } else { try @@ -613,6 +643,10 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { return ((Long)o).longValue(); } + else if(o instanceof String) + { + return Long.valueOf((String) o); + } else { try @@ -933,7 +967,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate // apply when a property is used in a message selector expression. For // example, suppose you set a property as a string value, as in the // following: -// myMessage.setStringProperty("NumberOfOrders", "2"); +// myMessage.setStringProperty("NumberOfOrders", "2") // The following expression in a message selector would evaluate to false, // because a string cannot be used in an arithmetic expression: // "NumberOfOrders > 1" diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index ab7061c382..fe9f9f4d00 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -21,27 +21,47 @@ package org.apache.qpid.client.message; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; import java.net.URISyntaxException; import java.util.Collections; import java.util.Enumeration; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; - -import org.apache.qpid.client.*; -import org.apache.qpid.collections.ReferenceMap; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderProperties; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.BindingURL; public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { - private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + private static final float DESTINATION_CACHE_LOAD_FACTOR = 0.75f; + private static final int DESTINATION_CACHE_SIZE = 500; + private static final int DESTINATION_CACHE_CAPACITY = (int) (DESTINATION_CACHE_SIZE / DESTINATION_CACHE_LOAD_FACTOR); + + private static final Map _destinationCache = + Collections.synchronizedMap(new LinkedHashMap(DESTINATION_CACHE_CAPACITY, + DESTINATION_CACHE_LOAD_FACTOR, + true) + { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) + { + return size() >= DESTINATION_CACHE_SIZE; + } + }); public static final String JMS_TYPE = "x-jms-type"; @@ -229,7 +249,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } else { - Destination dest = (Destination) _destinationCache.get(replyToEncoding); + Destination dest = _destinationCache.get(replyToEncoding); if (dest == null) { try @@ -266,7 +286,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate final AMQDestination amqd = (AMQDestination) destination; final AMQShortString encodedDestination = amqd.getEncodedName(); - _destinationCache.put(encodedDestination, destination); + _destinationCache.put(encodedDestination.asString(), destination); getContentHeaderProperties().setReplyTo(encodedDestination); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java index be71c8c657..11d99f5446 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -21,18 +21,17 @@ package org.apache.qpid.client.message; */ -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.nio.ByteBuffer; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; - import org.apache.qpid.AMQException; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.codec.BBEncoder; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.UUID; + public class AMQPEncodedMapMessage extends JMSMapMessage { public static final String MIME_TYPE = "amqp/map"; @@ -68,7 +67,7 @@ public class AMQPEncodedMapMessage extends JMSMapMessage || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value instanceof List) || (value instanceof Map) || (value instanceof UUID) || (value == null)) { - _map.put(propName, value); + getMap().put(propName, value); } else { @@ -82,7 +81,7 @@ public class AMQPEncodedMapMessage extends JMSMapMessage public ByteBuffer getData() { BBEncoder encoder = new BBEncoder(1024); - encoder.writeMap(_map); + encoder.writeMap(getMap()); return encoder.segment(); } @@ -94,22 +93,18 @@ public class AMQPEncodedMapMessage extends JMSMapMessage data.rewind(); BBDecoder decoder = new BBDecoder(); decoder.init(data); - _map = decoder.readMap(); + setMap(decoder.readMap()); } else { - _map.clear(); + getMap().clear(); } } // for testing public Map getMap() { - return _map; - } - - void setMap(Map map) - { - _map = map; + return super.getMap(); } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java index 2c38f153cb..f997862bb0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java @@ -21,12 +21,11 @@ package org.apache.qpid.client.message; */ -import javax.jms.JMSException; +import org.apache.qpid.AMQException; +import javax.jms.JMSException; import java.nio.ByteBuffer; -import org.apache.qpid.AMQException; - public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 1b6c0c751d..1395f39b99 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.client.message; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.jms.JMSException; -import javax.jms.Session; - import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -34,6 +28,11 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import javax.jms.JMSException; +import javax.jms.Session; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * This abstract class provides exchange lookup functionality that is shared * between all MessageDelegates. Update facilities are provided so that the 0-10 @@ -122,18 +121,18 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate exchangeInfo = new ExchangeInfo(exchange.asString(),"",AMQDestination.UNKNOWN_TYPE); } - if ("topic".equals(exchangeInfo.exchangeType)) + if ("topic".equals(exchangeInfo.getExchangeType())) { dest = new AMQTopic(exchange, routingKey, null); } - else if ("direct".equals(exchangeInfo.exchangeType)) + else if ("direct".equals(exchangeInfo.getExchangeType())) { dest = new AMQQueue(exchange, routingKey, routingKey); } else { dest = new AMQAnyDestination(exchange, - new AMQShortString(exchangeInfo.exchangeType), + new AMQShortString(exchangeInfo.getExchangeType()), routingKey, false, false, @@ -224,9 +223,9 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate class ExchangeInfo { - String exchangeName; - String exchangeType; - int destType = AMQDestination.QUEUE_TYPE; + private String exchangeName; + private String exchangeType; + private int destType = AMQDestination.QUEUE_TYPE; public ExchangeInfo(String exchangeName, String exchangeType, int destType) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index ddeb62fbf6..9c7bd0bdcf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -21,26 +21,20 @@ package org.apache.qpid.client.message; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.transport.util.Functions; import javax.jms.JMSException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.transport.util.Functions; +import java.nio.ByteBuffer; /** * @author Apache Software Foundation */ public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage { - protected boolean _readableMessage = false; + private boolean _readableMessage = false; AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage) { @@ -81,6 +75,11 @@ public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage _readableMessage = false; } + protected void setReadable(boolean readable) + { + _readableMessage = readable; + } + public String toBodyString() throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index f713554bfb..d1e43447cc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,16 +20,15 @@ */ package org.apache.qpid.client.message; -import java.nio.ByteBuffer; -import java.util.Enumeration; -import java.util.UUID; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession; +import java.nio.ByteBuffer; +import java.util.Enumeration; +import java.util.UUID; public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message { @@ -37,7 +36,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - protected AMQMessageDelegate _delegate; + private AMQMessageDelegate _delegate; private boolean _redelivered; private boolean _receivedFromServer; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 16b71db77e..608567674a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -20,27 +20,25 @@ */ package org.apache.qpid.client.message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession_0_8; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.DeliveryProperties; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.MessageProperties; import javax.jms.JMSException; - +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; -import java.nio.ByteBuffer; - public abstract class AbstractJMSMessageFactory implements MessageFactory { private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); @@ -59,25 +57,25 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { if (debug) { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); } - data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload); + data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload()); } else if (bodies != null) { if (debug) { _logger.debug("Fragmented message body (" + bodies - .size() + " frames, bodySize=" + contentHeader.bodySize + ")"); + .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")"); } - data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem? + data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem? final Iterator it = bodies.iterator(); while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = ByteBuffer.wrap(cb._payload); + final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload()); if(payload.isDirect() || payload.isReadOnly()) { data.put(payload); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java index 49ae8c14b2..31a0440b04 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java @@ -20,35 +20,39 @@ package org.apache.qpid.client.message; * */ -import java.util.HashMap; -import java.util.Map; - import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import java.util.HashMap; +import java.util.Map; + public class FieldTableSupport { - public static FieldTable convertToFieldTable(Map props) - { - FieldTable ft = new FieldTable(); - if (props != null) - { - for (String key : props.keySet()) - { - ft.setObject(key, props.get(key)); - } - } - return ft; - } + private FieldTableSupport() + { + } + + public static FieldTable convertToFieldTable(Map props) + { + FieldTable ft = new FieldTable(); + if (props != null) + { + for (String key : props.keySet()) + { + ft.setObject(key, props.get(key)); + } + } + return ft; + } - public static Map convertToMap(FieldTable ft) - { - Map map = new HashMap(); - for (AMQShortString key: ft.keySet() ) - { - map.put(key.asString(), ft.getObject(key)); - } + public static Map convertToMap(FieldTable ft) + { + Map map = new HashMap(); + for (AMQShortString key: ft.keySet() ) + { + map.put(key.asString(), ft.getObject(key)); + } - return map; - } + return map; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index e252bdb719..b0320d0f4e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -20,21 +20,13 @@ */ package org.apache.qpid.client.message; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; +import org.apache.qpid.AMQException; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import java.nio.ByteBuffer; public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage { @@ -60,7 +52,7 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM public void reset() { - _readableMessage = true; + setReadable(true); if(_typedBytesContentReader != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java index 89561b88eb..c8c01f16ee 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java @@ -20,12 +20,9 @@ */ package org.apache.qpid.client.message; -import javax.jms.JMSException; - import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import javax.jms.JMSException; import java.nio.ByteBuffer; public class JMSBytesMessageFactory extends AbstractJMSMessageFactory diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 52c0eb263b..122a5c4ef2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -20,18 +20,17 @@ */ package org.apache.qpid.client.message; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Enumeration; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import javax.jms.JMSException; import javax.jms.MessageFormatException; - +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; +import java.util.Enumeration; public final class JMSHeaderAdapter @@ -60,7 +59,7 @@ public final class JMSHeaderAdapter { Object str = getHeaders().getObject(string); - if (str == null || !(str instanceof String)) + if (!(str instanceof String)) { throw new MessageFormatException("getBoolean can't use " + string + " item."); } @@ -89,7 +88,7 @@ public final class JMSHeaderAdapter { Object str = getHeaders().getObject(string); - if (str == null || !(str instanceof String)) + if (!(str instanceof String)) { throw new MessageFormatException("getBoolean can't use " + string + " item."); } @@ -160,7 +159,7 @@ public final class JMSHeaderAdapter { Object str = getHeaders().getObject(string); - if (str == null || !(str instanceof String)) + if (!(str instanceof String)) { throw new MessageFormatException("getByte can't use " + string + " item."); } @@ -228,7 +227,7 @@ public final class JMSHeaderAdapter { Object str = getHeaders().getObject(string); - if (str == null || !(str instanceof String)) + if (!(str instanceof String)) { throw new MessageFormatException("getFloat can't use " + string + " item."); } @@ -285,7 +284,7 @@ public final class JMSHeaderAdapter s = String.valueOf(o); } } - }//else return s // null; + } } return s; @@ -527,7 +526,7 @@ public final class JMSHeaderAdapter // apply when a property is used in a message selector expression. For // example, suppose you set a property as a string value, as in the // following: -// myMessage.setStringProperty("NumberOfOrders", "2"); +// myMessage.setStringProperty("NumberOfOrders", "2") // The following expression in a message selector would evaluate to false, // because a string cannot be used in an arithmetic expression: // "NumberOfOrders > 1" diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index fad24a968e..e18ed80f6d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -20,17 +20,14 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; - - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.AMQException; + import javax.jms.JMSException; import javax.jms.MessageFormatException; - import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; @@ -42,7 +39,7 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe public static final String MIME_TYPE = "jms/map-message"; - protected Map _map = new HashMap(); + private Map _map = new HashMap(); public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { @@ -486,4 +483,13 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe return writer.getData(); } + protected Map getMap() + { + return _map; + } + + protected void setMap(Map map) + { + _map = map; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java index 89408a5c3c..72bc8d59ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.client.message; -import javax.jms.JMSException; +import org.apache.qpid.AMQException; +import javax.jms.JMSException; import java.nio.ByteBuffer; -import org.apache.qpid.AMQException; public class JMSMapMessageFactory extends AbstractJMSMessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 7f733b9644..509fc9f7f1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -20,16 +20,18 @@ */ package org.apache.qpid.client.message; -import java.io.*; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream; +import org.apache.qpid.util.ByteBufferInputStream; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream; -import org.apache.qpid.util.ByteBufferInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java index 4660c91c1f..a1727811ae 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.client.message; -import javax.jms.JMSException; +import org.apache.qpid.AMQException; +import javax.jms.JMSException; import java.nio.ByteBuffer; -import org.apache.qpid.AMQException; public class JMSObjectMessageFactory extends AbstractJMSMessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 5c93f6b6f0..b958d89515 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,15 +20,12 @@ */ package org.apache.qpid.client.message; +import org.apache.qpid.AMQException; + import javax.jms.JMSException; import javax.jms.StreamMessage; - import java.nio.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; - /** * @author Apache Software Foundation */ @@ -57,7 +54,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public void reset() { - _readableMessage = true; + setReadable(true); if(_typedBytesContentReader != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java index 359f5157f3..56fa8e5e38 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -20,12 +20,11 @@ */ package org.apache.qpid.client.message; -import javax.jms.JMSException; +import org.apache.qpid.AMQException; +import javax.jms.JMSException; import java.nio.ByteBuffer; -import org.apache.qpid.AMQException; - public class JMSStreamMessageFactory extends AbstractJMSMessageFactory { protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index acf3a0ca14..097a3bb5c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -20,8 +20,11 @@ */ package org.apache.qpid.client.message; -import java.io.DataInputStream; -import java.io.UnsupportedEncodingException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.CustomJMSXProperty; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; @@ -29,16 +32,6 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.CustomJMSXProperty; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.util.Strings; - public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { private static final String MIME_TYPE = "text/plain"; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java index d1af32c10a..0b28e6ca85 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.client.message; -import javax.jms.JMSException; +import org.apache.qpid.AMQException; +import javax.jms.JMSException; import java.nio.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSTextMessageFactory extends AbstractJMSMessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java index e606ef11c9..5abd02f150 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java @@ -22,10 +22,17 @@ package org.apache.qpid.client.message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.client.AMQSession; -import javax.jms.*; +import org.apache.qpid.client.AMQSession; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; import java.util.Enumeration; public class MessageConverter @@ -34,7 +41,7 @@ public class MessageConverter /** * Log4J logger */ - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); /** * AbstractJMSMessage which will hold the converted message diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 93c2872b2e..70c6aa4c75 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -20,10 +20,6 @@ */ package org.apache.qpid.client.message; -import java.util.List; - -import javax.jms.JMSException; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession_0_8; @@ -33,6 +29,9 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; +import javax.jms.JMSException; +import java.util.List; + public interface MessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 15ad3ed89f..fa39b4c93c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -20,12 +20,8 @@ */ package org.apache.qpid.client.message; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.nio.ByteBuffer; - -import javax.jms.JMSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQQueue; @@ -34,19 +30,21 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.DeliveryProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class MessageFactoryRegistry { /** * This class logger */ - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); private final Map _mimeStringToFactoryMap = new HashMap(); private final Map _mimeShortStringToFactoryMap = diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java index b30afafa35..663dfd39b1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java @@ -25,6 +25,9 @@ package org.apache.qpid.client.message; */ public class QpidMessageProperties { + private QpidMessageProperties() + { + } public static final String QPID_SUBJECT = "qpid.subject"; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java index 1ae25eb1ed..b00ac7e34b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java @@ -455,7 +455,7 @@ class TypedBytesContentReader implements TypedBytesCodes ByteBuffer dup = _data.duplicate(); int pos = _data.position(); byte b; - while((b = _data.get()) != 0); + while((b = _data.get()) != 0) {}; dup.limit(_data.position()-1); return _charsetDecoder.decode(dup).toString(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index e2cb36a030..bb0e3b447f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -21,7 +21,6 @@ package org.apache.qpid.client.message; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.BasicMessageConsumer; /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java index ce87a112c9..5ed98dc8ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.client.message; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and * the content body/ies. @@ -44,7 +44,7 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage private AMQShortString _exchange; private AMQShortString _routingKey; private final long _deliveryId; - protected boolean _redelivered; + private boolean _redelivered; private BasicDeliverBody _deliverBody; private ContentHeaderBody _contentHeader; @@ -87,13 +87,13 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public void receiveBody(ContentBody body) { - if (body._payload != null) + if (body.getPayload() != null) { - final long payloadSize = body._payload.length; + final long payloadSize = body.getPayload().length; if (_bodies == null) { - if (payloadSize == getContentHeader().bodySize) + if (payloadSize == getContentHeader().getBodySize()) { _bodies = Collections.singletonList(body); } @@ -124,7 +124,7 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public boolean isAllBodyDataReceived() { - return _bytesReceived == getContentHeader().bodySize; + return _bytesReceived == getContentHeader().getBodySize(); } public BasicDeliverBody getDeliverBody() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 368ec60525..318fe32d36 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -20,22 +20,21 @@ */ package org.apache.qpid.client.messaging.address; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Link.Subscription; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; -import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; import org.apache.qpid.configuration.Accessor; import org.apache.qpid.configuration.Accessor.MapAccessor; import org.apache.qpid.messaging.Address; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + /** * Utility class for extracting information from the address class */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index c73d800b14..41f6725c8f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -29,16 +29,16 @@ public class Link public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } - protected String name; - protected String _filter; - protected FilterType _filterType = FilterType.SUBJECT; - protected boolean _isNoLocal; - protected boolean _isDurable; - protected int _consumerCapacity = 0; - protected int _producerCapacity = 0; - protected Node node; - protected Subscription subscription; - protected Reliability reliability = Reliability.AT_LEAST_ONCE; + private String name; + private String _filter; + private FilterType _filterType = FilterType.SUBJECT; + private boolean _isNoLocal; + private boolean _isDurable; + private int _consumerCapacity = 0; + private int _producerCapacity = 0; + private Node node; + private Subscription subscription; + private Reliability reliability = Reliability.AT_LEAST_ONCE; public Reliability getReliability() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index c98b194334..0da0327885 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -21,25 +21,28 @@ package org.apache.qpid.client.messaging.address; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.Binding; + import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import javax.naming.OperationNotSupportedException; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQDestination.Binding; - public abstract class Node { - protected int _nodeType = AMQDestination.UNKNOWN_TYPE; - protected boolean _isDurable; - protected boolean _isAutoDelete; - protected String _alternateExchange; - protected List _bindings = new ArrayList(); - protected Map _declareArgs = Collections.emptyMap(); - + private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private boolean _isDurable; + private boolean _isAutoDelete; + private String _alternateExchange; + private List _bindings = new ArrayList(); + private Map _declareArgs = Collections.emptyMap(); + + protected Node(int nodeType) + { + _nodeType = nodeType; + } + public int getType() { return _nodeType; @@ -101,12 +104,12 @@ public abstract class Node public static class QueueNode extends Node { - protected boolean _isExclusive; - protected QpidQueueOptions _queueOptions = new QpidQueueOptions(); + private boolean _isExclusive; + private QpidQueueOptions _queueOptions = new QpidQueueOptions(); public QueueNode() { - _nodeType = AMQDestination.QUEUE_TYPE; + super(AMQDestination.QUEUE_TYPE); } public boolean isExclusive() @@ -122,12 +125,12 @@ public abstract class Node public static class ExchangeNode extends Node { - protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); - protected String _exchangeType; + private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); + private String _exchangeType; public ExchangeNode() { - _nodeType = AMQDestination.TOPIC_TYPE; + super(AMQDestination.TOPIC_TYPE); } public String getExchangeType() @@ -144,5 +147,9 @@ public abstract class Node public static class UnknownNodeType extends Node { + public UnknownNodeType() + { + super(AMQDestination.UNKNOWN_TYPE); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 8911d4ee3e..d380402da7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.client.protocol; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.qpid.util.BytesDataOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -66,8 +57,16 @@ import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the @@ -300,7 +299,6 @@ public class AMQProtocolHandler implements ProtocolEngine { if (_failoverState == FailoverState.NOT_STARTED) { - // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); @@ -314,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine } // FIXME Need to correctly handle other exceptions. Things like ... - // if (cause instanceof AMQChannelClosedException) + // AMQChannelClosedException // which will cause the JMSSession to end due to a channel close and so that Session needs // to be removed from the map so we can correctly still call close without an exception when trying to close // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception @@ -865,160 +863,6 @@ public class AMQProtocolHandler implements ProtocolEngine return _suggestedProtocolVersion; } - private static class BytesDataOutput implements DataOutput - { - int _pos = 0; - byte[] _buf; - - public BytesDataOutput(byte[] buf) - { - _buf = buf; - } - - public void setBuffer(byte[] buf) - { - _buf = buf; - _pos = 0; - } - - public void reset() - { - _pos = 0; - } - - public int length() - { - return _pos; - } - - public void write(int b) - { - _buf[_pos++] = (byte) b; - } - - public void write(byte[] b) - { - System.arraycopy(b, 0, _buf, _pos, b.length); - _pos+=b.length; - } - - - public void write(byte[] b, int off, int len) - { - System.arraycopy(b, off, _buf, _pos, len); - _pos+=len; - - } - - public void writeBoolean(boolean v) - { - _buf[_pos++] = v ? (byte) 1 : (byte) 0; - } - - public void writeByte(int v) - { - _buf[_pos++] = (byte) v; - } - - public void writeShort(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeChar(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeInt(int v) - { - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - - public void writeLong(long v) - { - _buf[_pos++] = (byte) (v >>> 56); - _buf[_pos++] = (byte) (v >>> 48); - _buf[_pos++] = (byte) (v >>> 40); - _buf[_pos++] = (byte) (v >>> 32); - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte)v; - } - - public void writeFloat(float v) - { - writeInt(Float.floatToIntBits(v)); - } - - public void writeDouble(double v) - { - writeLong(Double.doubleToLongBits(v)); - } - - public void writeBytes(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) - { - _buf[_pos++] = ((byte)s.charAt(i)); - } - } - - public void writeChars(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) - { - int v = s.charAt(i); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } - } - - public void writeUTF(String s) - { - int strlen = s.length(); - - int pos = _pos; - _pos+=2; - - - for (int i = 0; i < strlen; i++) - { - int c = s.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) - { - c = s.charAt(i); - _buf[_pos++] = (byte) c; - - } - else if (c > 0x07FF) - { - _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - else - { - _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - } - - int len = _pos - (pos + 2); - - _buf[pos++] = (byte) (len >>> 8); - _buf[pos] = (byte) len; - } - - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index b7253e6e9c..af57fd98fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,11 +20,8 @@ */ package org.apache.qpid.client.protocol; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; @@ -48,8 +45,11 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.security.sasl.SaslClient; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Wrapper for protocol session that provides type-safe access to session attributes.

The underlying protocol @@ -73,16 +73,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - /** - * The handler from which this session was created and which is used to handle protocol events. We send failover - * events to the handler. - */ - protected final AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); - protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private ConcurrentMap _closingChannels = new ConcurrentHashMap(); /** * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives @@ -91,20 +86,17 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private final ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; - /** Counter to ensure unique queue names */ - protected int _queueId = 1; - protected final Object _queueIdLock = new Object(); + private int _queueId = 1; + private final Object _queueIdLock = new Object(); private ProtocolVersion _protocolVersion; -// private VersionSpecificRegistry _registry = -// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); private MethodDispatcher _methodDispatcher; - protected final AMQConnection _connection; + private final AMQConnection _connection; private ConnectionTuneParameters _connectionTuneParameters; @@ -116,7 +108,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); - _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); + if (_logger.isDebugEnabled()) + { + _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion); + } _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); _connection = connection; @@ -223,7 +218,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } msg.setContentHeader(contentHeader); - if (contentHeader.bodySize == 0) + if (contentHeader.getBodySize() == 0) { deliverMessageToAMQSession(channelId, msg); } @@ -310,7 +305,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void closeSession(AMQSession session) { - _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + if (_logger.isDebugEnabled()) + { + _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + } final int channelId = session.getChannelId(); if (channelId <= 0) { @@ -401,7 +399,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void setProtocolVersion(final ProtocolVersion pv) { - _logger.info("Setting ProtocolVersion to :" + pv); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting ProtocolVersion to :" + pv); + } _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); @@ -470,4 +471,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return "AMQProtocolSession[" + _connection + ']'; } + + /** + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. + */ + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + /** Maps from the channel id to the AMQSession that it represents. */ + protected ConcurrentMap getChannelId2SessionMap() + { + return _channelId2SessionMap; + } + + protected void setChannelId2SessionMap(ConcurrentMap channelId2SessionMap) + { + _channelId2SessionMap = channelId2SessionMap; + } + + protected ConcurrentMap getClosingChannels() + { + return _closingChannels; + } + + protected void setClosingChannels(ConcurrentMap closingChannels) + { + _closingChannels = closingChannels; + } + + /** Counter to ensure unique queue names */ + protected int getQueueId() + { + return _queueId; + } + + protected void setQueueId(int queueId) + { + _queueId = queueId; + } + + protected Object getQueueIdLock() + { + return _queueIdLock; + } + + protected AMQConnection getConnection() + { + return _connection; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 2bc609ebf2..b865c51cb7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -20,12 +20,7 @@ */ package org.apache.qpid.client.protocol; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.util.BlockingWaiter; import org.apache.qpid.framing.AMQMethodBody; @@ -68,7 +63,7 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter _waiters = new CopyOnWriteArrayList(); + private final List _waiters = new CopyOnWriteArrayList(); private Exception _lastException; public AMQStateManager() @@ -110,7 +111,6 @@ public class AMQStateManager implements AMQMethodListener { B method = evt.getMethod(); - // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId()); return true; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java index 17d04f4fa3..6b038e09c8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java @@ -21,9 +21,8 @@ package org.apache.qpid.client.state; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.framing.AMQMethodBody; /** * A frame listener that is informed of the protocl state when invoked and has diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 732480e1c9..c8903d252f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.client.state; -import org.apache.qpid.client.util.BlockingWaiter; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.AMQException; -import org.slf4j.LoggerFactory; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.util.BlockingWaiter; import java.util.Set; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java deleted file mode 100644 index 6e47e2ce28..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import org.apache.qpid.jms.BrokerDetails; - -/** - * AMQNoTransportForProtocolException represents a connection failure where there is no transport medium to connect - * to the broker available. This may be the case if their is a error in the connection url, or an unsupported transport - * type is specified. - * - *

CRC Card
- *
CRC Card
Responsibilities Collaborations - *
Represent absence of a transport medium. - *
- * - * @todo Error code never used. This is not an AMQException. - */ -public class AMQNoTransportForProtocolException extends AMQTransportConnectionException -{ - BrokerDetails _details; - - public AMQNoTransportForProtocolException(BrokerDetails details, String message, Throwable cause) - { - super(null, message, cause); - - _details = details; - } - - public String toString() - { - if (_details != null) - { - return super.toString() + _details.toString(); - } - else - { - return super.toString(); - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java deleted file mode 100644 index 6bef6216bd..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; - -/** - * AMQTransportConnectionException indicates a failure to establish a connection through the transporting medium, to - * an AMQP broker. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Represent failure to connect through the transport medium. - *
- * - * @todo Error code never used. This is not an AMQException. - */ -public class AMQTransportConnectionException extends AMQException -{ - public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause) - { - super(errorCode, message, cause); - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index 1b483f6948..3c9a6e1500 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.client.transport; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; +import org.ietf.jgss.GSSContext; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.GSSName; +import org.ietf.jgss.Oid; import org.apache.qpid.client.security.AMQCallbackHandler; import org.apache.qpid.client.security.CallbackHandlerRegistry; @@ -38,11 +36,13 @@ import org.apache.qpid.transport.ConnectionOpenOk; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.util.Strings; -import org.ietf.jgss.GSSContext; -import org.ietf.jgss.GSSException; -import org.ietf.jgss.GSSManager; -import org.ietf.jgss.GSSName; -import org.ietf.jgss.Oid; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @@ -85,7 +85,7 @@ public class ClientConnectionDelegate extends ClientDelegate protected SaslClient createSaslClient(List brokerMechs) throws ConnectionException, SaslException { final String brokerMechanisms = Strings.join(" ", brokerMechs); - final String restrictionList = _conSettings.getSaslMechs(); + final String restrictionList = getConnectionSettings().getSaslMechs(); final String selectedMech = CallbackHandlerRegistry.getInstance().selectMechanism(brokerMechanisms, restrictionList); if (selectedMech == null) { @@ -96,14 +96,14 @@ public class ClientConnectionDelegate extends ClientDelegate } Map saslProps = new HashMap(); - if (_conSettings.isUseSASLEncryption()) + if (getConnectionSettings().isUseSASLEncryption()) { saslProps.put(Sasl.QOP, "auth-conf"); } final AMQCallbackHandler handler = CallbackHandlerRegistry.getInstance().createCallbackHandler(selectedMech); handler.initialise(_connectionURL); - final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, _conSettings.getSaslProtocol(), _conSettings.getSaslServerName(), saslProps, handler); + final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, getConnectionSettings().getSaslProtocol(), getConnectionSettings().getSaslServerName(), saslProps, handler); return sc; } @@ -137,7 +137,7 @@ public class ClientConnectionDelegate extends ClientDelegate private String getKerberosUser() { LOGGER.debug("Obtaining userID from kerberos"); - String service = _conSettings.getSaslProtocol() + "@" + _conSettings.getSaslServerName(); + String service = getConnectionSettings().getSaslProtocol() + "@" + getConnectionSettings().getSaslServerName(); GSSManager manager = GSSManager.getInstance(); try diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java deleted file mode 100644 index 7a24d6e15a..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; - -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; - -public interface ITransportConnection -{ - void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) - throws IOException; -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java index 03167561ef..f303d155c6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java @@ -21,10 +21,6 @@ package org.apache.qpid.client.url; */ -import java.net.URI; -import java.net.URISyntaxException; -import java.util.StringTokenizer; - import org.apache.qpid.client.AMQBrokerDetails; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; @@ -32,6 +28,10 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.URLHelper; import org.apache.qpid.url.URLSyntaxException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.StringTokenizer; + public class URLParser { private AMQConnectionURL _url; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java index 605e9ee154..d81868f924 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java @@ -17,13 +17,13 @@ */ package org.apache.qpid.client.url; +import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.jms.BrokerDetails; + import java.net.MalformedURLException; import java.util.ArrayList; import java.util.List; -import org.apache.qpid.client.AMQBrokerDetails; -import org.apache.qpid.jms.BrokerDetails; - /** * The format Qpid URL is based on the AMQP one. * The grammar is as follows: @@ -57,7 +57,6 @@ public class URLParser_0_10 private static final char PROPERTY_SEPARATOR_CHAR = ';'; private static final char ADDRESS_SEPERATOR_CHAR = ','; - //private static final char CLIENT_ID_TRANSPORT_SEPARATOR_CHAR = ':'; private static final char TRANSPORT_HOST_SEPARATOR_CHAR = ':'; private static final char HOST_PORT_SEPARATOR_CHAR = ':'; private static final char AT_CHAR = '@'; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index bec41644fc..80d171592f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.client.util; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * BlockingWaiter is a 'rendezvous' which delegates handling of @@ -84,7 +85,7 @@ public abstract class BlockingWaiter private volatile Exception _error; /** Holds the incomming Object. */ - protected Object _doneObject = null; + private Object _doneObject = null; private AtomicBoolean _waiting = new AtomicBoolean(false); private boolean _closed = false; @@ -183,11 +184,7 @@ public abstract class BlockingWaiter { _logger.error(e.getMessage(), e); // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - // if (!_ready && timeout != -1) - // { - // _error = new AMQException("Server did not respond timely"); - // _ready = true; - // } + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index ee7fc533a3..c8d12142e6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/KeyValue.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/KeyValue.java deleted file mode 100644 index e890aba968..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/KeyValue.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2003-2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.collections; - -/** - * Defines a simple key value pair. - *

- * A Map Entry has considerable additional semantics over and above a simple - * key-value pair. This interface defines the minimum key value, with just the - * two get methods. - * - * @since Commons Collections 3.0 - * @version $Revision$ $Date$ - * - * @author Stephen Colebourne - */ -public interface KeyValue { - - /** - * Gets the key from the pair. - * - * @return the key - */ - Object getKey(); - - /** - * Gets the value from the pair. - * - * @return the value - */ - Object getValue(); - -} \ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java deleted file mode 100644 index 1516c56e42..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java +++ /dev/null @@ -1,957 +0,0 @@ -/* - * Copyright 2001-2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.collections; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; -import java.lang.ref.SoftReference; -import java.lang.ref.WeakReference; -import java.util.AbstractCollection; -import java.util.AbstractMap; -import java.util.AbstractSet; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.ConcurrentModificationException; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.qpid.collections.keyvalue.DefaultMapEntry; - -/** - * Hash-based {@link Map} implementation that allows - * mappings to be removed by the garbage collector.

- * - * When you construct a ReferenceMap, you can - * specify what kind of references are used to store the - * map's keys and values. If non-hard references are - * used, then the garbage collector can remove mappings - * if a key or value becomes unreachable, or if the - * JVM's memory is running low. For information on how - * the different reference types behave, see - * {@link Reference}.

- * - * Different types of references can be specified for keys - * and values. The keys can be configured to be weak but - * the values hard, in which case this class will behave - * like a - * WeakHashMap. However, you - * can also specify hard keys and weak values, or any other - * combination. The default constructor uses hard keys - * and soft values, providing a memory-sensitive cache.

- * - * The algorithms used are basically the same as those - * in {@link java.util.HashMap}. In particular, you - * can specify a load factor and capacity to suit your - * needs. All optional {@link Map} operations are - * supported.

- * - * However, this {@link Map} implementation does not - * allow null elements. Attempting to add a null key or - * or a null value to the map will raise a - * NullPointerException.

- * - * As usual, this implementation is not synchronized. You - * can use {@link java.util.Collections#synchronizedMap} to - * provide synchronized access to a ReferenceMap. - * - * @see java.lang.ref.Reference - * - * @deprecated Moved to map subpackage. Due to be removed in v4.0. - * @since Commons Collections 2.1 - * @version $Revision$ $Date$ - * - * @author Paul Jack - */ -public class ReferenceMap extends AbstractMap { - - /** - * For serialization. - */ - private static final long serialVersionUID = -3370601314380922368L; - - - /** - * Constant indicating that hard references should be used. - */ - final public static int HARD = 0; - - - /** - * Constant indicating that soft references should be used. - */ - final public static int SOFT = 1; - - - /** - * Constant indicating that weak references should be used. - */ - final public static int WEAK = 2; - - - // --- serialized instance variables: - - - /** - * The reference type for keys. Must be HARD, SOFT, WEAK. - * Note: I originally marked this field as final, but then this class - * didn't compile under JDK1.2.2. - * @serial - */ - private int keyType; - - - /** - * The reference type for values. Must be HARD, SOFT, WEAK. - * Note: I originally marked this field as final, but then this class - * didn't compile under JDK1.2.2. - * @serial - */ - private int valueType; - - - /** - * The threshold variable is calculated by multiplying - * table.length and loadFactor. - * Note: I originally marked this field as final, but then this class - * didn't compile under JDK1.2.2. - * @serial - */ - private float loadFactor; - - /** - * Should the value be automatically purged when the associated key has been collected? - */ - private boolean purgeValues = false; - - - // -- Non-serialized instance variables - - /** - * ReferenceQueue used to eliminate stale mappings. - * See purge. - */ - private transient ReferenceQueue queue = new ReferenceQueue(); - - - /** - * The hash table. Its length is always a power of two. - */ - private transient Entry[] table; - - - /** - * Number of mappings in this map. - */ - private transient int size; - - - /** - * When size reaches threshold, the map is resized. - * See resize(). - */ - private transient int threshold; - - - /** - * Number of times this map has been modified. - */ - private transient volatile int modCount; - - - /** - * Cached key set. May be null if key set is never accessed. - */ - private transient Set keySet; - - - /** - * Cached entry set. May be null if entry set is never accessed. - */ - private transient Set entrySet; - - - /** - * Cached values. May be null if values() is never accessed. - */ - private transient Collection values; - - - /** - * Constructs a new ReferenceMap that will - * use hard references to keys and soft references to values. - */ - public ReferenceMap() { - this(HARD, SOFT); - } - - /** - * Constructs a new ReferenceMap that will - * use the specified types of references. - * - * @param keyType the type of reference to use for keys; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - * @param valueType the type of reference to use for values; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - * @param purgeValues should the value be automatically purged when the - * key is garbage collected - */ - public ReferenceMap(int keyType, int valueType, boolean purgeValues) { - this(keyType, valueType); - this.purgeValues = purgeValues; - } - - /** - * Constructs a new ReferenceMap that will - * use the specified types of references. - * - * @param keyType the type of reference to use for keys; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - * @param valueType the type of reference to use for values; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - */ - public ReferenceMap(int keyType, int valueType) { - this(keyType, valueType, 16, 0.75f); - } - - /** - * Constructs a new ReferenceMap with the - * specified reference types, load factor and initial - * capacity. - * - * @param keyType the type of reference to use for keys; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - * @param valueType the type of reference to use for values; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - * @param capacity the initial capacity for the map - * @param loadFactor the load factor for the map - * @param purgeValues should the value be automatically purged when the - * key is garbage collected - */ - public ReferenceMap( - int keyType, - int valueType, - int capacity, - float loadFactor, - boolean purgeValues) { - this(keyType, valueType, capacity, loadFactor); - this.purgeValues = purgeValues; - } - - /** - * Constructs a new ReferenceMap with the - * specified reference types, load factor and initial - * capacity. - * - * @param keyType the type of reference to use for keys; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - * @param valueType the type of reference to use for values; - * must be {@link #HARD}, {@link #SOFT}, {@link #WEAK} - * @param capacity the initial capacity for the map - * @param loadFactor the load factor for the map - */ - public ReferenceMap(int keyType, int valueType, int capacity, float loadFactor) { - super(); - - verify("keyType", keyType); - verify("valueType", valueType); - - if (capacity <= 0) { - throw new IllegalArgumentException("capacity must be positive"); - } - if ((loadFactor <= 0.0f) || (loadFactor >= 1.0f)) { - throw new IllegalArgumentException("Load factor must be greater than 0 and less than 1."); - } - - this.keyType = keyType; - this.valueType = valueType; - - int v = 1; - while (v < capacity) v *= 2; - - this.table = new Entry[v]; - this.loadFactor = loadFactor; - this.threshold = (int)(v * loadFactor); - } - - - // used by constructor - private static void verify(String name, int type) { - if ((type < HARD) || (type > WEAK)) { - throw new IllegalArgumentException(name + - " must be HARD, SOFT, WEAK."); - } - } - - - /** - * Writes this object to the given output stream. - * - * @param out the output stream to write to - * @throws IOException if the stream raises it - */ - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - out.writeInt(table.length); - - // Have to use null-terminated list because size might shrink - // during iteration - - for (Iterator iter = entrySet().iterator(); iter.hasNext();) { - Map.Entry entry = (Map.Entry)iter.next(); - out.writeObject(entry.getKey()); - out.writeObject(entry.getValue()); - } - out.writeObject(null); - } - - - /** - * Reads the contents of this object from the given input stream. - * - * @param inp the input stream to read from - * @throws IOException if the stream raises it - * @throws ClassNotFoundException if the stream raises it - */ - private void readObject(ObjectInputStream inp) throws IOException, ClassNotFoundException { - inp.defaultReadObject(); - table = new Entry[inp.readInt()]; - threshold = (int)(table.length * loadFactor); - queue = new ReferenceQueue(); - Object key = inp.readObject(); - while (key != null) { - Object value = inp.readObject(); - put(key, value); - key = inp.readObject(); - } - } - - - /** - * Constructs a reference of the given type to the given - * referent. The reference is registered with the queue - * for later purging. - * - * @param type HARD, SOFT or WEAK - * @param referent the object to refer to - * @param hash the hash code of the key of the mapping; - * this number might be different from referent.hashCode() if - * the referent represents a value and not a key - */ - private Object toReference(int type, Object referent, int hash) { - switch (type) { - case HARD: return referent; - case SOFT: return new SoftRef(hash, referent, queue); - case WEAK: return new WeakRef(hash, referent, queue); - default: throw new Error(); - } - } - - - /** - * Returns the entry associated with the given key. - * - * @param key the key of the entry to look up - * @return the entry associated with that key, or null - * if the key is not in this map - */ - private Entry getEntry(Object key) { - if (key == null) return null; - int hash = key.hashCode(); - int index = indexFor(hash); - for (Entry entry = table[index]; entry != null; entry = entry.next) { - if ((entry.hash == hash) && key.equals(entry.getKey())) { - return entry; - } - } - return null; - } - - - /** - * Converts the given hash code into an index into the - * hash table. - */ - private int indexFor(int hash) { - // mix the bits to avoid bucket collisions... - hash += ~(hash << 15); - hash ^= (hash >>> 10); - hash += (hash << 3); - hash ^= (hash >>> 6); - hash += ~(hash << 11); - hash ^= (hash >>> 16); - return hash & (table.length - 1); - } - - - - /** - * Resizes this hash table by doubling its capacity. - * This is an expensive operation, as entries must - * be copied from the old smaller table to the new - * bigger table. - */ - private void resize() { - Entry[] old = table; - table = new Entry[old.length * 2]; - - for (int i = 0; i < old.length; i++) { - Entry next = old[i]; - while (next != null) { - Entry entry = next; - next = next.next; - int index = indexFor(entry.hash); - entry.next = table[index]; - table[index] = entry; - } - old[i] = null; - } - threshold = (int)(table.length * loadFactor); - } - - - - /** - * Purges stale mappings from this map. - *

- * Ordinarily, stale mappings are only removed during - * a write operation, although this method is called for both - * read and write operations to maintain a consistent state. - *

- * Note that this method is not synchronized! Special - * care must be taken if, for instance, you want stale - * mappings to be removed on a periodic basis by some - * background thread. - */ - private void purge() { - Reference ref = queue.poll(); - while (ref != null) { - purge(ref); - ref = queue.poll(); - } - } - - - private void purge(Reference ref) { - // The hashCode of the reference is the hashCode of the - // mapping key, even if the reference refers to the - // mapping value... - int hash = ref.hashCode(); - int index = indexFor(hash); - Entry previous = null; - Entry entry = table[index]; - while (entry != null) { - if (entry.purge(ref)) { - if (previous == null) table[index] = entry.next; - else previous.next = entry.next; - this.size--; - return; - } - previous = entry; - entry = entry.next; - } - - } - - - /** - * Returns the size of this map. - * - * @return the size of this map - */ - public int size() { - purge(); - return size; - } - - - /** - * Returns true if this map is empty. - * - * @return true if this map is empty - */ - public boolean isEmpty() { - purge(); - return size == 0; - } - - - /** - * Returns true if this map contains the given key. - * - * @return true if the given key is in this map - */ - public boolean containsKey(Object key) { - purge(); - Entry entry = getEntry(key); - if (entry == null) return false; - return entry.getValue() != null; - } - - - /** - * Returns the value associated with the given key, if any. - * - * @return the value associated with the given key, or null - * if the key maps to no value - */ - public Object get(Object key) { - purge(); - Entry entry = getEntry(key); - if (entry == null) return null; - return entry.getValue(); - } - - - /** - * Associates the given key with the given value.

- * Neither the key nor the value may be null. - * - * @param key the key of the mapping - * @param value the value of the mapping - * @return the last value associated with that key, or - * null if no value was associated with the key - * @throws NullPointerException if either the key or value - * is null - */ - public Object put(Object key, Object value) { - if (key == null) throw new NullPointerException("null keys not allowed"); - if (value == null) throw new NullPointerException("null values not allowed"); - - purge(); - if (size + 1 > threshold) resize(); - - int hash = key.hashCode(); - int index = indexFor(hash); - Entry entry = table[index]; - while (entry != null) { - if ((hash == entry.hash) && key.equals(entry.getKey())) { - Object result = entry.getValue(); - entry.setValue(value); - return result; - } - entry = entry.next; - } - this.size++; - modCount++; - key = toReference(keyType, key, hash); - value = toReference(valueType, value, hash); - table[index] = new Entry(key, hash, value, table[index]); - return null; - } - - - /** - * Removes the key and its associated value from this map. - * - * @param key the key to remove - * @return the value associated with that key, or null if - * the key was not in the map - */ - public Object remove(Object key) { - if (key == null) return null; - purge(); - int hash = key.hashCode(); - int index = indexFor(hash); - Entry previous = null; - Entry entry = table[index]; - while (entry != null) { - if ((hash == entry.hash) && key.equals(entry.getKey())) { - if (previous == null) table[index] = entry.next; - else previous.next = entry.next; - this.size--; - modCount++; - return entry.getValue(); - } - previous = entry; - entry = entry.next; - } - return null; - } - - - /** - * Clears this map. - */ - public void clear() { - Arrays.fill(table, null); - size = 0; - while (queue.poll() != null); // drain the queue - } - - - /** - * Returns a set view of this map's entries. - * - * @return a set view of this map's entries - */ - public Set entrySet() { - if (entrySet != null) { - return entrySet; - } - entrySet = new AbstractSet() { - public int size() { - return ReferenceMap.this.size(); - } - - public void clear() { - ReferenceMap.this.clear(); - } - - public boolean contains(Object o) { - if (o == null) return false; - if (!(o instanceof Map.Entry)) return false; - Map.Entry e = (Map.Entry)o; - Entry e2 = getEntry(e.getKey()); - return (e2 != null) && e.equals(e2); - } - - public boolean remove(Object o) { - boolean r = contains(o); - if (r) { - Map.Entry e = (Map.Entry)o; - ReferenceMap.this.remove(e.getKey()); - } - return r; - } - - public Iterator iterator() { - return new EntryIterator(); - } - - public Object[] toArray() { - return toArray(new Object[0]); - } - - public Object[] toArray(Object[] arr) { - ArrayList list = new ArrayList(); - Iterator iterator = iterator(); - while (iterator.hasNext()) { - Entry e = (Entry)iterator.next(); - list.add(new DefaultMapEntry(e.getKey(), e.getValue())); - } - return list.toArray(arr); - } - }; - return entrySet; - } - - - /** - * Returns a set view of this map's keys. - * - * @return a set view of this map's keys - */ - public Set keySet() { - if (keySet != null) return keySet; - keySet = new AbstractSet() { - public int size() { - return ReferenceMap.this.size(); - } - - public Iterator iterator() { - return new KeyIterator(); - } - - public boolean contains(Object o) { - return containsKey(o); - } - - - public boolean remove(Object o) { - Object r = ReferenceMap.this.remove(o); - return r != null; - } - - public void clear() { - ReferenceMap.this.clear(); - } - - public Object[] toArray() { - return toArray(new Object[0]); - } - - public Object[] toArray(Object[] array) { - Collection c = new ArrayList(size()); - for (Iterator it = iterator(); it.hasNext(); ) { - c.add(it.next()); - } - return c.toArray(array); - } - }; - return keySet; - } - - - /** - * Returns a collection view of this map's values. - * - * @return a collection view of this map's values. - */ - public Collection values() { - if (values != null) return values; - values = new AbstractCollection() { - public int size() { - return ReferenceMap.this.size(); - } - - public void clear() { - ReferenceMap.this.clear(); - } - - public Iterator iterator() { - return new ValueIterator(); - } - - public Object[] toArray() { - return toArray(new Object[0]); - } - - public Object[] toArray(Object[] array) { - Collection c = new ArrayList(size()); - for (Iterator it = iterator(); it.hasNext(); ) { - c.add(it.next()); - } - return c.toArray(array); - } - }; - return values; - } - - - // If getKey() or getValue() returns null, it means - // the mapping is stale and should be removed. - private class Entry implements Map.Entry, KeyValue { - - Object key; - Object value; - int hash; - Entry next; - - - public Entry(Object key, int hash, Object value, Entry next) { - this.key = key; - this.hash = hash; - this.value = value; - this.next = next; - } - - - public Object getKey() { - return (keyType > HARD) ? ((Reference)key).get() : key; - } - - - public Object getValue() { - return (valueType > HARD) ? ((Reference)value).get() : value; - } - - - public Object setValue(Object object) { - Object old = getValue(); - if (valueType > HARD) ((Reference)value).clear(); - value = toReference(valueType, object, hash); - return old; - } - - - public boolean equals(Object o) { - if (o == null) return false; - if (o == this) return true; - if (!(o instanceof Map.Entry)) return false; - - Map.Entry entry = (Map.Entry)o; - Object key = entry.getKey(); - Object value = entry.getValue(); - if ((key == null) || (value == null)) return false; - return key.equals(getKey()) && value.equals(getValue()); - } - - - public int hashCode() { - Object v = getValue(); - return hash ^ ((v == null) ? 0 : v.hashCode()); - } - - - public String toString() { - return getKey() + "=" + getValue(); - } - - - boolean purge(Reference ref) { - boolean r = (keyType > HARD) && (key == ref); - r = r || ((valueType > HARD) && (value == ref)); - if (r) { - if (keyType > HARD) ((Reference)key).clear(); - if (valueType > HARD) { - ((Reference)value).clear(); - } else if (purgeValues) { - value = null; - } - } - return r; - } - } - - - private class EntryIterator implements Iterator { - // These fields keep track of where we are in the table. - int index; - Entry entry; - Entry previous; - - // These Object fields provide hard references to the - // current and next entry; this assures that if hasNext() - // returns true, next() will actually return a valid element. - Object nextKey, nextValue; - Object currentKey, currentValue; - - int expectedModCount; - - - public EntryIterator() { - index = (size() != 0 ? table.length : 0); - // have to do this here! size() invocation above - // may have altered the modCount. - expectedModCount = modCount; - } - - - public boolean hasNext() { - checkMod(); - while (nextNull()) { - Entry e = entry; - int i = index; - while ((e == null) && (i > 0)) { - i--; - e = table[i]; - } - entry = e; - index = i; - if (e == null) { - currentKey = null; - currentValue = null; - return false; - } - nextKey = e.getKey(); - nextValue = e.getValue(); - if (nextNull()) entry = entry.next; - } - return true; - } - - - private void checkMod() { - if (modCount != expectedModCount) { - throw new ConcurrentModificationException(); - } - } - - - private boolean nextNull() { - return (nextKey == null) || (nextValue == null); - } - - protected Entry nextEntry() { - checkMod(); - if (nextNull() && !hasNext()) throw new NoSuchElementException(); - previous = entry; - entry = entry.next; - currentKey = nextKey; - currentValue = nextValue; - nextKey = null; - nextValue = null; - return previous; - } - - - public Object next() { - return nextEntry(); - } - - - public void remove() { - checkMod(); - if (previous == null) throw new IllegalStateException(); - ReferenceMap.this.remove(currentKey); - previous = null; - currentKey = null; - currentValue = null; - expectedModCount = modCount; - } - - } - - - private class ValueIterator extends EntryIterator { - public Object next() { - return nextEntry().getValue(); - } - } - - - private class KeyIterator extends EntryIterator { - public Object next() { - return nextEntry().getKey(); - } - } - - - - // These two classes store the hashCode of the key of - // of the mapping, so that after they're dequeued a quick - // lookup of the bucket in the table can occur. - - - private static class SoftRef extends SoftReference { - private int hash; - - - public SoftRef(int hash, Object r, ReferenceQueue q) { - super(r, q); - this.hash = hash; - } - - - public int hashCode() { - return hash; - } - } - - - private static class WeakRef extends WeakReference { - private int hash; - - - public WeakRef(int hash, Object r, ReferenceQueue q) { - super(r, q); - this.hash = hash; - } - - - public int hashCode() { - return hash; - } - } - - -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java deleted file mode 100644 index a7ca67ad15..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2003-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.collections.keyvalue; - -import org.apache.qpid.collections.KeyValue; - - -/** - * Abstract pair class to assist with creating KeyValue - * and {@link java.util.Map.Entry Map.Entry} implementations. - * - * @since Commons Collections 3.0 - * @version $Revision$ $Date$ - * - * @author James Strachan - * @author Michael A. Smith - * @author Neil O'Toole - * @author Stephen Colebourne - */ -public abstract class AbstractKeyValue implements KeyValue { - - /** The key */ - protected Object key; - /** The value */ - protected Object value; - - /** - * Constructs a new pair with the specified key and given value. - * - * @param key the key for the entry, may be null - * @param value the value for the entry, may be null - */ - protected AbstractKeyValue(Object key, Object value) { - super(); - this.key = key; - this.value = value; - } - - /** - * Gets the key from the pair. - * - * @return the key - */ - public Object getKey() { - return key; - } - - /** - * Gets the value from the pair. - * - * @return the value - */ - public Object getValue() { - return value; - } - - /** - * Gets a debugging String view of the pair. - * - * @return a String view of the entry - */ - public String toString() { - return new StringBuffer() - .append(getKey()) - .append('=') - .append(getValue()) - .toString(); - } - -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java deleted file mode 100644 index f4717a1c20..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2003-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.collections.keyvalue; - -import java.util.Map; - -import org.apache.qpid.collections.keyvalue.AbstractKeyValue; - -/** - * Abstract Pair class to assist with creating correct - * {@link java.util.Map.Entry Map.Entry} implementations. - * - * @since Commons Collections 3.0 - * @version $Revision$ $Date$ - * - * @author James Strachan - * @author Michael A. Smith - * @author Neil O'Toole - * @author Stephen Colebourne - */ -public abstract class AbstractMapEntry extends AbstractKeyValue implements Map.Entry { - - /** - * Constructs a new entry with the given key and given value. - * - * @param key the key for the entry, may be null - * @param value the value for the entry, may be null - */ - protected AbstractMapEntry(Object key, Object value) { - super(key, value); - } - - // Map.Entry interface - //------------------------------------------------------------------------- - /** - * Sets the value stored in this Map.Entry. - *

- * This Map.Entry is not connected to a Map, so only the - * local data is changed. - * - * @param value the new value - * @return the previous value - */ - public Object setValue(Object value) { - Object answer = this.value; - this.value = value; - return answer; - } - - /** - * Compares this Map.Entry with another Map.Entry. - *

- * Implemented per API documentation of {@link java.util.Map.Entry#equals(Object)} - * - * @param obj the object to compare to - * @return true if equal key and value - */ - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (obj instanceof Map.Entry == false) { - return false; - } - Map.Entry other = (Map.Entry) obj; - return - (getKey() == null ? other.getKey() == null : getKey().equals(other.getKey())) && - (getValue() == null ? other.getValue() == null : getValue().equals(other.getValue())); - } - - /** - * Gets a hashCode compatible with the equals method. - *

- * Implemented per API documentation of {@link java.util.Map.Entry#hashCode()} - * - * @return a suitable hash code - */ - public int hashCode() { - return (getKey() == null ? 0 : getKey().hashCode()) ^ - (getValue() == null ? 0 : getValue().hashCode()); - } - -} \ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/DefaultMapEntry.java b/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/DefaultMapEntry.java deleted file mode 100644 index f0f04a366a..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/collections/keyvalue/DefaultMapEntry.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2001-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.collections.keyvalue; - -import java.util.Map; - -import org.apache.qpid.collections.KeyValue; -import org.apache.qpid.collections.keyvalue.AbstractMapEntry; - -/** - * A restricted implementation of {@link java.util.Map.Entry} that prevents - * the Map.Entry contract from being broken. - * - * @since Commons Collections 3.0 - * @version $Revision$ $Date$ - * - * @author James Strachan - * @author Michael A. Smith - * @author Neil O'Toole - * @author Stephen Colebourne - */ -public final class DefaultMapEntry extends AbstractMapEntry { - - /** - * Constructs a new entry with the specified key and given value. - * - * @param key the key for the entry, may be null - * @param value the value for the entry, may be null - */ - public DefaultMapEntry(final Object key, final Object value) { - super(key, value); - } - - /** - * Constructs a new entry from the specified KeyValue. - * - * @param pair the pair to copy, must not be null - * @throws NullPointerException if the entry is null - */ - public DefaultMapEntry(final KeyValue pair) { - super(pair.getKey(), pair.getValue()); - } - - /** - * Constructs a new entry from the specified Map.Entry. - * - * @param entry the entry to copy, must not be null - * @throws NullPointerException if the entry is null - */ - public DefaultMapEntry(final Map.Entry entry) { - super(entry.getKey(), entry.getValue()); - } - -} \ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 0c2f4ce57d..71d7ffd2a3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.jms; -import java.util.Map; - import org.apache.qpid.transport.ConnectionSettings; +import java.util.Map; + public interface BrokerDetails { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 24d9360cfa..8fd6ff6d33 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -21,7 +21,6 @@ package org.apache.qpid.jms; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ProtocolVersion; import java.util.List; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 56abf03c81..f4d2ecc36d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.jms; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.jms.failover.FailoverExchangeMethod; import org.apache.qpid.jms.failover.FailoverMethod; import org.apache.qpid.jms.failover.FailoverRoundRobinServers; import org.apache.qpid.jms.failover.FailoverSingleServer; import org.apache.qpid.jms.failover.NoFailover; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class FailoverPolicy { private static final Logger _logger = LoggerFactory.getLogger(FailoverPolicy.class); @@ -74,12 +74,6 @@ public class FailoverPolicy { String failoverMethod = connectionDetails.getFailoverMethod(); - /* - if (failoverMethod.equals(FailoverMethod.RANDOM)) - { - //todo write a random connection Failover - } - */ if (failoverMethod.equals(FailoverMethod.SINGLE_BROKER)) { method = new FailoverSingleServer(connectionDetails); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java index 4ad917fa83..bec8b0917d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java @@ -20,11 +20,10 @@ */ package org.apache.qpid.jms; -import java.io.UnsupportedEncodingException; - import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import java.io.UnsupportedEncodingException; /** */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java index 5287381fae..b4bf2d1d85 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java @@ -26,9 +26,11 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.QueueSession; +import javax.jms.TopicSession; -public interface Session extends javax.jms.Session +public interface Session extends TopicSession, QueueSession { /** * Indicates that no client acknowledgements are required. Broker assumes that once it has delivered diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index cb3ab718e9..a5eda29274 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -20,15 +20,8 @@ */ package org.apache.qpid.jms.failover; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQBrokerDetails; @@ -36,8 +29,15 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionURL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; /** * When using the Failover exchange a single broker is supplied in the URL. @@ -127,9 +127,7 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener for (String brokerEntry:list) { String[] urls = brokerEntry.substring(5) .split(","); - // Iterate until you find the correct transport - // Need to reconsider the logic when the C++ broker supports - // SSL URLs. + for (String url:urls) { String[] tokens = url.split(":"); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java index 41ba4974ec..84c1794723 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.jms.failover; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; + public class FailoverRoundRobinServers implements FailoverMethod { private static final Logger _logger = LoggerFactory.getLogger(FailoverRoundRobinServers.class); @@ -49,8 +50,7 @@ public class FailoverRoundRobinServers implements FailoverMethod /** The current number of cycles performed. */ private int _currentCycleRetries = 0; - /** Array of BrokerDetail used to make connections. */ - protected ConnectionURL _connectionDetails; + private ConnectionURL _connectionDetails; public FailoverRoundRobinServers(ConnectionURL connectionDetails) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java index d033a49f5c..1ef71eccba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.jms.failover; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; + public class FailoverSingleServer implements FailoverMethod { private static final Logger _logger = LoggerFactory.getLogger(FailoverSingleServer.class); @@ -36,10 +37,10 @@ public class FailoverSingleServer implements FailoverMethod private BrokerDetails _brokerDetail; /** The number of times to retry connecting to the sever */ - protected int _retries; + private int _retries; /** The current number of attempts made to the server */ - protected int _currentRetries = 0; + private int _currentRetries = 0; public FailoverSingleServer(ConnectionURL connectionDetails) @@ -64,6 +65,11 @@ public class FailoverSingleServer implements FailoverMethod _currentRetries = 0; } + protected void setCurrentRetries(int currentRetries) + { + _currentRetries = currentRetries; + } + public boolean failoverAllowed() { return _currentRetries < _retries; @@ -150,6 +156,11 @@ public class FailoverSingleServer implements FailoverMethod _retries = retries; } + public int getRetries() + { + return _retries; + } + public String methodName() { return "Single Server"; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/NoFailover.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/NoFailover.java index 1231324397..82cb0f9153 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/NoFailover.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/NoFailover.java @@ -44,7 +44,7 @@ public class NoFailover extends FailoverSingleServer public void attainedConnection() { _connected=true; - _currentRetries = _retries; + setCurrentRetries(getRetries()); } @Override diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index b480f56c07..bc3f89849e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -20,24 +20,8 @@ */ package org.apache.qpid.jndi; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.ConfigurationException; -import javax.naming.spi.InitialContextFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; @@ -46,16 +30,30 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.ConfigurationException; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; public class PropertiesFileInitialContextFactory implements InitialContextFactory { - protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class); + private final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class); private String CONNECTION_FACTORY_PREFIX = "connectionfactory."; private String DESTINATION_PREFIX = "destination."; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/ReadOnlyContext.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/ReadOnlyContext.java index 1719ea1219..76ec5f9498 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/ReadOnlyContext.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/ReadOnlyContext.java @@ -20,13 +20,6 @@ */ package org.apache.qpid.jndi; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; - import javax.naming.Binding; import javax.naming.CompositeName; import javax.naming.Context; @@ -41,6 +34,12 @@ import javax.naming.NotContextException; import javax.naming.OperationNotSupportedException; import javax.naming.Reference; import javax.naming.spi.NamingManager; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; /** * Based on class from ActiveMQ. @@ -66,9 +65,9 @@ public class ReadOnlyContext implements Context, Serializable private static final long serialVersionUID = -5754338187296859149L; protected static final NameParser nameParser = new NameParserImpl(); - protected final Hashtable environment; // environment for this context - protected final Map bindings; // bindings at my level - protected final Map treeBindings; // all bindings under me + private final Hashtable environment; // environment for this context + private final Map bindings; // bindings at my level + private final Map treeBindings; // all bindings under me private boolean frozen = false; private String nameInNamespace = ""; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java b/qpid/java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java deleted file mode 100644 index 59ec4cfba7..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/naming/ReadOnlyContext.java +++ /dev/null @@ -1,509 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.naming; - -import org.apache.qpid.jndi.NameParserImpl; - -import javax.naming.*; -import javax.naming.spi.NamingManager; -import java.io.Serializable; -import java.util.*; - -/** - * Based on class from ActiveMQ. - * A read-only Context - *

- * This version assumes it and all its subcontext are read-only and any attempt - * to modify (e.g. through bind) will result in an OperationNotSupportedException. - * Each Context in the tree builds a cache of the entries in all sub-contexts - * to optimise the performance of lookup. - *

- *

This implementation is intended to optimise the performance of lookup(String) - * to about the level of a HashMap get. It has been observed that the scheme - * resolution phase performed by the JVM takes considerably longer, so for - * optimum performance lookups should be coded like:

- * - * Context componentContext = (Context)new InitialContext().lookup("java:comp"); - * String envEntry = (String) componentContext.lookup("env/myEntry"); - * String envEntry2 = (String) componentContext.lookup("env/myEntry2"); - * - */ -public class ReadOnlyContext implements Context, Serializable -{ - private static final long serialVersionUID = -5754338187296859149L; - protected static final NameParser nameParser = new NameParserImpl(); - - protected final Hashtable environment; // environment for this context - protected final Map bindings; // bindings at my level - protected final Map treeBindings; // all bindings under me - - private boolean frozen = false; - private String nameInNamespace = ""; - public static final String SEPARATOR = "/"; - - public ReadOnlyContext() - { - environment = new Hashtable(); - bindings = new HashMap(); - treeBindings = new HashMap(); - } - - public ReadOnlyContext(Hashtable env) - { - if (env == null) - { - this.environment = new Hashtable(); - } - else - { - this.environment = new Hashtable(env); - } - - this.bindings = Collections.EMPTY_MAP; - this.treeBindings = Collections.EMPTY_MAP; - } - - public ReadOnlyContext(Hashtable environment, Map bindings) - { - if (environment == null) - { - this.environment = new Hashtable(); - } - else - { - this.environment = new Hashtable(environment); - } - - this.bindings = bindings; - treeBindings = new HashMap(); - frozen = true; - } - - public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace) - { - this(environment, bindings); - this.nameInNamespace = nameInNamespace; - } - - protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env) - { - this.bindings = clone.bindings; - this.treeBindings = clone.treeBindings; - this.environment = new Hashtable(env); - } - - protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace) - { - this(clone, env); - this.nameInNamespace = nameInNamespace; - } - - public void freeze() - { - frozen = true; - } - - boolean isFrozen() - { - return frozen; - } - - /** - * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses. - * It binds every possible lookup into a map in each context. To do this, each context - * strips off one name segment and if necessary creates a new context for it. Then it asks that context - * to bind the remaining name. It returns a map containing all the bindings from the next context, plus - * the context it just created (if it in fact created it). (the names are suitably extended by the segment - * originally lopped off). - * - * @param name - * @param value - * @return - * @throws javax.naming.NamingException - */ - protected Map internalBind(String name, Object value) throws NamingException - { - assert (name != null) && (name.length() > 0); - assert !frozen; - - Map newBindings = new HashMap(); - int pos = name.indexOf('/'); - if (pos == -1) - { - if (treeBindings.put(name, value) != null) - { - throw new NamingException("Something already bound at " + name); - } - - bindings.put(name, value); - newBindings.put(name, value); - } - else - { - String segment = name.substring(0, pos); - assert segment != null; - assert !segment.equals(""); - Object o = treeBindings.get(segment); - if (o == null) - { - o = newContext(); - treeBindings.put(segment, o); - bindings.put(segment, o); - newBindings.put(segment, o); - } - else if (!(o instanceof ReadOnlyContext)) - { - throw new NamingException("Something already bound where a subcontext should go"); - } - - ReadOnlyContext readOnlyContext = (ReadOnlyContext) o; - String remainder = name.substring(pos + 1); - Map subBindings = readOnlyContext.internalBind(remainder, value); - for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();) - { - Map.Entry entry = (Map.Entry) iterator.next(); - String subName = segment + "/" + (String) entry.getKey(); - Object bound = entry.getValue(); - treeBindings.put(subName, bound); - newBindings.put(subName, bound); - } - } - - return newBindings; - } - - protected ReadOnlyContext newContext() - { - return new ReadOnlyContext(); - } - - public Object addToEnvironment(String propName, Object propVal) throws NamingException - { - return environment.put(propName, propVal); - } - - public Hashtable getEnvironment() throws NamingException - { - return (Hashtable) environment.clone(); - } - - public Object removeFromEnvironment(String propName) throws NamingException - { - return environment.remove(propName); - } - - public Object lookup(String name) throws NamingException - { - if (name.length() == 0) - { - return this; - } - - Object result = treeBindings.get(name); - if (result == null) - { - result = bindings.get(name); - } - - if (result == null) - { - int pos = name.indexOf(':'); - if (pos > 0) - { - String scheme = name.substring(0, pos); - Context ctx = NamingManager.getURLContext(scheme, environment); - if (ctx == null) - { - throw new NamingException("scheme " + scheme + " not recognized"); - } - - return ctx.lookup(name); - } - else - { - // Split out the first name of the path - // and look for it in the bindings map. - CompositeName path = new CompositeName(name); - - if (path.size() == 0) - { - return this; - } - else - { - String first = path.get(0); - Object obj = bindings.get(first); - if (obj == null) - { - throw new NameNotFoundException(name); - } - else if ((obj instanceof Context) && (path.size() > 1)) - { - Context subContext = (Context) obj; - obj = subContext.lookup(path.getSuffix(1)); - } - - return obj; - } - } - } - - if (result instanceof LinkRef) - { - LinkRef ref = (LinkRef) result; - result = lookup(ref.getLinkName()); - } - - if (result instanceof Reference) - { - try - { - result = NamingManager.getObjectInstance(result, null, null, this.environment); - } - catch (NamingException e) - { - throw e; - } - catch (Exception e) - { - throw (NamingException) new NamingException("could not look up : " + name).initCause(e); - } - } - - if (result instanceof ReadOnlyContext) - { - String prefix = getNameInNamespace(); - if (prefix.length() > 0) - { - prefix = prefix + SEPARATOR; - } - - result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name); - } - - return result; - } - - public Object lookup(Name name) throws NamingException - { - return lookup(name.toString()); - } - - public Object lookupLink(String name) throws NamingException - { - return lookup(name); - } - - public Name composeName(Name name, Name prefix) throws NamingException - { - Name result = (Name) prefix.clone(); - result.addAll(name); - - return result; - } - - public String composeName(String name, String prefix) throws NamingException - { - CompositeName result = new CompositeName(prefix); - result.addAll(new CompositeName(name)); - - return result.toString(); - } - - public NamingEnumeration list(String name) throws NamingException - { - Object o = lookup(name); - if (o == this) - { - return new ReadOnlyContext.ListEnumeration(); - } - else if (o instanceof Context) - { - return ((Context) o).list(""); - } - else - { - throw new NotContextException(); - } - } - - public NamingEnumeration listBindings(String name) throws NamingException - { - Object o = lookup(name); - if (o == this) - { - return new ReadOnlyContext.ListBindingEnumeration(); - } - else if (o instanceof Context) - { - return ((Context) o).listBindings(""); - } - else - { - throw new NotContextException(); - } - } - - public Object lookupLink(Name name) throws NamingException - { - return lookupLink(name.toString()); - } - - public NamingEnumeration list(Name name) throws NamingException - { - return list(name.toString()); - } - - public NamingEnumeration listBindings(Name name) throws NamingException - { - return listBindings(name.toString()); - } - - public void bind(Name name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void bind(String name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void close() throws NamingException - { - // ignore - } - - public Context createSubcontext(Name name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public Context createSubcontext(String name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void destroySubcontext(Name name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void destroySubcontext(String name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public String getNameInNamespace() throws NamingException - { - return nameInNamespace; - } - - public NameParser getNameParser(Name name) throws NamingException - { - return nameParser; - } - - public NameParser getNameParser(String name) throws NamingException - { - return nameParser; - } - - public void rebind(Name name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void rebind(String name, Object obj) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void rename(Name oldName, Name newName) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void rename(String oldName, String newName) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void unbind(Name name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - public void unbind(String name) throws NamingException - { - throw new OperationNotSupportedException(); - } - - private abstract class LocalNamingEnumeration implements NamingEnumeration - { - private Iterator i = bindings.entrySet().iterator(); - - public boolean hasMore() throws NamingException - { - return i.hasNext(); - } - - public boolean hasMoreElements() - { - return i.hasNext(); - } - - protected Map.Entry getNext() - { - return (Map.Entry) i.next(); - } - - public void close() throws NamingException - { } - } - - private class ListEnumeration extends ReadOnlyContext.LocalNamingEnumeration - { - public Object next() throws NamingException - { - return nextElement(); - } - - public Object nextElement() - { - Map.Entry entry = getNext(); - - return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName()); - } - } - - private class ListBindingEnumeration extends ReadOnlyContext.LocalNamingEnumeration - { - public Object next() throws NamingException - { - return nextElement(); - } - - public Object nextElement() - { - Map.Entry entry = getNext(); - - return new Binding((String) entry.getKey(), entry.getValue()); - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/naming/jndi.properties b/qpid/java/client/src/main/java/org/apache/qpid/naming/jndi.properties deleted file mode 100644 index 830de5f619..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/naming/jndi.properties +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -java.naming.factory.initial = org.apache.qpid.naming.PropertiesFileInitialConextFactory - -# use the following property to configure the default connector -#java.naming.provider.url - ignored. - -# register some connection factories -# connectionfactory.[jndiname] = [ConnectionURL] -# qpid:username=foo;password=password;client_id=id;virtualhost=path@tpc:localhost:1556 -connectionfactory.local = qpid:tcp:localhost' - -# register some queues in JNDI using the form -# queue.[jndiName] = [physicalName] -queue.MyQueue = example.MyQueue - -# register some topics in JNDI using the form -# topic.[jndiName] = [physicalName] -topic.ibmStocks = stocks.nyse.ibm - -# Register an AMQP destination in JNDI -# NOTE: Qpid currently only supports direct,topics and headers -# destination.[jniName] = [BindingURL] -destination.direct = direct://amq.direct//directQueue diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java index 6f07dcb469..b90f4308cd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java @@ -17,9 +17,6 @@ */ package org.apache.qpid.nclient; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageTransfer; /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java index 14bfb4f95e..7134f0a960 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java @@ -21,14 +21,17 @@ package org.apache.qpid.nclient.util; */ -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - +import org.apache.qpid.api.Message; import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Header; -import org.apache.qpid.api.Message; +import org.apache.qpid.transport.MessageProperties; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; /** *

A Simple implementation of the message interface @@ -42,7 +45,7 @@ import org.apache.qpid.api.Message; */ public class ByteBufferMessage implements Message { - private List _data;// = new ArrayList(); + private List _data; private ByteBuffer _readBuffer; private int _dataSize; private DeliveryProperties _currentDeliveryProps; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java index 2f1eda6ef2..9a2e9de3d9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java @@ -21,12 +21,13 @@ package org.apache.qpid.nclient.util; */ +import org.apache.qpid.nclient.MessagePartListener; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageTransfer; + import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpid.transport.*; -import org.apache.qpid.nclient.MessagePartListener; - /** * This is a simple message assembler. * Will call onMessage method of the adaptee @@ -37,8 +38,8 @@ import org.apache.qpid.nclient.MessagePartListener; */ public class MessagePartListenerAdapter implements MessagePartListener { - MessageListener _adaptee; - ByteBufferMessage _currentMsg; + private MessageListener _adaptee; + private ByteBufferMessage _currentMsg; public MessagePartListenerAdapter(MessageListener listener) { -- cgit v1.2.1