diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache')
149 files changed, 3234 insertions, 3392 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java b/qpid/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java deleted file mode 100644 index 73ee747c07..0000000000 --- a/qpid/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java +++ /dev/null @@ -1,129 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -package org.apache.configuration; - -import java.util.HashMap; -import java.util.Map; - -public class PropertyNameResolver -{ - public static interface Accessor - { - Object get(String name); - } - - private static Map<Class<?>,Accessor> accessors = new HashMap<Class<?>,Accessor>(); - protected Map<String,QpidProperty> properties; - - private static class BooleanAccessor implements Accessor - { - public Boolean get(String name) - { - return Boolean.getBoolean(name); - } - } - - private static class IntegerAccessor implements Accessor - { - public Integer get(String name) - { - return Integer.getInteger(name); - } - } - - private static class LongAccessor implements Accessor - { - public Long get(String name) - { - return Long.getLong(name); - } - } - - private static class StringAccessor implements Accessor - { - public String get(String name) - { - return System.getProperty(name); - } - } - - static - { - accessors.put(Boolean.class, new BooleanAccessor()); - accessors.put(Integer.class, new IntegerAccessor()); - accessors.put(String.class, new StringAccessor()); - accessors.put(Long.class, new LongAccessor()); - } - - public Integer getIntegerValue(String propName) - { - return properties.get(propName).get(Integer.class); - } - - public Long getLongValue(String propName) - { - return properties.get(propName).get(Long.class); - } - - public String getStringValue(String propName) - { - return properties.get(propName).get(String.class); - } - - public Boolean getBooleanValue(String propName) - { - return properties.get(propName).get(Boolean.class); - } - - public <T> T get(String propName,Class<T> klass) - { - return properties.get(propName).get(klass); - } - - static class QpidProperty - { - private Object defValue; - private String[] names; - - QpidProperty(Object defValue, String ... names) - { - this.defValue = defValue; - this.names = names; - } - - <T> T get(Class<T> klass) - { - Accessor acc = accessors.get(klass); - for (String name : names) - { - Object obj = acc.get(name); - if (obj != null) - { - return klass.cast(obj); - } - } - - return klass.cast(defValue); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 2f6290b55a..44cd603a8d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -20,7 +20,9 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index ca9c9f9dc4..c7a0816f91 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -21,7 +21,9 @@ package org.apache.qpid; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; /** @@ -45,7 +47,7 @@ public class AMQConnectionException extends AMQException /** AMQP version for which exception ocurred, minor code. */ private final byte minor; - boolean _closeConnetion; + private boolean _closeConnetion; public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable cause) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index f2503e549f..d9a9ee0782 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -20,10 +20,10 @@ */ package org.apache.qpid; -import java.util.Collection; - import org.apache.qpid.protocol.AMQConstant; +import java.util.Collection; + /** * AMQConnectionFailureException indicates that a connection to a broker could not be formed. * @@ -36,7 +36,7 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQConnectionFailureException extends AMQException { - Collection<Exception> _exceptions; + private Collection<Exception> _exceptions; public AMQConnectionFailureException(String message, Throwable cause) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQStoreException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQStoreException.java index 8389fe5efa..45aa36a20b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQStoreException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQStoreException.java @@ -20,10 +20,8 @@ */ package org.apache.qpid; -import java.sql.SQLException; - /** - * StoreException is a specific type of internal error relating to errors in the message store, such as {@link SQLException}. + * StoreException is a specific type of internal error relating to errors in the message store, such as {@link java.sql.SQLException}. */ public class AMQStoreException extends AMQInternalException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java index eee3e6afcf..82ffe583c3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java @@ -35,7 +35,7 @@ package org.apache.qpid; */ public class AMQUnresolvedAddressException extends AMQException { - String _broker; + private String _broker; public AMQUnresolvedAddressException(String message, String broker, Throwable cause) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java b/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java deleted file mode 100644 index 00ad5cf08a..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid; - -import static org.apache.qpid.transport.util.Functions.str; - -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Sender; - - -/** - * ConsoleOutput - * - * @author Rafael H. Schloming - */ - -public class ConsoleOutput implements Sender<ByteBuffer> -{ - - public void send(ByteBuffer buf) - { - System.out.println(str(buf)); - } - - public void flush() - { - // pass - } - - public void close() - { - System.out.println("CLOSED"); - } - - public void setIdleTimeout(int i) - { - // TODO Auto-generated method stub - - } - - - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/QpidConfig.java b/qpid/java/common/src/main/java/org/apache/qpid/QpidConfig.java deleted file mode 100644 index b4cad44130..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/QpidConfig.java +++ /dev/null @@ -1,111 +0,0 @@ -package org.apache.qpid; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * API to configure the Security parameters of the client. - * The user can choose to pick the config from any source - * and set it using this class. - * - */ -public class QpidConfig -{ - private static QpidConfig _instance = new QpidConfig(); - - private SecurityMechanism[] securityMechanisms = - new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpid.security.UsernamePasswordCallbackHandler"), - new SecurityMechanism("CRAM_MD5","org.apache.qpid.security.UsernamePasswordCallbackHandler")}; - - private SaslClientFactory[] saslClientFactories = - new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpid.security.amqplain.AmqPlainSaslClientFactory")}; - - private QpidConfig(){} - - public static QpidConfig get() - { - return _instance; - } - - public void setSecurityMechanisms(SecurityMechanism... securityMechanisms) - { - this.securityMechanisms = securityMechanisms; - } - - public SecurityMechanism[] getSecurityMechanisms() - { - return securityMechanisms; - } - - public void setSaslClientFactories(SaslClientFactory... saslClientFactories) - { - this.saslClientFactories = saslClientFactories; - } - - public SaslClientFactory[] getSaslClientFactories() - { - return saslClientFactories; - } - - public static class SecurityMechanism - { - String type; - String handler; - - SecurityMechanism(String type,String handler) - { - this.type = type; - this.handler = handler; - } - - public String getHandler() - { - return handler; - } - - public String getType() - { - return type; - } - } - - public static class SaslClientFactory - { - String type; - String factoryClass; - - SaslClientFactory(String type,String factoryClass) - { - this.type = type; - this.factoryClass = factoryClass; - } - - public String getFactoryClass() - { - return factoryClass; - } - - public String getType() - { - return type; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/SerialException.java b/qpid/java/common/src/main/java/org/apache/qpid/SerialException.java deleted file mode 100644 index c59a6af779..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/SerialException.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.apache.qpid; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * This exception is used by the serial class (imp RFC 1982) - * - */ -public class SerialException extends ArithmeticException -{ - /** - * Constructs an <code>SerialException</code> with the specified - * detail message. - * - * @param message The exception message. - */ - public SerialException(String message) - { - super(message); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/api/Message.java b/qpid/java/common/src/main/java/org/apache/qpid/api/Message.java index df6f279026..49c7be162c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/api/Message.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/api/Message.java @@ -1,11 +1,11 @@ package org.apache.qpid.api; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageProperties; + +import java.io.IOException; +import java.nio.ByteBuffer; /* * Licensed to the Apache Software Foundation (ASF) under one diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 1d196534b2..ffdb7e6573 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,13 +20,27 @@ */ package org.apache.qpid.codec; -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; - -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ByteArrayDataInput; +import org.apache.qpid.framing.EncodingUtils; +import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 9ed915cc35..57cd2a1ff5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -34,11 +34,13 @@ public enum AMQPFilterTypes { JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), - AUTO_CLOSE("x-filter-auto-close"); + AUTO_CLOSE("x-filter-auto-close"), + NO_LOCAL("x-qpid-no-local"); /** The identifying string for the filter type. */ private final AMQShortString _value; + /** * Creates a new filter type from its identifying string. * diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java deleted file mode 100644 index 7371c12519..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.common; - -import org.apache.qpid.framing.AMQShortString; - -/** - * Specifies the available client property types that different clients can use to identify themselves with. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Specify the available client property types. - * </table> - */ -public enum ClientProperties -{ - instance("instance"), - product("product"), - version("version"), - platform("platform"); - - private final AMQShortString _amqShortString; - - private ClientProperties(String name) - { - _amqShortString = new AMQShortString(name); - } - - - public AMQShortString toAMQShortString() - { - return _amqShortString; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java index aa262bdde5..dd94f8251b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java @@ -26,6 +26,10 @@ package org.apache.qpid.common; */ public final class ServerPropertyNames { + private ServerPropertyNames() + { + } + /** * Server property: federation tag UUID */ diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java index dc5b69dc89..517fd1829f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java @@ -1,4 +1,3 @@ -package org.apache.qpid.configuration; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,12 +18,11 @@ package org.apache.qpid.configuration; * under the License. * */ - +package org.apache.qpid.configuration; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -41,7 +39,7 @@ public interface Accessor { public Boolean getBoolean(String name) { - return Boolean.getBoolean(name); + return System.getProperty(name) == null ? null : Boolean.getBoolean(name); } public Integer getInt(String name) @@ -62,13 +60,18 @@ public interface Accessor static class MapAccessor implements Accessor { - protected Map<Object,Object> source; + private Map<Object,Object> source; public MapAccessor(Map<Object,Object> map) { source = map; } - + + protected void setSource(Map<Object, Object> source) + { + this.source = source; + } + public Boolean getBoolean(String name) { if (source != null && source.containsKey(name)) @@ -161,8 +164,10 @@ public interface Accessor { inStream.close(); } - source = props; + setSource(props); } + + } static class CombinedAccessor implements Accessor diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index a36e7c214e..3227bb6fc2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -95,6 +95,7 @@ public class ClientProperties * synchronous operations. */ public static final String QPID_SYNC_OP_TIMEOUT = "qpid.sync_op_timeout"; + @Deprecated public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout"; /** @@ -106,6 +107,7 @@ public class ClientProperties * System properties to change the default value used for TCP_NODELAY */ public static final String QPID_TCP_NODELAY_PROP_NAME = "qpid.tcp_nodelay"; + @Deprecated public static final String AMQJ_TCP_NODELAY_PROP_NAME = "amqj.tcp_nodelay"; /** @@ -116,16 +118,54 @@ public class ClientProperties */ public static final String REJECT_BEHAVIOUR_PROP_NAME = "qpid.reject.behaviour"; - /* - public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID"); + private ClientProperties() + { + } - public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence"); + /** + * System property used to set the key manager factory algorithm. + * + * Historically, Qpid referred to this as {@value #QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME}. + */ + public static final String QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME = "qpid.ssl.KeyManagerFactory.algorithm"; + @Deprecated + public static final String QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME = "qpid.ssl.keyStoreCertType"; + /** + * System property used to set the trust manager factory algorithm. + * + * Historically, Qpid referred to this as {@value #QPID_SSL_TRUST_STORE_CERT_TYPE_PROP_NAME}. + */ + public static final String QPID_SSL_TRUST_MANAGER_FACTORY_ALGORITHM_PROP_NAME = "qpid.ssl.TrustManagerFactory.algorithm"; + @Deprecated + public static final String QPID_SSL_TRUST_STORE_CERT_TYPE_PROP_NAME = "qpid.ssl.trustStoreCertType"; - public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME = - QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */ + /** + * System property to enable allow dispatcher thread to be run as a daemon thread + */ + public static final String DAEMON_DISPATCHER = "qpid.jms.daemon.dispatcher"; + /** + * Used to name the process utilising the Qpid client, to override the default + * value is used in the ConnectionStartOk reply to the broker. + */ + public static final String PROCESS_NAME = "qpid.client_process"; + + /** + * System property used to set the socket receive buffer size. + * + * Historically, Qpid referred to this as {@value #LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME}. + */ + public static final String RECEIVE_BUFFER_SIZE_PROP_NAME = "qpid.receive_buffer_size"; + @Deprecated + public static final String LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME = "amqj.receiveBufferSize"; + /** + * System property used to set the socket send buffer size. + * + * Historically, Qpid referred to this as {@value #LEGACY_SEND_BUFFER_SIZE_PROP_NAME}. + */ + public static final String SEND_BUFFER_SIZE_PROP_NAME = "qpid.send_buffer_size"; + @Deprecated + public static final String LEGACY_SEND_BUFFER_SIZE_PROP_NAME = "amqj.sendBufferSize"; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java index 73a336321c..b8181e3b87 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -21,7 +21,6 @@ package org.apache.qpid.configuration; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; /** * Indicates a failure to parse a property expansion. See {@link PropertyUtils} for the code that does property diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java index 6e2b25fb2c..81702ee1ea 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java @@ -40,6 +40,10 @@ import java.util.Iterator; */ public class PropertyUtils { + private PropertyUtils() + { + } + /** * Given a string that contains substrings of the form <code>${xxx}</code>, looks up the valuea of 'xxx' as a * system properties and substitutes tham back into the original string, to provide a property value expanded @@ -66,13 +70,13 @@ public class PropertyUtils parsePropertyString(value, fragments, propertyRefs); StringBuffer sb = new StringBuffer(); - Iterator j = propertyRefs.iterator(); + Iterator<String> j = propertyRefs.iterator(); for (String fragment : fragments) { if (fragment == null) { - String propertyName = (String) j.next(); + String propertyName = j.next(); // try to get it from the project or keys // Backward compatibility diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java index 9c0aaaec89..e0989495bb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java @@ -20,11 +20,11 @@ package org.apache.qpid.configuration; import org.apache.qpid.configuration.Accessor.SystemPropertyAccessor; -abstract class QpidProperty<T> +public abstract class QpidProperty<T> { private T defValue; private String[] names; - protected Accessor accessor; + private Accessor accessor; QpidProperty(T defValue, String... names) { @@ -38,7 +38,7 @@ abstract class QpidProperty<T> this.names = names; } - T get() + public T get() { for (String name : names) { @@ -101,7 +101,12 @@ abstract class QpidProperty<T> { return new QpidStringProperty(accessor,defaultValue, names); } - + + protected Accessor getAccessor() + { + return accessor; + } + static class QpidBooleanProperty extends QpidProperty<Boolean> { QpidBooleanProperty(Boolean defValue, String... names) @@ -117,7 +122,7 @@ abstract class QpidProperty<T> @Override protected Boolean getByName(String name) { - return accessor.getBoolean(name); + return getAccessor().getBoolean(name); } } @@ -136,7 +141,7 @@ abstract class QpidProperty<T> @Override protected Integer getByName(String name) { - return accessor.getInt(name); + return getAccessor().getInt(name); } } @@ -155,7 +160,7 @@ abstract class QpidProperty<T> @Override protected Long getByName(String name) { - return accessor.getLong(name); + return getAccessor().getLong(name); } } @@ -174,7 +179,7 @@ abstract class QpidProperty<T> @Override protected String getByName(String name) { - return accessor.getString(name); + return getAccessor().getString(name); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java index 69457ca4a9..3590254d27 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java @@ -19,11 +19,13 @@ package org.apache.qpid.dtx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQInvalidArgumentException; import javax.transaction.xa.Xid; - -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.util.Arrays; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 1989ade4ac..975ec4daca 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -36,6 +36,10 @@ import org.apache.qpid.framing.AMQShortString; */ public class ExchangeDefaults { + private ExchangeDefaults() + { + } + /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */ public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>"); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java new file mode 100644 index 0000000000..47d970cfbd --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java @@ -0,0 +1,272 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +/** + * An expression which performs an operation on two expression values + */ +public abstract class ArithmeticExpression extends BinaryExpression +{ + + protected static final int INTEGER = 1; + protected static final int LONG = 2; + protected static final int DOUBLE = 3; + + /** + * @param left + * @param right + */ + public ArithmeticExpression(Expression left, Expression right) + { + super(left, right); + } + + public static Expression createPlus(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof String) + { + String text = (String) lvalue; + String answer = text + rvalue; + + return answer; + } + else if (lvalue instanceof Number) + { + return plus((Number) lvalue, asNumber(rvalue)); + } + + throw new SelectorParsingException("Cannot call plus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "+"; + } + }; + } + + public static Expression createMinus(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return minus((Number) lvalue, asNumber(rvalue)); + } + + throw new SelectorParsingException("Cannot call minus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "-"; + } + }; + } + + public static Expression createMultiply(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return multiply((Number) lvalue, asNumber(rvalue)); + } + + throw new SelectorParsingException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "*"; + } + }; + } + + public static Expression createDivide(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return divide((Number) lvalue, asNumber(rvalue)); + } + + throw new SelectorParsingException("Cannot call divide operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "/"; + } + }; + } + + public static Expression createMod(Expression left, Expression right) + { + return new ArithmeticExpression(left, right) + { + + protected Object evaluate(Object lvalue, Object rvalue) + { + if (lvalue instanceof Number) + { + return mod((Number) lvalue, asNumber(rvalue)); + } + + throw new SelectorParsingException("Cannot call mod operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() + { + return "%"; + } + }; + } + + protected Number plus(Number left, Number right) + { + switch (numberType(left, right)) + { + + case INTEGER: + return Integer.valueOf(left.intValue() + right.intValue()); + + case LONG: + return Long.valueOf(left.longValue() + right.longValue()); + + default: + return Double.valueOf(left.doubleValue() + right.doubleValue()); + } + } + + protected Number minus(Number left, Number right) + { + switch (numberType(left, right)) + { + + case INTEGER: + return Integer.valueOf(left.intValue() - right.intValue()); + + case LONG: + return Long.valueOf(left.longValue() - right.longValue()); + + default: + return Double.valueOf(left.doubleValue() - right.doubleValue()); + } + } + + protected Number multiply(Number left, Number right) + { + switch (numberType(left, right)) + { + + case INTEGER: + return Integer.valueOf(left.intValue() * right.intValue()); + + case LONG: + return Long.valueOf(left.longValue() * right.longValue()); + + default: + return Double.valueOf(left.doubleValue() * right.doubleValue()); + } + } + + protected Number divide(Number left, Number right) + { + return Double.valueOf(left.doubleValue() / right.doubleValue()); + } + + protected Number mod(Number left, Number right) + { + return Double.valueOf(left.doubleValue() % right.doubleValue()); + } + + private int numberType(Number left, Number right) + { + if (isDouble(left) || isDouble(right)) + { + return DOUBLE; + } + else if ((left instanceof Long) || (right instanceof Long)) + { + return LONG; + } + else + { + return INTEGER; + } + } + + private boolean isDouble(Number n) + { + return (n instanceof Float) || (n instanceof Double); + } + + protected Number asNumber(Object value) + { + if (value instanceof Number) + { + return (Number) value; + } + else + { + throw new SelectorParsingException("Cannot convert value: " + value + " into a number"); + } + } + + public Object evaluate(FilterableMessage message) + { + Object lvalue = getLeft().evaluate(message); + if (lvalue == null) + { + return null; + } + + Object rvalue = getRight().evaluate(message); + if (rvalue == null) + { + return null; + } + + return evaluate(lvalue, rvalue); + } + + /** + * @param lvalue + * @param rvalue + * @return + */ + protected abstract Object evaluate(Object lvalue, Object rvalue); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/BinaryExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/BinaryExpression.java new file mode 100644 index 0000000000..6467bbbe1f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/BinaryExpression.java @@ -0,0 +1,90 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +/** + * An expression which performs an operation on two expression values. + */ +public abstract class BinaryExpression implements Expression +{ + private final Expression left; + private final Expression right; + + public BinaryExpression(Expression left, Expression right) + { + this.left = left; + this.right = right; + } + + public Expression getLeft() + { + return left; + } + + public Expression getRight() + { + return right; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + public abstract String getExpressionSymbol(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/BooleanExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/BooleanExpression.java new file mode 100644 index 0000000000..13e1604d5f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/BooleanExpression.java @@ -0,0 +1,36 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +/** + * A BooleanExpression is an expression that always + * produces a Boolean result. + */ +public interface BooleanExpression extends Expression +{ + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + */ + public boolean matches(FilterableMessage message); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/ComparisonExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/ComparisonExpression.java new file mode 100644 index 0000000000..2cfb97dc6c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/ComparisonExpression.java @@ -0,0 +1,596 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +/** + * A filter performing a comparison of two objects + */ +public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression +{ + + public static BooleanExpression createBetween(Expression value, Expression left, Expression right) + { + return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right)); + } + + public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) + { + return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right)); + } + + private static final HashSet<Character> REGEXP_CONTROL_CHARS = new HashSet<Character>(); + + static + { + REGEXP_CONTROL_CHARS.add('.'); + REGEXP_CONTROL_CHARS.add('\\'); + REGEXP_CONTROL_CHARS.add('['); + REGEXP_CONTROL_CHARS.add(']'); + REGEXP_CONTROL_CHARS.add('^'); + REGEXP_CONTROL_CHARS.add('$'); + REGEXP_CONTROL_CHARS.add('?'); + REGEXP_CONTROL_CHARS.add('*'); + REGEXP_CONTROL_CHARS.add('+'); + REGEXP_CONTROL_CHARS.add('{'); + REGEXP_CONTROL_CHARS.add('}'); + REGEXP_CONTROL_CHARS.add('|'); + REGEXP_CONTROL_CHARS.add('('); + REGEXP_CONTROL_CHARS.add(')'); + REGEXP_CONTROL_CHARS.add(':'); + REGEXP_CONTROL_CHARS.add('&'); + REGEXP_CONTROL_CHARS.add('<'); + REGEXP_CONTROL_CHARS.add('>'); + REGEXP_CONTROL_CHARS.add('='); + REGEXP_CONTROL_CHARS.add('!'); + } + + static class LikeExpression extends UnaryExpression implements BooleanExpression + { + + private Pattern likePattern; + + /** + * @param right + */ + public LikeExpression(Expression right, String like, int escape) + { + super(right); + + StringBuffer regexp = new StringBuffer(like.length() * 2); + regexp.append("\\A"); // The beginning of the input + for (int i = 0; i < like.length(); i++) + { + char c = like.charAt(i); + if (escape == (0xFFFF & c)) + { + i++; + if (i >= like.length()) + { + // nothing left to escape... + break; + } + + char t = like.charAt(i); + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & t)); + } + else if (c == '%') + { + regexp.append(".*?"); // Do a non-greedy match + } + else if (c == '_') + { + regexp.append("."); // match one + } + else if (REGEXP_CONTROL_CHARS.contains(c)) + { + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & c)); + } + else + { + regexp.append(c); + } + } + + regexp.append("\\z"); // The end of the input + + likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL); + } + + /** + * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol() + */ + public String getExpressionSymbol() + { + return "LIKE"; + } + + /** + * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) + */ + public Object evaluate(FilterableMessage message) + { + + Object rv = this.getRight().evaluate(message); + + if (rv == null) + { + return null; + } + + if (!(rv instanceof String)) + { + return + Boolean.FALSE; + } + + return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; + } + + public boolean matches(FilterableMessage message) + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + public static BooleanExpression createLike(Expression left, String right, String escape) + { + if ((escape != null) && (escape.length() != 1)) + { + throw new SelectorParsingException( + "The ESCAPE string literal is invalid. It can only be one character. Litteral used: " + escape); + } + + int c = -1; + if (escape != null) + { + c = 0xFFFF & escape.charAt(0); + } + + return new LikeExpression(left, right, c); + } + + public static BooleanExpression createNotLike(Expression left, String right, String escape) + { + return UnaryExpression.createNOT(createLike(left, right, escape)); + } + + public static BooleanExpression createInFilter(Expression left, List elements) + { + + if (!(left instanceof PropertyExpression)) + { + throw new SelectorParsingException("Expected a property for In expression, got: " + left); + } + + return UnaryExpression.createInExpression((PropertyExpression) left, elements, false); + + } + + public static BooleanExpression createNotInFilter(Expression left, List elements) + { + + if (!(left instanceof PropertyExpression)) + { + throw new SelectorParsingException("Expected a property for In expression, got: " + left); + } + + return UnaryExpression.createInExpression((PropertyExpression) left, elements, true); + + } + + public static BooleanExpression createIsNull(Expression left) + { + return doCreateEqual(left, ConstantExpression.NULL); + } + + public static BooleanExpression createIsNotNull(Expression left) + { + return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL)); + } + + public static BooleanExpression createNotEqual(Expression left, Expression right) + { + return UnaryExpression.createNOT(createEqual(left, right)); + } + + public static BooleanExpression createEqual(Expression left, Expression right) + { + checkEqualOperand(left); + checkEqualOperand(right); + checkEqualOperandCompatability(left, right); + + return doCreateEqual(left, right); + } + + private static BooleanExpression doCreateEqual(Expression left, Expression right) + { + return new EqualExpression(left, right); + } + + public static BooleanExpression createGreaterThan(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + protected boolean asBoolean(int answer) + { + return answer > 0; + } + + public String getExpressionSymbol() + { + return ">"; + } + }; + } + + public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + protected boolean asBoolean(int answer) + { + return answer >= 0; + } + + public String getExpressionSymbol() + { + return ">="; + } + }; + } + + public static BooleanExpression createLessThan(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + + protected boolean asBoolean(int answer) + { + return answer < 0; + } + + public String getExpressionSymbol() + { + return "<"; + } + + }; + } + + public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) + { + checkLessThanOperand(left); + checkLessThanOperand(right); + + return new ComparisonExpression(left, right) + { + + protected boolean asBoolean(int answer) + { + return answer <= 0; + } + + public String getExpressionSymbol() + { + return "<="; + } + }; + } + + /** + * Only Numeric expressions can be used in >, >=, < or <= expressions.s + * + * @param expr + */ + public static void checkLessThanOperand(Expression expr) + { + if (expr instanceof ConstantExpression) + { + Object value = ((ConstantExpression) expr).getValue(); + if (value instanceof Number) + { + return; + } + + // Else it's boolean or a String.. + throw new SelectorParsingException("Value '" + expr + "' cannot be compared."); + } + + if (expr instanceof BooleanExpression) + { + throw new SelectorParsingException("Value '" + expr + "' cannot be compared."); + } + } + + /** + * Validates that the expression can be used in == or <> expression. + * Cannot not be NULL TRUE or FALSE literals. + * + * @param expr + */ + public static void checkEqualOperand(Expression expr) + { + if (expr instanceof ConstantExpression) + { + Object value = ((ConstantExpression) expr).getValue(); + if (value == null) + { + throw new SelectorParsingException("'" + expr + "' cannot be compared."); + } + } + } + + /** + * + * @param left + * @param right + */ + private static void checkEqualOperandCompatability(Expression left, Expression right) + { + if ((left instanceof ConstantExpression) && (right instanceof ConstantExpression)) + { + if ((left instanceof BooleanExpression) && !(right instanceof BooleanExpression)) + { + throw new SelectorParsingException("'" + left + "' cannot be compared with '" + right + "'"); + } + } + } + + /** + * @param left + * @param right + */ + public ComparisonExpression(Expression left, Expression right) + { + super(left, right); + } + + public Object evaluate(FilterableMessage message) + { + Comparable lv = (Comparable) getLeft().evaluate(message); + if (lv == null) + { + return null; + } + + Comparable rv = (Comparable) getRight().evaluate(message); + if (rv == null) + { + return null; + } + + return compare(lv, rv); + } + + protected Boolean compare(Comparable lv, Comparable rv) + { + Class lc = lv.getClass(); + Class rc = rv.getClass(); + // If the the objects are not of the same type, + // try to convert up to allow the comparison. + if (lc != rc) + { + if (lc == Byte.class) + { + if (rc == Short.class) + { + lv = ((Number) lv).shortValue(); + } + else if (rc == Integer.class) + { + lv = ((Number) lv).intValue(); + } + else if (rc == Long.class) + { + lv = ((Number) lv).longValue(); + } + else if (rc == Float.class) + { + lv = ((Number) lv).floatValue(); + } + else if (rc == Double.class) + { + lv = ((Number) lv).doubleValue(); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Short.class) + { + if (rc == Integer.class) + { + lv = ((Number) lv).intValue(); + } + else if (rc == Long.class) + { + lv = ((Number) lv).longValue(); + } + else if (rc == Float.class) + { + lv = ((Number) lv).floatValue(); + } + else if (rc == Double.class) + { + lv = ((Number) lv).doubleValue(); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Integer.class) + { + if (rc == Long.class) + { + lv = ((Number) lv).longValue(); + } + else if (rc == Float.class) + { + lv = ((Number) lv).floatValue(); + } + else if (rc == Double.class) + { + lv = ((Number) lv).doubleValue(); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Long.class) + { + if (rc == Integer.class) + { + rv = ((Number) rv).longValue(); + } + else if (rc == Float.class) + { + lv = ((Number) lv).floatValue(); + } + else if (rc == Double.class) + { + lv = ((Number) lv).doubleValue(); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Float.class) + { + if (rc == Integer.class) + { + rv = ((Number) rv).floatValue(); + } + else if (rc == Long.class) + { + rv = ((Number) rv).floatValue(); + } + else if (rc == Double.class) + { + lv = ((Number) lv).doubleValue(); + } + else + { + return Boolean.FALSE; + } + } + else if (lc == Double.class) + { + if (rc == Integer.class) + { + rv = ((Number) rv).doubleValue(); + } + else if (rc == Long.class) + { + rv = ((Number) rv).doubleValue(); + } + else if (rc == Float.class) + { + rv = ((Number) rv).doubleValue(); + } + else + { + return Boolean.FALSE; + } + } + else + { + return Boolean.FALSE; + } + } + + return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE; + } + + protected abstract boolean asBoolean(int answer); + + public boolean matches(FilterableMessage message) + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + + private static class EqualExpression extends ComparisonExpression + { + public EqualExpression(final Expression left, final Expression right) + { + super(left, right); + } + + public Object evaluate(FilterableMessage message) + { + Object lv = getLeft().evaluate(message); + Object rv = getRight().evaluate(message); + + // Iff one of the values is null + if ((lv == null) ^ (rv == null)) + { + return Boolean.FALSE; + } + + if ((lv == rv) || lv.equals(rv)) + { + return Boolean.TRUE; + } + + if ((lv instanceof Comparable) && (rv instanceof Comparable)) + { + return compare((Comparable) lv, (Comparable) rv); + } + + return Boolean.FALSE; + } + + protected boolean asBoolean(int answer) + { + return answer == 0; + } + + public String getExpressionSymbol() + { + return "="; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java new file mode 100644 index 0000000000..20c9f1438a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/ConstantExpression.java @@ -0,0 +1,207 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import java.math.BigDecimal; + +/** + * Represents a constant expression + */ +public class ConstantExpression implements Expression +{ + + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression + { + public BooleanConstantExpression(Object value) + { + super(value); + } + + public boolean matches(FilterableMessage message) + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); + public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); + public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); + + private Object value; + + public static ConstantExpression createFromDecimal(String text) + { + + // Strip off the 'l' or 'L' if needed. + if (text.endsWith("l") || text.endsWith("L")) + { + text = text.substring(0, text.length() - 1); + } + + Number value; + try + { + value = new Long(text); + } + catch (NumberFormatException e) + { + // The number may be too big to fit in a long. + value = new BigDecimal(text); + } + + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = value.intValue(); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFromHex(String text) + { + Number value = Long.parseLong(text.substring(2), 16); + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = value.intValue(); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFromOctal(String text) + { + Number value = Long.parseLong(text, 8); + long l = value.longValue(); + if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE)) + { + value = value.intValue(); + } + + return new ConstantExpression(value); + } + + public static ConstantExpression createFloat(String text) + { + Number value = new Double(text); + + return new ConstantExpression(value); + } + + public ConstantExpression(Object value) + { + this.value = value; + } + + public Object evaluate(FilterableMessage message) + { + return value; + } + + public Object getValue() + { + return value; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + if (value == null) + { + return "NULL"; + } + + if (value instanceof Boolean) + { + return ((Boolean) value) ? "TRUE" : "FALSE"; + } + + if (value instanceof String) + { + return encodeString((String) value); + } + + return value.toString(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return toString().equals(o.toString()); + + } + + /** + * Encodes the value of string so that it looks like it would look like + * when it was provided in a selector. + * + * @param s + * @return + */ + public static String encodeString(String s) + { + StringBuffer b = new StringBuffer(); + b.append('\''); + for (int i = 0; i < s.length(); i++) + { + char c = s.charAt(i); + if (c == '\'') + { + b.append(c); + } + + b.append(c); + } + + b.append('\''); + + return b.toString(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/Expression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/Expression.java new file mode 100644 index 0000000000..1030c7b588 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/Expression.java @@ -0,0 +1,34 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +/** + * Represents an expression + */ +public interface Expression +{ + + /** + * @return the value of this expression + */ + public Object evaluate(FilterableMessage message); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/FilterableMessage.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/FilterableMessage.java new file mode 100644 index 0000000000..b5b00ae70f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/FilterableMessage.java @@ -0,0 +1,41 @@ +package org.apache.qpid.filter; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface FilterableMessage +{ + + boolean isPersistent(); + + boolean isRedelivered(); + + Object getHeader(String name); + + String getReplyTo(); + + String getType(); + + byte getPriority(); + + String getMessageId(); + + long getTimestamp(); + + String getCorrelationId(); + + long getExpiration(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/LogicExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/LogicExpression.java new file mode 100644 index 0000000000..f8ec19d23b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/LogicExpression.java @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +/** + * A filter performing a comparison of two objects + */ +public abstract class LogicExpression extends BinaryExpression implements BooleanExpression +{ + + public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) + { + return new OrExpression(lvalue, rvalue); + } + + public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) + { + return new AndExpression(lvalue, rvalue); + } + + /** + * @param left + * @param right + */ + public LogicExpression(BooleanExpression left, BooleanExpression right) + { + super(left, right); + } + + public abstract Object evaluate(FilterableMessage message); + + public boolean matches(FilterableMessage message) + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + + private static class OrExpression extends LogicExpression + { + public OrExpression(final BooleanExpression lvalue, final BooleanExpression rvalue) + { + super(lvalue, rvalue); + } + + public Object evaluate(FilterableMessage message) + { + + Boolean lv = (Boolean) getLeft().evaluate(message); + // Can we do an OR shortcut?? + if ((lv != null) && lv.booleanValue()) + { + return Boolean.TRUE; + } + + Boolean rv = (Boolean) getRight().evaluate(message); + + return (rv == null) ? null : rv; + } + + public String getExpressionSymbol() + { + return "OR"; + } + } + + private static class AndExpression extends LogicExpression + { + public AndExpression(final BooleanExpression lvalue, final BooleanExpression rvalue) + { + super(lvalue, rvalue); + } + + public Object evaluate(FilterableMessage message) + { + + Boolean lv = (Boolean) getLeft().evaluate(message); + + // Can we do an AND shortcut?? + if (lv == null) + { + return null; + } + + if (!lv.booleanValue()) + { + return Boolean.FALSE; + } + + Boolean rv = (Boolean) getRight().evaluate(message); + + return (rv == null) ? null : rv; + } + + public String getExpressionSymbol() + { + return "AND"; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/PropertyExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/PropertyExpression.java new file mode 100644 index 0000000000..4fb9c0e62f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/PropertyExpression.java @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; + +/** + * Represents a property expression + */ +public class PropertyExpression implements Expression +{ + // Constants - defined the same as JMS + private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT } + + private static final int DEFAULT_PRIORITY = 4; + + private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class); + + private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>(); + + { + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() + { + public Object evaluate(FilterableMessage message) + { + //TODO + return null; + } + }); + JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new ReplyToExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSType", new TypeExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new DeliveryModeExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new PriorityExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSMessageID", new MessageIDExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new MessageIDExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new TimestampExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new CorrelationIdExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression()); + + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() + { + public Object evaluate(FilterableMessage message) + { + return message.isRedelivered(); + } + }); + } + + private final String name; + private final Expression jmsPropertyExpression; + + public boolean outerTest() + { + return false; + } + + public PropertyExpression(String name) + { + this.name = name; + + + + jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name); + } + + public Object evaluate(FilterableMessage message) + { + + if (jmsPropertyExpression != null) + { + return jmsPropertyExpression.evaluate(message); + } + else + { + return message.getHeader(name); + } + } + + public String getName() + { + return name; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return name; + } + + /** + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return name.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if ((o == null) || !this.getClass().equals(o.getClass())) + { + return false; + } + + return name.equals(((PropertyExpression) o).name); + + } + + private static class ReplyToExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + String replyTo = message.getReplyTo(); + return replyTo; + } + + } + + private static class TypeExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + + String type = message.getType(); + return type; + + } + } + + private static class DeliveryModeExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + JMSDeliveryMode mode = message.isPersistent() ? JMSDeliveryMode.PERSISTENT : + JMSDeliveryMode.NON_PERSISTENT; + if (_logger.isDebugEnabled()) + { + _logger.debug("JMSDeliveryMode is :" + mode); + } + + return mode.toString(); + } + } + + private static class PriorityExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + byte priority = message.getPriority(); + return (int) priority; + } + } + + private static class MessageIDExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + + String messageId = message.getMessageId(); + + return messageId; + + } + } + + private static class TimestampExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + long timestamp = message.getTimestamp(); + return timestamp; + } + } + + private static class CorrelationIdExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + + String correlationId = message.getCorrelationId(); + + return correlationId; + } + } + + private static class ExpirationExpression implements Expression + { + public Object evaluate(FilterableMessage message) + { + long expiration = message.getExpiration(); + return expiration; + + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/SelectorParsingException.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/SelectorParsingException.java new file mode 100644 index 0000000000..f08b3df155 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/SelectorParsingException.java @@ -0,0 +1,36 @@ +package org.apache.qpid.filter; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class SelectorParsingException extends RuntimeException +{ + public SelectorParsingException(String s) + { + super(s); + } + + public SelectorParsingException(String message, Throwable cause) + { + super(message, cause); + } + + public SelectorParsingException(Throwable cause) + { + super(cause); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/filter/UnaryExpression.java b/qpid/java/common/src/main/java/org/apache/qpid/filter/UnaryExpression.java new file mode 100644 index 0000000000..b80b89840c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/filter/UnaryExpression.java @@ -0,0 +1,340 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> +// + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +/** + * An expression which performs an operation on two expression values + */ +public abstract class UnaryExpression implements Expression +{ + + private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); + private Expression right; + + public static Expression createNegate(Expression left) + { + return new NegativeExpression(left); + } + + public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) + { + + // Use a HashSet if there are many elements. + Collection t; + if (elements.size() == 0) + { + t = null; + } + else if (elements.size() < 5) + { + t = elements; + } + else + { + t = new HashSet(elements); + } + + final Collection inList = t; + + return new InExpression(right, inList, not); + } + + abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression + { + public BooleanUnaryExpression(Expression left) + { + super(left); + } + + public boolean matches(FilterableMessage message) + { + Object object = evaluate(message); + + return (object != null) && (object == Boolean.TRUE); + } + } + + public static BooleanExpression createNOT(BooleanExpression left) + { + return new NotExpression(left); + } + + public static BooleanExpression createBooleanCast(Expression left) + { + return new BooleanCastExpression(left); + } + + private static Number negate(Number left) + { + Class clazz = left.getClass(); + if (clazz == Integer.class) + { + return -left.intValue(); + } + else if (clazz == Long.class) + { + return -left.longValue(); + } + else if (clazz == Float.class) + { + return -left.floatValue(); + } + else if (clazz == Double.class) + { + return -left.doubleValue(); + } + else if (clazz == BigDecimal.class) + { + // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the + // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it + // as a Big decimal. But it gets Negated right away.. to here we try to covert it back + // to a Long. + BigDecimal bd = (BigDecimal) left; + bd = bd.negate(); + + if (BD_LONG_MIN_VALUE.compareTo(bd) == 0) + { + return Long.MIN_VALUE; + } + + return bd; + } + else + { + throw new SelectorParsingException("Don't know how to negate: " + left); + } + } + + public UnaryExpression(Expression left) + { + this.right = left; + } + + public Expression getRight() + { + return right; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return "(" + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + return ((o != null) && this.getClass().equals(o.getClass())) && toString().equals(o.toString()); + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return symbol + */ + public abstract String getExpressionSymbol(); + + private static class NegativeExpression extends UnaryExpression + { + public NegativeExpression(final Expression left) + { + super(left); + } + + public Object evaluate(FilterableMessage message) + { + Object rvalue = getRight().evaluate(message); + if (rvalue == null) + { + return null; + } + + if (rvalue instanceof Number) + { + return negate((Number) rvalue); + } + + return null; + } + + public String getExpressionSymbol() + { + return "-"; + } + } + + private static class InExpression extends BooleanUnaryExpression + { + private final Collection _inList; + private final boolean _not; + + public InExpression(final PropertyExpression right, final Collection inList, final boolean not) + { + super(right); + _inList = inList; + _not = not; + } + + public Object evaluate(FilterableMessage message) + { + + Object rvalue = getRight().evaluate(message); + if (rvalue == null) + { + return null; + } + + if (rvalue.getClass() != String.class) + { + return null; + } + + if (((_inList != null) && _inList.contains(rvalue)) ^ _not) + { + return Boolean.TRUE; + } + else + { + return Boolean.FALSE; + } + + } + + public String toString() + { + StringBuilder answer = new StringBuilder(String.valueOf(getRight())); + answer.append(" "); + answer.append(getExpressionSymbol()); + answer.append(" ( "); + + int count = 0; + for (Object o : _inList) + { + if (count != 0) + { + answer.append(", "); + } + + answer.append(o); + count++; + } + + answer.append(" )"); + + return answer.toString(); + } + + public String getExpressionSymbol() + { + if (_not) + { + return "NOT IN"; + } + else + { + return "IN"; + } + } + } + + private static class NotExpression extends BooleanUnaryExpression + { + public NotExpression(final BooleanExpression left) + { + super(left); + } + + public Object evaluate(FilterableMessage message) + { + Boolean lvalue = (Boolean) getRight().evaluate(message); + if (lvalue == null) + { + return null; + } + + return lvalue ? Boolean.FALSE : Boolean.TRUE; + } + + public String getExpressionSymbol() + { + return "NOT"; + } + } + + private static class BooleanCastExpression extends BooleanUnaryExpression + { + public BooleanCastExpression(final Expression left) + { + super(left); + } + + public Object evaluate(FilterableMessage message) + { + Object rvalue = getRight().evaluate(message); + if (rvalue == null) + { + return null; + } + + if (!rvalue.getClass().equals(Boolean.class)) + { + return Boolean.FALSE; + } + + return ((Boolean) rvalue) ? Boolean.TRUE : Boolean.FALSE; + } + + public String toString() + { + return getRight().toString(); + } + + public String getExpressionSymbol() + { + return ""; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 363d9f1ccc..cb0c78ef37 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public interface AMQBody { public byte getFrameType(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index e77e5942e3..fd42084429 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index b6f2fb18ea..9d5e654ad0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataInputStream; +import org.apache.qpid.codec.MarkableDataInput; + import java.io.IOException; public class AMQDataBlockDecoder @@ -39,7 +39,7 @@ public class AMQDataBlockDecoder _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); } - Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); + private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); public AMQDataBlockDecoder() { } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 9b5699e8ff..238f28e73e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -22,8 +22,8 @@ package org.apache.qpid.framing; import org.apache.qpid.codec.MarkableDataInput; -import java.io.*; import java.io.DataOutput; +import java.io.IOException; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 2170ebf992..966a03605c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -26,7 +26,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; public interface AMQMethodBody extends AMQBody @@ -53,10 +52,6 @@ public interface AMQMethodBody extends AMQBody public void writePayload(DataOutput buffer) throws IOException; - //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; - - //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; - public AMQFrame generateFrame(int channelId); public String toString(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index ec6d662726..7fe293b6b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -20,14 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataInput; -import java.io.DataInputStream; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.IOException; public class AMQMethodBodyFactory implements BodyFactory diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 4ff7827d7f..85870e68c5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -25,11 +25,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import java.util.*; import java.lang.ref.WeakReference; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.WeakHashMap; /** * A short string is a representation of an AMQ Short String @@ -318,7 +321,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { final int size = length(); - //buffer.setAutoExpand(true); buffer.writeByte(size); buffer.write(_data, _offset, size); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java index a07fd78c8c..94a7d127b3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java @@ -27,6 +27,10 @@ public class AMQTypeMap { public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<Byte, AMQType>(); + private AMQTypeMap() + { + } + static { for(AMQType type : AMQType.values()) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index f64164c10b..c4dc86bf11 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.framing; -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; import java.util.Date; import java.util.Map; -import java.math.BigDecimal; /** * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 2739f7d14b..eb528159c0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.framing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BasicContentHeaderProperties implements CommonContentHeaderProperties { //persistent & non-persistent constants, values as per JMS DeliveryMode @@ -89,7 +88,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public int getPropertyListSize() { - if(_encodedForm != null && (_headers == null || _headers.isClean())) + if(useEncodedForm()) { return _encodedForm.length; } @@ -190,7 +189,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void writePropertyListPayload(DataOutput buffer) throws IOException { - if(_encodedForm != null && (_headers == null || !_headers.isClean())) + if(useEncodedForm()) { buffer.write(_encodedForm); } @@ -295,85 +294,83 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException { - // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int headersOffset = 0; - int headersOffset = 0; - - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - _contentType = buffer.readAMQShortString(); - headersOffset += EncodingUtils.encodedShortStringLength(_contentType); - } + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + _contentType = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_contentType); + } - if ((_propertyFlags & ENCODING_MASK) != 0) - { - _encoding = buffer.readAMQShortString(); - headersOffset += EncodingUtils.encodedShortStringLength(_encoding); - } + if ((_propertyFlags & ENCODING_MASK) != 0) + { + _encoding = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_encoding); + } - if ((_propertyFlags & HEADERS_MASK) != 0) - { - long length = EncodingUtils.readUnsignedInteger(buffer); + if ((_propertyFlags & HEADERS_MASK) != 0) + { + long length = EncodingUtils.readUnsignedInteger(buffer); - _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); + _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); - buffer.skipBytes((int)length); - } + buffer.skipBytes((int)length); + } - if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) - { - _deliveryMode = buffer.readByte(); - } + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + _deliveryMode = buffer.readByte(); + } - if ((_propertyFlags & PRIORITY_MASK) != 0) - { - _priority = buffer.readByte(); - } + if ((_propertyFlags & PRIORITY_MASK) != 0) + { + _priority = buffer.readByte(); + } - if ((_propertyFlags & CORRELATION_ID_MASK) != 0) - { - _correlationId = buffer.readAMQShortString(); - } + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + _correlationId = buffer.readAMQShortString(); + } - if ((_propertyFlags & REPLY_TO_MASK) != 0) - { - _replyTo = buffer.readAMQShortString(); - } + if ((_propertyFlags & REPLY_TO_MASK) != 0) + { + _replyTo = buffer.readAMQShortString(); + } - if ((_propertyFlags & EXPIRATION_MASK) != 0) - { - _expiration = EncodingUtils.readLongAsShortString(buffer); - } + if ((_propertyFlags & EXPIRATION_MASK) != 0) + { + _expiration = EncodingUtils.readLongAsShortString(buffer); + } - if ((_propertyFlags & MESSAGE_ID_MASK) != 0) - { - _messageId = buffer.readAMQShortString(); - } + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + _messageId = buffer.readAMQShortString(); + } - if ((_propertyFlags & TIMESTAMP_MASK) != 0) - { - _timestamp = EncodingUtils.readTimestamp(buffer); - } + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + _timestamp = EncodingUtils.readTimestamp(buffer); + } - if ((_propertyFlags & TYPE_MASK) != 0) - { - _type = buffer.readAMQShortString(); - } + if ((_propertyFlags & TYPE_MASK) != 0) + { + _type = buffer.readAMQShortString(); + } - if ((_propertyFlags & USER_ID_MASK) != 0) - { - _userId = buffer.readAMQShortString(); - } + if ((_propertyFlags & USER_ID_MASK) != 0) + { + _userId = buffer.readAMQShortString(); + } - if ((_propertyFlags & APPLICATION_ID_MASK) != 0) - { - _appId = buffer.readAMQShortString(); - } + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + _appId = buffer.readAMQShortString(); + } - if ((_propertyFlags & CLUSTER_ID_MASK) != 0) - { - _clusterId = buffer.readAMQShortString(); - } + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + _clusterId = buffer.readAMQShortString(); + } } @@ -655,4 +652,10 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti + _expiration + ",JMSPriority = " + _priority + ",JMSTimestamp = " + _timestamp + ",JMSType = " + _type; } + private boolean useEncodedForm() + { + return _encodedForm != null && (_headers == null || _headers.isClean()); + } + + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java index 656185629b..196ab422a3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java @@ -2,8 +2,6 @@ package org.apache.qpid.framing; import org.apache.qpid.codec.MarkableDataInput; -import java.io.IOException; - public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput { private byte[] _data; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 541d104dc9..6d6ec708d0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public class ContentBody implements AMQBody { public static final byte TYPE = 3; - public byte[] _payload; + private byte[] _payload; public ContentBody() { @@ -42,7 +42,7 @@ public class ContentBody implements AMQBody public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException { _payload = new byte[(int)size]; - buffer.readFully(_payload); + buffer.readFully(getPayload()); } @@ -58,12 +58,12 @@ public class ContentBody implements AMQBody public int getSize() { - return _payload == null ? 0 : _payload.length; + return getPayload() == null ? 0 : getPayload().length; } public void writePayload(DataOutput buffer) throws IOException { - buffer.write(_payload); + buffer.write(getPayload()); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -77,7 +77,7 @@ public class ContentBody implements AMQBody if (size > 0) { _payload = new byte[(int)size]; - buffer.read(_payload); + buffer.read(getPayload()); } } @@ -86,6 +86,11 @@ public class ContentBody implements AMQBody { } + public byte[] getPayload() + { + return _payload; + } + private static class BufferContentBody implements AMQBody { private final int _length; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index de2ffe9755..10df105ee6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import java.io.IOException; - -import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.codec.MarkableDataInput; + +import java.io.IOException; + public class ContentBodyFactory implements BodyFactory { private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 8a2ad53157..f6fa89a91c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -20,24 +20,23 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public class ContentHeaderBody implements AMQBody { public static final byte TYPE = 2; - public int classId; + private int classId; - public int weight; + private int weight; - /** unsigned long but java can't handle that anyway when allocating byte array */ - public long bodySize; + private long bodySize; /** must never be null */ private ContentHeaderProperties properties; @@ -76,17 +75,6 @@ public class ContentHeaderBody implements AMQBody return TYPE; } - protected void populateFromBuffer(DataInputStream buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException, IOException - { - classId = buffer.readUnsignedShort(); - weight = buffer.readUnsignedShort(); - bodySize = buffer.readLong(); - int propertyFlags = buffer.readUnsignedShort(); - ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); - properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); - } - /** * Helper method that is used currently by the persistence layer (by BDB at the moment). * @param buffer @@ -153,4 +141,25 @@ public class ContentHeaderBody implements AMQBody ", properties=" + properties + '}'; } + + public int getClassId() + { + return classId; + } + + public int getWeight() + { + return weight; + } + + /** unsigned long but java can't handle that anyway when allocating byte array */ + public long getBodySize() + { + return bodySize; + } + + public void setBodySize(long bodySize) + { + this.bodySize = bodySize; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index c3e4c69ec0..83a5211013 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import java.io.IOException; - -import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.codec.MarkableDataInput; + +import java.io.IOException; + public class ContentHeaderBodyFactory implements BodyFactory { private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index ea8358a538..2e1b988aa3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -21,7 +21,6 @@ package org.apache.qpid.framing; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 48bd52858d..ff97c0b28f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,12 +20,11 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; + import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; - public class ContentHeaderPropertiesFactory { private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index e018407509..1ecd8a13b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.framing; -import java.io.*; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -40,6 +41,10 @@ public class EncodingUtils public static final int SIZEOF_UNSIGNED_INT = 4; private static final boolean[] ALL_FALSE_ARRAY = new boolean[8]; + private EncodingUtils() + { + } + public static int encodedShortStringLength(String s) { if (s == null) @@ -114,7 +119,7 @@ public class EncodingUtils { return len + 6 + encodedShortStringLength((short) (i / 1000000)); } - else // if (i > 99999) + else // if i > 99999 { return len + 5 + encodedShortStringLength((short) (i / 100000)); } @@ -259,7 +264,6 @@ public class EncodingUtils public static void writeLongStringBytes(DataOutput buffer, String s) throws IOException { - assert (s == null) || (s.length() <= 0xFFFE); if (s != null) { int len = s.length(); @@ -281,7 +285,6 @@ public class EncodingUtils public static void writeLongStringBytes(DataOutput buffer, char[] s) throws IOException { - assert (s == null) || (s.length <= 0xFFFE); if (s != null) { int len = s.length; @@ -302,7 +305,6 @@ public class EncodingUtils public static void writeLongStringBytes(DataOutput buffer, byte[] bytes) throws IOException { - assert (bytes == null) || (bytes.length <= 0xFFFE); if (bytes != null) { writeUnsignedInteger(buffer, bytes.length); @@ -736,8 +738,6 @@ public class EncodingUtils public static long readTimestamp(DataInput buffer) throws IOException { - // Discard msb from AMQ timestamp - // buffer.getUnsignedInt(); return buffer.readLong(); } @@ -802,8 +802,6 @@ public class EncodingUtils byte[] from = new byte[size]; - // Is this not the same. - // bb.get(from, 0, length); for (int i = 0; i < size; i++) { from[i] = bb.get(i); @@ -958,7 +956,6 @@ public class EncodingUtils else { // really writing out unsigned byte - //buffer.put((byte) 0); writeUnsignedInteger(buffer, 0L); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 863e363b87..57f2c638a2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -25,7 +25,11 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQPInvalidClassException; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; import java.math.BigDecimal; import java.util.Collections; import java.util.Enumeration; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java index af0c5b845c..a2d4d27396 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -21,11 +21,14 @@ package org.apache.qpid.framing; import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; public class FieldTableFactory { + private FieldTableFactory() + { + } + public static FieldTable newFieldTable() { return new FieldTable(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 95b6246717..1613cd055e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public class HeartbeatBody implements AMQBody { public static final byte TYPE = 8; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 2925724dc2..0a06f0f1e9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -23,8 +23,9 @@ package org.apache.qpid.framing; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; -import java.io.*; - +import java.io.DataOutput; +import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.Arrays; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock @@ -36,14 +37,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData private static final byte CURRENT_PROTOCOL_CLASS = 1; private static final byte TCP_PROTOCOL_INSTANCE = 1; - public final byte[] _protocolHeader; - public final byte _protocolClass; - public final byte _protocolInstance; - public final byte _protocolMajor; - public final byte _protocolMinor; - - -// public ProtocolInitiation() {} + private final byte[] _protocolHeader; + private final byte _protocolClass; + private final byte _protocolInstance; + private final byte _protocolMajor; + private final byte _protocolMinor; public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor) { @@ -206,6 +204,26 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData return pv; } + public byte getProtocolClass() + { + return _protocolClass; + } + + public byte getProtocolInstance() + { + return _protocolInstance; + } + + public byte getProtocolMajor() + { + return _protocolMajor; + } + + public byte getProtocolMinor() + { + return _protocolMinor; + } + public String toString() { StringBuffer buffer = new StringBuffer(new String(_protocolHeader)); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java deleted file mode 100644 index dd854dd498..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.framing; - -import java.io.DataOutput; -import java.io.IOException; - -public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock -{ - private AMQDataBlock _firstFrame; - - private AMQDataBlock _block; - - public SmallCompositeAMQDataBlock(AMQDataBlock block) - { - _block = block; - } - - /** - * The encoded block will be logically first before the AMQDataBlocks which are encoded - * into the buffer afterwards. - * @param encodedBlock already-encoded data - * @param block a block to be encoded. - */ - public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block) - { - this(block); - _firstFrame = encodedBlock; - } - - public AMQDataBlock getBlock() - { - return _block; - } - - public AMQDataBlock getFirstFrame() - { - return _firstFrame; - } - - public long getSize() - { - long frameSize = _block.getSize(); - - if (_firstFrame != null) - { - - frameSize += _firstFrame.getSize(); - } - return frameSize; - } - - public void writePayload(DataOutput buffer) throws IOException - { - if (_firstFrame != null) - { - _firstFrame.writePayload(buffer); - } - _block.writePayload(buffer); - - } - - public String toString() - { - if (_block == null) - { - return "No blocks contained in composite frame"; - } - else - { - StringBuilder buf = new StringBuilder(this.getClass().getName()); - buf.append("{encodedBlock=").append(_firstFrame); - - buf.append(" _block=[").append(_block.toString()).append("]"); - - buf.append("}"); - return buf.toString(); - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java deleted file mode 100644 index e770fdd7e4..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import java.io.DataInputStream; -import java.io.IOException; - -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VersionSpecificRegistry -{ - private static final Logger _log = LoggerFactory.getLogger(VersionSpecificRegistry.class); - - private final byte _protocolMajorVersion; - private final byte _protocolMinorVersion; - - private static final int DEFAULT_MAX_CLASS_ID = 200; - private static final int DEFAULT_MAX_METHOD_ID = 50; - - private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][]; - - private ProtocolVersionMethodConverter _protocolVersionConverter; - - public VersionSpecificRegistry(byte major, byte minor) - { - _protocolMajorVersion = major; - _protocolMinorVersion = minor; - - _protocolVersionConverter = loadProtocolVersionConverters(major, minor); - } - - private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, - byte protocolMinorVersion) - { - try - { - Class<ProtocolVersionMethodConverter> versionMethodConverterClass = - (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_" - + protocolMajorVersion + "_" + protocolMinorVersion); - - return versionMethodConverterClass.newInstance(); - - } - catch (ClassNotFoundException e) - { - _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion); - if (protocolMinorVersion != 0) - { - protocolMinorVersion--; - - return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); - } - else if (protocolMajorVersion != 0) - { - protocolMajorVersion--; - - return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); - } - else - { - return null; - } - - } - catch (IllegalAccessException e) - { - throw new IllegalStateException("Unable to load protocol version converter: ", e); - } - catch (InstantiationException e) - { - throw new IllegalStateException("Unable to load protocol version converter: ", e); - } - } - - public byte getProtocolMajorVersion() - { - return _protocolMajorVersion; - } - - public byte getProtocolMinorVersion() - { - return _protocolMinorVersion; - } - - public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID) - { - try - { - return _registry[classID][methodID]; - } - catch (IndexOutOfBoundsException e) - { - return null; - } - catch (NullPointerException e) - { - return null; - } - } - - public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory) - { - if (_registry.length <= classID) - { - AMQMethodBodyInstanceFactory[][] oldRegistry = _registry; - _registry = new AMQMethodBodyInstanceFactory[classID + 1][]; - System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length); - } - - if (_registry[classID] == null) - { - _registry[classID] = - new AMQMethodBodyInstanceFactory[(methodID > DEFAULT_MAX_METHOD_ID) ? (methodID + 1) - : (DEFAULT_MAX_METHOD_ID + 1)]; - } - else if (_registry[classID].length <= methodID) - { - AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID]; - _registry[classID] = new AMQMethodBodyInstanceFactory[methodID + 1]; - System.arraycopy(oldMethods, 0, _registry[classID], 0, oldMethods.length); - } - - _registry[classID][methodID] = instanceFactory; - - } - - public AMQMethodBody get(short classID, short methodID, MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException - { - AMQMethodBodyInstanceFactory bodyFactory; - try - { - bodyFactory = _registry[classID][methodID]; - } - catch (NullPointerException e) - { - throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", e); - } - catch (IndexOutOfBoundsException e) - { - if (classID >= _registry.length) - { - throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", e); - - } - else - { - throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", e); - - } - } - - if (bodyFactory == null) - { - throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", null); - } - - return bodyFactory.newInstance( in, size); - - } - - public ProtocolVersionMethodConverter getProtocolVersionMethodConverter() - { - return _protocolVersionConverter; - } - - public void configure() - { - _protocolVersionConverter.configure(); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java index e3d5da73da..53c70c8d71 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.framing.abstraction; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.AMQShortString; public class MessagePublishInfoImpl implements MessagePublishInfo diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index 90a730d6f7..b3eb1211a5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -21,12 +21,16 @@ package org.apache.qpid.framing.amqp_0_9; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter @@ -115,7 +119,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot public byte[] getData() { - return _contentBodyChunk._payload; + return _contentBodyChunk.getPayload(); } public void reduceToFit() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java index 3b0cc3cebc..d33749d795 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java @@ -21,12 +21,16 @@ package org.apache.qpid.framing.amqp_0_91; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { @@ -114,7 +118,7 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro public byte[] getData() { - return _contentBodyChunk._payload; + return _contentBodyChunk.getPayload(); } public void reduceToFit() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java index e6d0482f0d..4c7772a3a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -21,12 +21,16 @@ package org.apache.qpid.framing.amqp_8_0; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { @@ -59,7 +63,7 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot public byte[] getData() { - return contentBodyChunk._payload; + return contentBodyChunk.getPayload(); } public void reduceToFit() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java index d9c12148cb..bccae8e671 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.messaging; -import java.util.Map; - import org.apache.qpid.messaging.util.AddressParser; import static org.apache.qpid.messaging.util.PyPrint.pprint; +import java.util.Map; + /** * Address diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressParser.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressParser.java index 7b31436ba0..d1e10607ac 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressParser.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressParser.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.messaging.util; +import org.apache.qpid.messaging.Address; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.qpid.messaging.Address; - /** * AddressParser @@ -257,6 +257,10 @@ public class AddressParser extends Parser { eat(SLASH); subject = toks2str(eat_until(SEMI, EOF)); + if ("None".equals(subject)) + { + subject = null; + } } else { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java index 93df052af1..c5c2cacd1a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java @@ -20,16 +20,13 @@ */ package org.apache.qpid.messaging.util; -import java.io.InputStreamReader; - -import java.util.List; - import org.apache.qpid.messaging.Address; -import org.apache.qpid.messaging.util.ParseError; -import org.apache.qpid.messaging.util.Token; import static org.apache.qpid.messaging.util.PyPrint.pprint; +import java.io.InputStreamReader; +import java.util.List; + /** * JAddr @@ -38,6 +35,9 @@ import static org.apache.qpid.messaging.util.PyPrint.pprint; public class JAddr { + private JAddr() + { + } public static final void main(String[] args) throws Exception { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/Lexer.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/Lexer.java index 8226cc77cb..9638ec78e2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/Lexer.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/Lexer.java @@ -21,9 +21,7 @@ package org.apache.qpid.messaging.util; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java index ef6c724371..2681893482 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java @@ -33,6 +33,9 @@ import java.util.Map; public class PyPrint { + private PyPrint() + { + } public static String pprint(Object obj) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 3e99b244c4..0df9a85676 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,10 +22,10 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.LinkedBlockingQueue; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java new file mode 100644 index 0000000000..15c144b0eb --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.properties; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; + +import org.apache.qpid.transport.util.Logger; + +/** + * Constants for the various properties 0-10 clients can + * set values for during the ConnectionStartOk reply. + */ +public class ConnectionStartProperties +{ + private static final Logger LOGGER = Logger.get(ConnectionStartProperties.class); + + public static final String CLIENT_ID_0_10 = "clientName"; + public static final String CLIENT_ID_0_8 = "instance"; + + public static final String VERSION_0_8 = "version"; + public static final String VERSION_0_10 = "qpid.client_version"; + + public static final String PROCESS = "qpid.client_process"; + + public static final String PID = "qpid.client_pid"; + + public static final String PLATFORM = "platform"; + + public static final String PRODUCT ="product"; + + public static final String SESSION_FLOW = "qpid.session_flow"; + + public static int getPID() + { + RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean(); + String processName = rtb.getName(); + if (processName != null && processName.indexOf('@') > 0) + { + try + { + return Integer.parseInt(processName.substring(0,processName.indexOf('@'))); + } + catch(Exception e) + { + LOGGER.warn("Unable to get the PID due to error",e); + return -1; + } + } + else + { + LOGGER.warn("Unable to get the PID due to unsupported format : " + processName); + return -1; + } + } + + public static String getPlatformInfo() + { + StringBuilder fullSystemInfo = new StringBuilder(System.getProperty("java.runtime.name")); + fullSystemInfo.append(", "); + fullSystemInfo.append(System.getProperty("java.runtime.version")); + fullSystemInfo.append(", "); + fullSystemInfo.append(System.getProperty("java.vendor")); + fullSystemInfo.append(", "); + fullSystemInfo.append(System.getProperty("os.arch")); + fullSystemInfo.append(", "); + fullSystemInfo.append(System.getProperty("os.name")); + fullSystemInfo.append(", "); + fullSystemInfo.append(System.getProperty("os.version")); + fullSystemInfo.append(", "); + fullSystemInfo.append(System.getProperty("sun.os.patch.level")); + + return fullSystemInfo.toString(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index 14d1befaf1..2a2342ca14 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.framing.AMQShortString; + import java.util.HashMap; import java.util.Map; -import org.apache.qpid.framing.AMQShortString; - /** * Defines constants for AMQP codes and also acts as a factory for creating such constants from the raw codes. Each * constant also defines a short human readable description of the constant. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 5a7679a972..8cc9709c9c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; /** * AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index b58e7d01dc..185c01d3df 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -20,9 +20,13 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.*; -import org.apache.qpid.transport.Sender; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.transport.Sender; import java.nio.ByteBuffer; @@ -47,7 +51,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto * * @return The method registry for a specific version of the AMQP. */ -// public VersionSpecificRegistry getRegistry(); MethodRegistry getMethodRegistry(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index fd651a2b66..7ca588946b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.protocol; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index 7378edff0c..885a6a975d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.transport.network.NetworkConnection; - -public interface ProtocolEngineFactory +public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java index 01f13408b0..c9ff180c54 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java @@ -20,18 +20,17 @@ */ package org.apache.qpid.ssl; -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.security.KeyStore; +import org.apache.qpid.transport.network.security.ssl.QpidClientX509KeyManager; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; - -import org.apache.qpid.transport.network.security.ssl.QpidClientX509KeyManager; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; /** * Factory used to create SSLContexts. SSL needs to be configured @@ -42,7 +41,6 @@ public class SSLContextFactory { public static final String JAVA_KEY_STORE_CODE = "JKS"; public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS"; - public static final String KEY_STORE_CERTIFICATE_TYPE = "SunX509"; private SSLContextFactory() { @@ -50,28 +48,28 @@ public class SSLContextFactory } public static SSLContext buildServerContext(final String keyStorePath, - final String keyStorePassword, final String keyStoreCertType) + final String keyStorePassword, final String keyManagerFactoryAlgorithm) throws GeneralSecurityException, IOException { return buildContext(null, null, null, keyStorePath, keyStorePassword, - keyStoreCertType, null); + keyManagerFactoryAlgorithm, null); } public static SSLContext buildClientContext(final String trustStorePath, - final String trustStorePassword, final String trustStoreCertType, + final String trustStorePassword, final String trustManagerFactoryAlgorithm, final String keyStorePath, final String keyStorePassword, - final String keyStoreCertType, final String certAlias) + final String keyManagerFactoryAlgorithm, final String certAlias) throws GeneralSecurityException, IOException { return buildContext(trustStorePath, trustStorePassword, - trustStoreCertType, keyStorePath, keyStorePassword, - keyStoreCertType, certAlias); + trustManagerFactoryAlgorithm, keyStorePath, keyStorePassword, + keyManagerFactoryAlgorithm, certAlias); } private static SSLContext buildContext(final String trustStorePath, - final String trustStorePassword, final String trustStoreCertType, + final String trustStorePassword, final String trustManagerFactoryAlgorithm, final String keyStorePath, final String keyStorePassword, - final String keyStoreCertType, final String certAlias) + final String keyManagerFactoryAlgorithm, final String certAlias) throws GeneralSecurityException, IOException { // Initialize the SSLContext to work with our key managers. @@ -86,7 +84,7 @@ public class SSLContextFactory final KeyStore ts = SSLUtil.getInitializedKeyStore(trustStorePath, trustStorePassword); final TrustManagerFactory tmf = TrustManagerFactory - .getInstance(trustStoreCertType); + .getInstance(trustManagerFactoryAlgorithm); tmf.init(ts); trustManagers = tmf.getTrustManagers(); @@ -102,7 +100,7 @@ public class SSLContextFactory { keyManagers = new KeyManager[] { new QpidClientX509KeyManager( certAlias, keyStorePath, keyStorePassword, - keyStoreCertType) }; + keyManagerFactoryAlgorithm) }; } else { @@ -112,7 +110,7 @@ public class SSLContextFactory char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray(); // Set up key manager factory to use our key store final KeyManagerFactory kmf = KeyManagerFactory - .getInstance(keyStoreCertType); + .getInstance(keyManagerFactoryAlgorithm); kmf.init(ks, keyStoreCharPassword); keyManagers = kmf.getKeyManagers(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java index 192675edcd..58d80454b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.thread; -import java.lang.Thread.UncaughtExceptionHandler; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.Thread.UncaughtExceptionHandler; + /** * * An {@link UncaughtExceptionHandler} that writes the exception to the application log via diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java index 30010a2d89..06f5e6b835 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java @@ -21,8 +21,6 @@ package org.apache.qpid.thread; -import org.apache.qpid.thread.Threading; - import java.util.concurrent.Executor; public class QpidThreadExecutor implements Executor diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java index 603e8a7441..265b336157 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java @@ -24,7 +24,11 @@ package org.apache.qpid.thread; public final class Threading { private static ThreadFactory threadFactory; - + + private Threading() + { + } + static { try { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java index 491a7ac218..e3e6f81f7c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.transport; -import java.nio.ByteBuffer; +import static org.apache.qpid.transport.util.Functions.str; -import static org.apache.qpid.transport.util.Functions.*; +import java.nio.ByteBuffer; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 9bdad6b00e..c75adab444 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.transport; +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.transport.util.Logger; + import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.RESUMING; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; import java.util.HashMap; import java.util.List; import java.util.Map; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import org.apache.qpid.transport.util.Logger; - /** * ClientDelegate @@ -46,11 +46,11 @@ public class ClientDelegate extends ConnectionDelegate - protected final ConnectionSettings _conSettings; + private final ConnectionSettings _connectionSettings; public ClientDelegate(ConnectionSettings settings) { - this._conSettings = settings; + _connectionSettings = settings; } public void init(Connection conn, ProtocolHeader hdr) @@ -66,15 +66,17 @@ public class ClientDelegate extends ConnectionDelegate { Map<String,Object> clientProperties = new HashMap<String,Object>(); - if(this._conSettings.getClientProperties() != null) + if(_connectionSettings.getClientProperties() != null) { - clientProperties.putAll(_conSettings.getClientProperties()); + clientProperties.putAll(_connectionSettings.getClientProperties()); } - clientProperties.put("qpid.session_flow", 1); - clientProperties.put("qpid.client_pid",getPID()); - clientProperties.put("qpid.client_process", - System.getProperty("qpid.client_process","Qpid Java Client")); + clientProperties.put(ConnectionStartProperties.SESSION_FLOW, 1); + clientProperties.put(ConnectionStartProperties.PID, ConnectionStartProperties.getPID()); + clientProperties.put(ConnectionStartProperties.PROCESS, System.getProperty(ClientProperties.PROCESS_NAME, "Qpid Java Client")); + clientProperties.put(ConnectionStartProperties.VERSION_0_10, QpidProperties.getReleaseVersion()); + clientProperties.put(ConnectionStartProperties.PRODUCT, QpidProperties.getProductName()); + clientProperties.put(ConnectionStartProperties.PLATFORM, ConnectionStartProperties.getPlatformInfo()); List<Object> brokerMechs = start.getMechanisms(); if (brokerMechs == null || brokerMechs.isEmpty()) @@ -131,7 +133,7 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { - int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(), + int hb_interval = calculateHeartbeatInterval(_connectionSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() ); @@ -146,7 +148,7 @@ public class ClientDelegate extends ConnectionDelegate //(or that forced by protocol limitations [0xFFFF]) conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax); - conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST); + conn.connectionOpen(_connectionSettings.getVhost(), null, Option.INSIST); } @Override @@ -197,31 +199,8 @@ public class ClientDelegate extends ConnectionDelegate } } - private int getPID() + public ConnectionSettings getConnectionSettings() { - RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean(); - String processName = rtb.getName(); - if (processName != null && processName.indexOf('@')>0) - { - try - { - return Integer.parseInt(processName.substring(0,processName.indexOf('@'))); - } - catch(Exception e) - { - log.warn("Unable to get the client PID due to error",e); - return -1; - } - } - else - { - log.warn("Unable to get the client PID due to unsupported format : " + processName); - return -1; - } - + return _connectionSettings; } - - - - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index dee5f696b9..b0f1a1bad8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,12 +20,27 @@ */ package org.apache.qpid.transport; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.security.SecurityLayer; +import org.apache.qpid.transport.network.security.SecurityLayerFactory; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; +import org.apache.qpid.util.Strings; + import static org.apache.qpid.transport.Connection.State.CLOSED; import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslServer; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -36,22 +51,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslServer; - -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.security.SecurityLayer; -import org.apache.qpid.transport.network.security.SecurityLayerFactory; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; -import org.apache.qpid.util.Strings; - /** * Connection @@ -125,7 +124,6 @@ public class Connection extends ConnectionInvoker private String userID; private ConnectionSettings conSettings; private SecurityLayer securityLayer; - private String _clientId; private final AtomicBoolean connectionLost = new AtomicBoolean(false); @@ -161,16 +159,6 @@ public class Connection extends ConnectionInvoker } } - public String getClientId() - { - return _clientId; - } - - public void setClientId(String id) - { - _clientId = id; - } - void setLocale(String locale) { this.locale = locale; @@ -201,23 +189,12 @@ public class Connection extends ConnectionInvoker return saslClient; } - public void connect(String host, int port, String vhost, String username, String password) + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs) { - connect(host, port, vhost, username, password, false); + connect(host, port, vhost, username, password, ssl, saslMechs, null); } - public void connect(String host, int port, String vhost, String username, String password, boolean ssl) - { - connect(host, port, vhost, username, password, ssl,"PLAIN"); - } - - public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) - { - connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP); - } - - - public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map<String,Object> clientProps) + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, Map<String,Object> clientProps) { ConnectionSettings settings = new ConnectionSettings(); settings.setHost(host); @@ -535,7 +512,7 @@ public class Connection extends ConnectionInvoker exception(new ConnectionException(t)); } - void closeCode(ConnectionClose close) + public void closeCode(ConnectionClose close) { synchronized (lock) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 393301659d..fdd35d49ef 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -22,7 +22,7 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.util.Logger; -import static org.apache.qpid.transport.Connection.State.*; +import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD; /** @@ -71,12 +71,17 @@ public abstract class ConnectionDelegate @Override public void connectionClose(Connection conn, ConnectionClose close) { - conn.connectionCloseOk(); - conn.getSender().close(); + sendConnectionCloseOkAndCloseSender(conn); conn.closeCode(close); conn.setState(CLOSE_RCVD); } + protected void sendConnectionCloseOkAndCloseSender(Connection conn) + { + conn.connectionCloseOk(); + conn.getSender().close(); + } + @Override public void connectionCloseOk(Connection conn, ConnectionCloseOk ok) { conn.getSender().close(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 2ee507e2ec..084428d182 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -20,9 +20,24 @@ */ package org.apache.qpid.transport; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_TRUST_MANAGER_FACTORY_ALGORITHM_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_TRUST_STORE_CERT_TYPE_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_TCP_NODELAY_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.RECEIVE_BUFFER_SIZE_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.SEND_BUFFER_SIZE_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.LEGACY_SEND_BUFFER_SIZE_PROP_NAME; + import java.util.Map; -import org.apache.qpid.configuration.ClientProperties; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.qpid.configuration.QpidProperty; + /** * A ConnectionSettings object can only be associated with @@ -34,37 +49,36 @@ public class ConnectionSettings { public static final String WILDCARD_ADDRESS = "*"; - String protocol = "tcp"; - String host = "localhost"; - String vhost; - String username = "guest"; - String password = "guest"; - int port = 5672; - boolean tcpNodelay = Boolean.valueOf(System.getProperty(ClientProperties.QPID_TCP_NODELAY_PROP_NAME, - System.getProperty(ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME, "true"))); - int maxChannelCount = 32767; - int maxFrameSize = 65535; - int heartbeatInterval; - int readBufferSize = 65535; - int writeBufferSize = 65535; - long transportTimeout = 60000; - + private String protocol = "tcp"; + private String host = "localhost"; + private String vhost; + private String username = "guest"; + private String password = "guest"; + private int port = 5672; + private boolean tcpNodelay = QpidProperty.booleanProperty(Boolean.TRUE, QPID_TCP_NODELAY_PROP_NAME, AMQJ_TCP_NODELAY_PROP_NAME).get(); + private int maxChannelCount = 32767; + private int maxFrameSize = 65535; + private int heartbeatInterval; + private int readBufferSize = QpidProperty.intProperty(65535, RECEIVE_BUFFER_SIZE_PROP_NAME, LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME).get(); + private int writeBufferSize = QpidProperty.intProperty(65535, SEND_BUFFER_SIZE_PROP_NAME, LEGACY_SEND_BUFFER_SIZE_PROP_NAME).get();; + private long transportTimeout = 60000; + // SSL props - boolean useSSL; - String keyStorePath = System.getProperty("javax.net.ssl.keyStore"); - String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword"); - String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");; - String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");; - String trustStorePath = System.getProperty("javax.net.ssl.trustStore");; - String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");; - String certAlias; - boolean verifyHostname; + private boolean useSSL; + private String keyStorePath = System.getProperty("javax.net.ssl.keyStore"); + private String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword"); + private String keyManagerFactoryAlgorithm = QpidProperty.stringProperty(KeyManagerFactory.getDefaultAlgorithm(), QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME, QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME).get(); + private String trustManagerFactoryAlgorithm = QpidProperty.stringProperty(TrustManagerFactory.getDefaultAlgorithm(), QPID_SSL_TRUST_MANAGER_FACTORY_ALGORITHM_PROP_NAME, QPID_SSL_TRUST_STORE_CERT_TYPE_PROP_NAME).get(); + private String trustStorePath = System.getProperty("javax.net.ssl.trustStore");; + private String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");; + private String certAlias; + private boolean verifyHostname; // SASL props - String saslMechs = System.getProperty("qpid.sasl_mechs", null); - String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP"); - String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost"); - boolean useSASLEncryption; + private String saslMechs = System.getProperty("qpid.sasl_mechs", null); + private String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP"); + private String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost"); + private boolean useSASLEncryption; private Map<String, Object> _clientProperties; @@ -288,24 +302,24 @@ public class ConnectionSettings this.verifyHostname = verifyHostname; } - public String getKeyStoreCertType() + public String getKeyManagerFactoryAlgorithm() { - return keyStoreCertType; + return keyManagerFactoryAlgorithm; } - public void setKeyStoreCertType(String keyStoreCertType) + public void setKeyManagerFactoryAlgorithm(String keyManagerFactoryAlgorithm) { - this.keyStoreCertType = keyStoreCertType; + this.keyManagerFactoryAlgorithm = keyManagerFactoryAlgorithm; } - public String getTrustStoreCertType() + public String getTrustManagerFactoryAlgorithm() { - return trustStoreCertType; + return trustManagerFactoryAlgorithm; } - public void setTrustStoreCertType(String trustStoreCertType) + public void setTrustManagerFactoryAlgorithm(String trustManagerFactoryAlgorithm) { - this.trustStoreCertType = trustStoreCertType; + this.trustManagerFactoryAlgorithm = trustManagerFactoryAlgorithm; } public int getReadBufferSize() @@ -337,5 +351,4 @@ public class ConnectionSettings { this.transportTimeout = transportTimeout; } - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java index 543856ca39..7ac75f9163 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.transport; -import java.util.*; +import java.util.List; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index abf96823cc..b3af99ea9d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -22,9 +22,9 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.network.Frame; -import java.nio.ByteBuffer; +import static org.apache.qpid.transport.util.Functions.str; -import static org.apache.qpid.transport.util.Functions.*; +import java.nio.ByteBuffer; /** * Method diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java index 8d3f7a779a..472beb6bb1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.transport; +import java.net.InetSocketAddress; + /** * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned @@ -43,4 +45,6 @@ public interface NetworkTransportConfiguration String getTransport(); Integer getConnectorProcessors(); + + InetSocketAddress getAddress(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java deleted file mode 100644 index 68fbb5e8ec..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport; - -import java.io.IOException; - -public class OpenException extends IOException -{ - - public OpenException(String string, Throwable lastException) - { - super(string, lastException); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java index e5b93e40a9..3959dc8d95 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.transport; -import java.nio.ByteBuffer; - +import org.apache.qpid.transport.network.Frame; import org.apache.qpid.transport.network.NetworkDelegate; import org.apache.qpid.transport.network.NetworkEvent; -import org.apache.qpid.transport.network.Frame; + +import java.nio.ByteBuffer; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java index c47171dc4b..413ec8e8fd 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java @@ -20,13 +20,16 @@ */ package org.apache.qpid.transport; +import static org.apache.qpid.util.Serial.gt; +import static org.apache.qpid.util.Serial.le; +import static org.apache.qpid.util.Serial.max; +import static org.apache.qpid.util.Serial.min; + import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import static org.apache.qpid.util.Serial.*; - /** * Range @@ -119,6 +122,11 @@ public abstract class Range implements RangeSet throw new UnsupportedOperationException(); } + public void subtract(RangeSet rangeSet) + { + throw new UnsupportedOperationException(); + } + public RangeSet copy() { RangeSet rangeSet = RangeSetFactory.createRangeSet(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java index 34ebd02777..19990a4610 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.transport; -import java.util.*; - -import static org.apache.qpid.util.Serial.*; +import java.util.Iterator; /** * RangeSet @@ -51,6 +49,8 @@ public interface RangeSet extends Iterable<Range> void add(int value); + void subtract(final RangeSet other); + void clear(); RangeSet copy(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java index 0f19d7e2b2..0f049aba8e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java @@ -22,6 +22,10 @@ package org.apache.qpid.transport; public class RangeSetFactory { + private RangeSetFactory() + { + } + public static RangeSet createRangeSet() { return new RangeSetImpl(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java index f2540afb40..adf18e2920 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.transport; +import static org.apache.qpid.util.Serial.lt; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.ListIterator; -import static org.apache.qpid.util.Serial.lt; - public class RangeSetImpl implements RangeSet { @@ -150,6 +150,68 @@ public class RangeSetImpl implements RangeSet ranges.clear(); } + public void subtract(final RangeSet other) + { + final Iterator<Range> otherIter = other.iterator() ; + if (otherIter.hasNext()) + { + Range otherRange = otherIter.next(); + final ListIterator<Range> iter = ranges.listIterator() ; + if (iter.hasNext()) + { + Range range = iter.next(); + do + { + if (otherRange.getUpper() < range.getLower()) + { + otherRange = nextRange(otherIter) ; + } + else if (range.getUpper() < otherRange.getLower()) + { + range = nextRange(iter) ; + } + else + { + final boolean first = range.getLower() < otherRange.getLower() ; + final boolean second = otherRange.getUpper() < range.getUpper() ; + + if (first) + { + iter.set(Range.newInstance(range.getLower(), otherRange.getLower()-1)) ; + if (second) + { + iter.add(Range.newInstance(otherRange.getUpper()+1, range.getUpper())) ; + iter.previous() ; + range = iter.next() ; + } + else + { + range = nextRange(iter) ; + } + } + else if (second) + { + range = Range.newInstance(otherRange.getUpper()+1, range.getUpper()) ; + iter.set(range) ; + otherRange = nextRange(otherIter) ; + } + else + { + iter.remove() ; + range = nextRange(iter) ; + } + } + } + while ((otherRange != null) && (range != null)) ; + } + } + } + + private Range nextRange(final Iterator<Range> iter) + { + return (iter.hasNext() ? iter.next() : null) ; + } + public RangeSet copy() { return new org.apache.qpid.transport.RangeSetImpl(this); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 07d21c9904..ec409d1c72 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -20,18 +20,17 @@ */ package org.apache.qpid.transport; -import static org.apache.qpid.transport.Connection.State.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import static org.apache.qpid.transport.Connection.State.OPEN; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; +import java.util.Map; /** * ServerDelegate @@ -70,9 +69,6 @@ public class ServerDelegate extends ConnectionDelegate conn.setLocale(ok.getLocale()); String mechanism = ok.getMechanism(); - String clientName = (String) ok.getClientProperties().get("clientName"); - conn.setClientId(clientName); - if (mechanism == null || mechanism.length() == 0) { tuneAuthorizedConnection(conn); @@ -195,17 +191,11 @@ public class ServerDelegate extends ConnectionDelegate @Override public void sessionAttach(Connection conn, SessionAttach atc) { - sessionAttachImpl(conn, atc); - } - - protected Session sessionAttachImpl(Connection conn, SessionAttach atc) - { Session ssn = getSession(conn, atc); conn.map(ssn, atc.getChannel()); + conn.registerSession(ssn); ssn.sessionAttached(atc.getName()); ssn.setState(Session.State.OPEN); - - return ssn; } protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 3fc596d0eb..110c73f718 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -21,6 +21,11 @@ package org.apache.qpid.transport; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.transport.network.Frame; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; + import static org.apache.qpid.transport.Option.COMPLETED; import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.TIMELY_REPLY; @@ -30,11 +35,6 @@ import static org.apache.qpid.transport.Session.State.DETACHED; import static org.apache.qpid.transport.Session.State.NEW; import static org.apache.qpid.transport.Session.State.OPEN; import static org.apache.qpid.transport.Session.State.RESUMING; - -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.transport.network.Frame; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; import static org.apache.qpid.util.Serial.ge; import static org.apache.qpid.util.Serial.gt; import static org.apache.qpid.util.Serial.le; @@ -161,7 +161,7 @@ public class Session extends SessionInvoker this.expiry = expiry; } - void setClose(boolean close) + protected void setClose(boolean close) { this.closing = close; } @@ -410,7 +410,6 @@ public class Session extends SessionInvoker log.debug("ID: [%s] %s", this.channel, id); } - //if ((id % 65536) == 0) if ((id & 0xff) == 0) { flushProcessed(TIMELY_REPLY); @@ -514,20 +513,12 @@ public class Session extends SessionInvoker void knownComplete(RangeSet kc) { - synchronized (processedLock) + if (kc.size() > 0) { - RangeSet newProcessed = RangeSetFactory.createRangeSet(); - for (Range pr : processed) + synchronized (processedLock) { - for (Range kr : kc) - { - for (Range r : pr.subtract(kr)) - { - newProcessed.add(r); - } - } + processed.subtract(kc) ; } - this.processed = newProcessed; } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java index 64f9039484..01fe05c851 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.transport; -import java.util.Collections; - - /** * SessionClosedException * diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java index c4fc9558a1..6095b892f8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.transport; -import java.util.List; - /** * SessionException * diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java index 22bd9f34ad..9b703a3117 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Struct.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.transport; -import java.util.List; -import java.util.Map; - import org.apache.qpid.transport.codec.Decoder; import org.apache.qpid.transport.codec.Encodable; import org.apache.qpid.transport.codec.Encoder; +import java.util.Map; + /** * Struct @@ -42,7 +41,7 @@ public abstract class Struct implements Encodable return StructFactory.create(type); } - boolean dirty = true; + private boolean dirty = true; public boolean isDirty() { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java index 6ff3b21400..27fce6e167 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java @@ -20,8 +20,14 @@ */ package org.apache.qpid.transport.codec; -import java.io.UnsupportedEncodingException; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.Type; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -29,8 +35,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.transport.*; - /** * AbstractDecoder @@ -357,13 +361,13 @@ abstract class AbstractDecoder implements Decoder private long readSize(Type t) { - if (t.fixed) + if (t.isFixed()) { - return t.width; + return t.getWidth(); } else { - return readSize(t.width); + return readSize(t.getWidth()); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index 0ccfcfcb70..2b93697bfc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -20,24 +20,22 @@ */ package org.apache.qpid.transport.codec; -import java.io.UnsupportedEncodingException; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.Type; -import java.nio.ByteBuffer; +import org.apache.qpid.transport.Xid; +import static org.apache.qpid.transport.util.Functions.lsb; -import java.util.Collections; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.Type; - -import static org.apache.qpid.transport.util.Functions.*; - /** * AbstractEncoder @@ -64,6 +62,7 @@ abstract class AbstractEncoder implements Encoder ENCODINGS.put(Character.class, Type.CHAR); ENCODINGS.put(byte[].class, Type.VBIN32); ENCODINGS.put(UUID.class, Type.UUID); + ENCODINGS.put(Xid.class, Type.STRUCT32); } private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>() @@ -362,7 +361,7 @@ abstract class AbstractEncoder implements Encoder Object value = entry.getValue(); Type type = encoding(value); writeStr8(key); - put(type.code); + put(type.getCode()); write(type, value); } } @@ -383,7 +382,7 @@ abstract class AbstractEncoder implements Encoder for (Object value : list) { Type type = encoding(value); - put(type.code); + put(type.getCode()); write(type, value); } } @@ -411,7 +410,7 @@ abstract class AbstractEncoder implements Encoder type = encoding(array.get(0)); } - put(type.code); + put(type.getCode()); writeUint32(array.size()); @@ -423,18 +422,18 @@ abstract class AbstractEncoder implements Encoder private void writeSize(Type t, int size) { - if (t.fixed) + if (t.isFixed()) { - if (size != t.width) + if (size != t.getWidth()) { throw new IllegalArgumentException - ("size does not match fixed width " + t.width + ": " + + ("size does not match fixed width " + t.getWidth() + ": " + size); } } else { - writeSize(t.width, size); + writeSize(t.getWidth(), size); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java index 10f67e1cd6..a8ac44200f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.transport.codec; +import org.apache.qpid.transport.Binary; + import java.nio.ByteBuffer; import java.nio.ByteOrder; -import org.apache.qpid.transport.Binary; - /** * Byte Buffer Decoder. * Decoder concrete implementor using a backing byte buffer for decoding data. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java index a4df5b5fcb..fb3f91a3ce 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.transport.codec; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; + import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; - /** * Decoder interface. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java index 7d4f02af31..5a3cec5616 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.transport.codec; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; + import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; - /** * Encoder interface. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 8cd5c29f6d..a80b988cea 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,15 +20,23 @@ */ package org.apache.qpid.transport.network; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.codec.BBDecoder; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.codec.BBDecoder; - /** * Assembler * @@ -181,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command = Method.create(commandType); command.setSync((0x0001 & hdr) != 0); command.read(dec); - if (command.hasPayload()) + if (command.hasPayload() && !frame.isLastSegment()) { setIncompleteCommand(channel, command); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 1a8d277bba..5a5de597c2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; @@ -31,6 +29,8 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; +import java.nio.ByteBuffer; + /** * ConnectionBinding * diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 6ac9df9bc3..fe437ecf93 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -30,16 +30,18 @@ import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; + import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; import static org.apache.qpid.transport.network.Frame.FIRST_SEG; import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; import static org.apache.qpid.transport.network.Frame.LAST_FRAME; import static org.apache.qpid.transport.network.Frame.LAST_SEG; -import static java.lang.Math.min; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import static java.lang.Math.min; + /** * Disassembler */ diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java index 849355276e..9416c4c9fa 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java @@ -21,15 +21,10 @@ package org.apache.qpid.transport.network; import org.apache.qpid.transport.SegmentType; -import org.apache.qpid.transport.util.SliceIterator; -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.List; -import java.util.Iterator; +import static org.apache.qpid.transport.util.Functions.str; -import static org.apache.qpid.transport.util.Functions.*; +import java.nio.ByteBuffer; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java index b371df639e..4d4274278f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.transport.network; -import javax.net.ssl.SSLContext; - import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; +import javax.net.ssl.SSLContext; + public interface IncomingNetworkTransport extends NetworkTransport { public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index a2885f97bc..86e05db818 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -20,17 +20,19 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.SegmentType; -import static org.apache.qpid.transport.util.Functions.*; +import static org.apache.qpid.transport.network.InputHandler.State.ERROR; +import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY; +import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR; +import static org.apache.qpid.transport.network.InputHandler.State.PROTO_HDR; +import static org.apache.qpid.transport.util.Functions.str; -import static org.apache.qpid.transport.network.InputHandler.State.*; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 7384702525..2cc7c14f00 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.transport.network; +import org.apache.qpid.transport.Sender; + import java.net.SocketAddress; import java.nio.ByteBuffer; -import org.apache.qpid.transport.Sender; - public interface NetworkConnection { Sender<ByteBuffer> getSender(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index c3c248761c..0ebde483cf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -20,16 +20,13 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - -import javax.net.ssl.SSLContext; - import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; +import javax.net.ssl.SSLContext; +import java.nio.ByteBuffer; + public interface OutgoingNetworkTransport extends NetworkTransport { - public NetworkConnection getConnection(); - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext); }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index da4349ba86..55ba95ad75 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.transport.network; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.transport.TransportException; + import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.TransportException; - public class Transport { public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport"; @@ -54,6 +54,10 @@ public class Transport OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map); } + private Transport() + { + } + public static IncomingNetworkTransport getIncomingTransportInstance() { return (IncomingNetworkTransport) loadTransportClass( diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index bfc77539ce..4046691779 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -20,15 +20,16 @@ */ package org.apache.qpid.transport.network.io; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.ByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; public class IoNetworkConnection implements NetworkConnection { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 838a662402..42c8334a5d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -21,7 +21,11 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; -import java.net.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; import java.nio.ByteBuffer; import javax.net.ssl.SSLContext; @@ -29,16 +33,18 @@ import javax.net.ssl.SSLServerSocketFactory; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.*; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.util.Logger; +import org.slf4j.LoggerFactory; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport { - - private static final Logger LOGGER = Logger.get(IoNetworkTransport.class); + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class); private Socket _socket; private IoNetworkConnection _connection; @@ -58,10 +64,13 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet _socket.setSendBufferSize(sendBufferSize); _socket.setReceiveBufferSize(receiveBufferSize); - LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize()); - LOGGER.debug("TCP_NODELAY : %s", _socket.getTcpNoDelay()); - + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); + LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); + LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); + } + InetAddress address = InetAddress.getByName(settings.getHost()); _socket.connect(new InetSocketAddress(address, settings.getPort())); @@ -120,7 +129,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { _acceptor = new AcceptingThread(config, factory, sslContext); - + _acceptor.setDaemon(false); _acceptor.start(); } catch (IOException e) @@ -133,9 +142,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private class AcceptingThread extends Thread { + private volatile boolean _closed = false; private NetworkTransportConfiguration _config; private ProtocolEngineFactory _factory; - private SSLContext _sslContent; + private SSLContext _sslContext; private ServerSocket _serverSocket; private AcceptingThread(NetworkTransportConfiguration config, @@ -145,9 +155,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { _config = config; _factory = factory; - _sslContent = sslContext; + _sslContext = sslContext; - InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort()); + InetSocketAddress address = config.getAddress(); if(sslContext == null) { @@ -155,12 +165,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } else { - SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory(); + SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); _serverSocket = socketFactory.createServerSocket(); } - _serverSocket.bind(address); _serverSocket.setReuseAddress(true); + _serverSocket.bind(address); } @@ -171,6 +181,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet */ public void close() { + LOGGER.debug("Shutting down the Acceptor"); + _closed = true; + if (!_serverSocket.isClosed()) { try @@ -189,11 +202,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { try { - while (true) + while (!_closed) { + Socket socket = null; try { - Socket socket = _serverSocket.accept(); + socket = _serverSocket.accept(); socket.setTcpNoDelay(_config.getTcpNoDelay()); final Integer sendBufferSize = _config.getSendBufferSize(); @@ -206,27 +220,58 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout); - engine.setNetworkConnection(connection, connection.getSender()); connection.start(); - - } catch(RuntimeException e) { - LOGGER.error(e, "Error in Acceptor thread " + _config.getPort()); + LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + closeSocketIfNecessary(socket); + } + catch(IOException e) + { + if(!_closed) + { + LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + closeSocketIfNecessary(socket); + try + { + //Delay to avoid tight spinning the loop during issues such as too many open files + Thread.sleep(1000); + } + catch (InterruptedException ie) + { + LOGGER.debug("Stopping acceptor due to interrupt request"); + _closed = true; + } + } } } } - catch (IOException e) + finally { - LOGGER.debug(e, "SocketException - no new connections will be accepted on port " - + _config.getPort()); + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Acceptor exiting, no new connections will be accepted on port " + _config.getPort()); + } } } - + private void closeSocketIfNecessary(final Socket socket) + { + if(socket != null) + { + try + { + socket.close(); + } + catch (IOException e) + { + LOGGER.debug("Exception while closing socket", e); + } + } + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 5b714434d9..7e63071c16 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -26,6 +26,7 @@ import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; +import javax.net.ssl.SSLSocket; import java.io.IOException; import java.io.InputStream; import java.net.Socket; @@ -33,8 +34,6 @@ import java.net.SocketException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; -import javax.net.ssl.SSLSocket; - /** * IoReceiver * diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 427487c879..a58fea47d2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -18,6 +18,14 @@ */ package org.apache.qpid.transport.network.io; +import org.apache.qpid.common.Closeable; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderClosedException; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + import static org.apache.qpid.transport.util.Functions.mod; import java.io.IOException; @@ -28,14 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.qpid.common.Closeable; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderClosedException; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.util.Logger; - public final class IoSender implements Runnable, Sender<ByteBuffer> { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java index 9fd65c6e51..51ef266ee9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -20,22 +20,10 @@ */ package org.apache.qpid.transport.network.security; -import java.nio.ByteBuffer; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.security.sasl.SASLReceiver; -import org.apache.qpid.transport.network.security.sasl.SASLSender; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLSender; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; + +import java.nio.ByteBuffer; public interface SecurityLayer { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java index 08934004a8..442800c529 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java @@ -21,7 +21,10 @@ package org.apache.qpid.transport.network.security; import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.*; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; import org.apache.qpid.transport.network.security.ssl.SSLReceiver; @@ -34,6 +37,10 @@ import java.nio.ByteBuffer; public class SecurityLayerFactory { + private SecurityLayerFactory() + { + } + public static SecurityLayer newInstance(ConnectionSettings settings) { @@ -71,10 +78,10 @@ public class SecurityLayerFactory sslCtx = SSLContextFactory .buildClientContext(settings.getTrustStorePath(), settings.getTrustStorePassword(), - settings.getTrustStoreCertType(), + settings.getTrustManagerFactoryAlgorithm(), settings.getKeyStorePath(), settings.getKeyStorePassword(), - settings.getKeyStoreCertType(), + settings.getKeyManagerFactoryAlgorithm(), settings.getCertAlias()); } catch (Exception e) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java index 7964239e31..625e1a77c2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java @@ -21,21 +21,19 @@ package org.apache.qpid.transport.network.security.sasl; */ -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; - import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; + public abstract class SASLEncryptor implements ConnectionListener { - protected SaslClient saslClient; - protected boolean securityLayerEstablished = false; - protected int sendBuffSize; - protected int recvBuffSize; + private SaslClient saslClient; + private boolean securityLayerEstablished = false; + private int sendBuffSize; + private int recvBuffSize; public boolean isSecurityLayerEstablished() { @@ -63,4 +61,19 @@ public abstract class SASLEncryptor implements ConnectionListener public void closed(Connection conn) {} public abstract void securityLayerEstablished(); + + public SaslClient getSaslClient() + { + return saslClient; + } + + public int getSendBuffSize() + { + return sendBuffSize; + } + + public int getRecvBuffSize() + { + return recvBuffSize; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java index 86106318ef..a100b96412 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java @@ -21,18 +21,16 @@ package org.apache.qpid.transport.network.security.sasl; */ -import java.nio.ByteBuffer; - -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.util.Logger; +import javax.security.sasl.SaslException; +import java.nio.ByteBuffer; + public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> { - Receiver<ByteBuffer> delegate; + private Receiver<ByteBuffer> delegate; private byte[] netData; private static final Logger log = Logger.get(SASLReceiver.class); @@ -58,11 +56,11 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> { while (buf.hasRemaining()) { - int length = Math.min(buf.remaining(),recvBuffSize); + int length = Math.min(buf.remaining(), getRecvBuffSize()); buf.get(netData, 0, length); try { - byte[] out = saslClient.unwrap(netData, 0, length); + byte[] out = getSaslClient().unwrap(netData, 0, length); delegate.received(ByteBuffer.wrap(out)); } catch (SaslException e) @@ -79,7 +77,7 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> public void securityLayerEstablished() { - netData = new byte[recvBuffSize]; + netData = new byte[getRecvBuffSize()]; log.debug("SASL Security Layer Established"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index 2d9e4e9a7e..61d54a8386 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -21,19 +21,17 @@ package org.apache.qpid.transport.network.security.sasl; */ -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.util.Logger; +import javax.security.sasl.SaslException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { - protected Sender<ByteBuffer> delegate; + private Sender<ByteBuffer> delegate; private byte[] appData; private final AtomicBoolean closed = new AtomicBoolean(false); private static final Logger log = Logger.get(SASLSender.class); @@ -54,7 +52,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { { try { - saslClient.dispose(); + getSaslClient().dispose(); } catch (SaslException e) { @@ -80,14 +78,14 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { { while (buf.hasRemaining()) { - int length = Math.min(buf.remaining(),sendBuffSize); - log.debug("sendBuffSize %s", sendBuffSize); + int length = Math.min(buf.remaining(), getSendBuffSize()); + log.debug("sendBuffSize %s", getSendBuffSize()); log.debug("buf.remaining() %s", buf.remaining()); buf.get(appData, 0, length); try { - byte[] out = saslClient.wrap(appData, 0, length); + byte[] out = getSaslClient().wrap(appData, 0, length); log.debug("out.length %s", out.length); delegate.send(ByteBuffer.wrap(out)); @@ -112,7 +110,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { public void securityLayerEstablished() { - appData = new byte[sendBuffSize]; + appData = new byte[getSendBuffSize()]; log.debug("SASL Security Layer Established"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java index 4391e8adfc..3ab028c8a8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.transport.network.security.ssl; +import org.apache.qpid.transport.util.Logger; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.X509ExtendedKeyManager; import java.io.IOException; import java.net.Socket; import java.security.GeneralSecurityException; @@ -28,25 +33,19 @@ import java.security.Principal; import java.security.PrivateKey; import java.security.cert.X509Certificate; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.X509ExtendedKeyManager; - -import org.apache.qpid.transport.util.Logger; - public class QpidClientX509KeyManager extends X509ExtendedKeyManager { private static final Logger log = Logger.get(QpidClientX509KeyManager.class); - X509ExtendedKeyManager delegate; - String alias; + private X509ExtendedKeyManager delegate; + private String alias; public QpidClientX509KeyManager(String alias, String keyStorePath, - String keyStorePassword,String keyStoreCertType) throws GeneralSecurityException, IOException + String keyStorePassword, String keyManagerFactoryAlgorithmName) throws GeneralSecurityException, IOException { this.alias = alias; KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword); - KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyStoreCertType); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithmName); kmf.init(ks, keyStorePassword.toCharArray()); this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0]; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index 8ad40bbfd3..13a16d07b5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -20,19 +20,17 @@ */ package org.apache.qpid.transport.network.security.ssl; -import java.nio.ByteBuffer; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; - -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; public class SSLReceiver implements Receiver<ByteBuffer> { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java index 6f5aa6d86e..88943695d4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java @@ -19,20 +19,18 @@ */ package org.apache.qpid.transport.network.security.ssl; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLException; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; - -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; +import javax.net.ssl.SSLException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; public class SSLSender implements Sender<ByteBuffer> { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java index 6bb038a581..71a73db71f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.transport.network.security.ssl; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLPeerUnverifiedException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -30,19 +35,14 @@ import java.security.Principal; import java.security.cert.Certificate; import java.security.cert.X509Certificate; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLPeerUnverifiedException; - -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.util.Logger; - public class SSLUtil { private static final Logger log = Logger.get(SSLUtil.class); - + + private SSLUtil() + { + } + public static void verifyHostname(SSLEngine engine,String hostnameExpected) { try diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java index 5761228642..bd3e9bbcbc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java @@ -22,7 +22,7 @@ package org.apache.qpid.transport.util; import java.nio.ByteBuffer; -import static java.lang.Math.*; +import static java.lang.Math.min; /** @@ -31,8 +31,11 @@ import static java.lang.Math.*; * @author Rafael H. Schloming */ -public class Functions +public final class Functions { + private Functions() + { + } public static final int mod(int n, int m) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java deleted file mode 100644 index 3db29847b2..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.util; - -import java.nio.ByteBuffer; - -import java.util.Iterator; - - -/** - * SliceIterator - * - * @author Rafael H. Schloming - */ - -public class SliceIterator implements Iterator<ByteBuffer> -{ - - final private Iterator<ByteBuffer> iterator; - - public SliceIterator(Iterator<ByteBuffer> iterator) - { - this.iterator = iterator; - } - - public boolean hasNext() - { - return iterator.hasNext(); - } - - public ByteBuffer next() - { - return iterator.next().slice(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 26cb56ea97..3b9a0baab2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -20,26 +20,25 @@ */ package org.apache.qpid.url; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; +import java.util.HashMap; public class AMQBindingURL implements BindingURL { private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class); - String _url; - AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; - AMQShortString _exchangeName = new AMQShortString(""); - AMQShortString _destinationName = new AMQShortString("");; - AMQShortString _queueName = new AMQShortString(""); - AMQShortString[] _bindingKeys = new AMQShortString[0]; + private String _url; + private AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + private AMQShortString _exchangeName = new AMQShortString(""); + private AMQShortString _destinationName = new AMQShortString("");; + private AMQShortString _queueName = new AMQShortString(""); + private AMQShortString[] _bindingKeys = new AMQShortString[0]; private HashMap<String, String> _options; public AMQBindingURL(String url) throws URISyntaxException diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java index 0ebfe0e869..fe7b01761b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java @@ -21,6 +21,12 @@ package org.apache.qpid.url; */ +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; + import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; @@ -28,11 +34,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BindingURLParser { private static final char PROPERTY_EQUALS_CHAR = '='; @@ -183,7 +184,7 @@ public class BindingURLParser char nextChar = _url[_index]; // check for the following special cases. - // "myQueue?durable='true'" or just "myQueue"; + // "myQueue?durable='true'" or just "myQueue" StringBuilder builder = new StringBuilder(); while (nextChar != COLON_CHAR && nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java index e261860bf3..8516e7fa0e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.url; -import java.util.HashMap; import java.util.Map; public class URLHelper @@ -29,6 +28,10 @@ public class URLHelper public static final char ALTERNATIVE_OPTION_SEPARATOR = ','; public static final char BROKER_SEPARATOR = ';'; + private URLHelper() + { + } + public static void parseOptions(Map<String, String> optionMap, String options) throws URLSyntaxException { if ((options == null) || (options.indexOf('=') == -1)) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/BytesDataOutput.java b/qpid/java/common/src/main/java/org/apache/qpid/util/BytesDataOutput.java new file mode 100644 index 0000000000..7fca54a9ee --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/BytesDataOutput.java @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.DataOutput; + +public class BytesDataOutput implements DataOutput +{ + private int _pos = 0; + private byte[] _buf; + + public BytesDataOutput(byte[] buf) + { + _buf = buf; + } + + public void setBuffer(byte[] buf) + { + _buf = buf; + _pos = 0; + } + + public void reset() + { + _pos = 0; + } + + public int length() + { + return _pos; + } + + public void write(int b) + { + _buf[_pos++] = (byte) b; + } + + public void write(byte[] b) + { + System.arraycopy(b, 0, _buf, _pos, b.length); + _pos+=b.length; + } + + + public void write(byte[] b, int off, int len) + { + System.arraycopy(b, off, _buf, _pos, len); + _pos+=len; + + } + + public void writeBoolean(boolean v) + { + _buf[_pos++] = v ? (byte) 1 : (byte) 0; + } + + public void writeByte(int v) + { + _buf[_pos++] = (byte) v; + } + + public void writeShort(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeChar(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeInt(int v) + { + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeLong(long v) + { + _buf[_pos++] = (byte) (v >>> 56); + _buf[_pos++] = (byte) (v >>> 48); + _buf[_pos++] = (byte) (v >>> 40); + _buf[_pos++] = (byte) (v >>> 32); + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte)v; + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + _buf[_pos++] = ((byte)s.charAt(i)); + } + } + + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + int v = s.charAt(i); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + } + + public void writeUTF(String s) + { + int strlen = s.length(); + + int pos = _pos; + _pos+=2; + + + for (int i = 0; i < strlen; i++) + { + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + { + c = s.charAt(i); + _buf[_pos++] = (byte) c; + + } + else if (c > 0x07FF) + { + _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + else + { + _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + } + + int len = _pos - (pos + 2); + + _buf[pos++] = (byte) (len >>> 8); + _buf[pos] = (byte) len; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java index 09478d4157..3d17bbf6ea 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java @@ -26,7 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.regex.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * CommandLineParser provides a utility for specifying the format of a command line and parsing command lines to ensure @@ -328,9 +329,6 @@ public class CommandLineParser expectingArgs = true; optionExpectingArgs = matchedOption; - // In the mean time set this options argument to the empty string in case no argument is ever - // supplied. - // options.put(matchedOption, ""); } // Check if the option was matched on its own and is a flag in which case set that flag. @@ -654,22 +652,22 @@ public class CommandLineParser protected static class CommandLineOption { /** Holds the text for the flag to match this argument with. */ - public String option = null; + private String option = null; /** Holds a string describing how to use this command line argument. */ - public String argument = null; + private String argument = null; /** Flag that determines whether or not this command line argument can take arguments. */ - public boolean expectsArgs = false; + private boolean expectsArgs = false; /** Holds a short comment describing what this command line argument is for. */ - public String comment = null; + private String comment = null; /** Flag that determines whether or not this is an mandatory command line argument. */ - public boolean mandatory = false; + private boolean mandatory = false; /** A regular expression describing what format the argument to this option muist have. */ - public String argumentFormatRegexp = null; + private String argumentFormatRegexp = null; /** * Create a command line option object that holds specific information about a command line option. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java deleted file mode 100644 index 633cf4fe3a..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; - -public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E> -{ - private static final Logger _logger = LoggerFactory.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class); - - protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>(); - - protected AtomicInteger _messageHeadSize = new AtomicInteger(0); - - @Override - public int size() - { - return super.size() + _messageHeadSize.get(); - } - - public int headSize() - { - return _messageHeadSize.get(); - } - - @Override - public E poll() - { - if (_messageHead.isEmpty()) - { - return super.poll(); - } - else - { - E e = _messageHead.poll(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Providing item(" + e + ")from message head"); - } - - if (e != null) - { - _messageHeadSize.decrementAndGet(); - } - - return e; - } - } - - @Override - public boolean remove(Object o) - { - - if (_messageHead.isEmpty()) - { - return super.remove(o); - } - else - { - if (_messageHead.remove(o)) - { - _messageHeadSize.decrementAndGet(); - - return true; - } - - return super.remove(o); - } - } - - @Override - public boolean removeAll(Collection<?> c) - { - if (_messageHead.isEmpty()) - { - return super.removeAll(c); - } - else - { - // fixme this is super.removeAll but iterator here doesn't work - // we need to be able to correctly decrement _messageHeadSize - // boolean modified = false; - // Iterator<?> e = iterator(); - // while (e.hasNext()) - // { - // if (c.contains(e.next())) - // { - // e.remove(); - // modified = true; - // _size.decrementAndGet(); - // } - // } - // return modified; - - throw new RuntimeException("Not implemented"); - } - } - - @Override - public boolean isEmpty() - { - return (_messageHead.isEmpty() && super.isEmpty()); - } - - @Override - public void clear() - { - super.clear(); - _messageHead.clear(); - } - - @Override - public boolean contains(Object o) - { - return _messageHead.contains(o) || super.contains(o); - } - - @Override - public boolean containsAll(Collection<?> o) - { - return _messageHead.containsAll(o) || super.containsAll(o); - } - - @Override - public E element() - { - if (_messageHead.isEmpty()) - { - return super.element(); - } - else - { - return _messageHead.element(); - } - } - - @Override - public E peek() - { - if (_messageHead.isEmpty()) - { - return super.peek(); - } - else - { - E o = _messageHead.peek(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Peeking item (" + o + ") from message head"); - } - - return o; - } - - } - - @Override - public Iterator<E> iterator() - { - final Iterator<E> mainMessageIterator = super.iterator(); - - return new Iterator<E>() - { - final Iterator<E> _headIterator = _messageHead.iterator(); - final Iterator<E> _mainIterator = mainMessageIterator; - - Iterator<E> last; - - public boolean hasNext() - { - return _headIterator.hasNext() || _mainIterator.hasNext(); - } - - public E next() - { - if (_headIterator.hasNext()) - { - last = _headIterator; - - return _headIterator.next(); - } - else - { - last = _mainIterator; - - return _mainIterator.next(); - } - } - - public void remove() - { - last.remove(); - if(last == _mainIterator) - { - _size.decrementAndGet(); - } - else - { - _messageHeadSize.decrementAndGet(); - } - } - }; - } - - @Override - public boolean retainAll(Collection<?> c) - { - throw new RuntimeException("Not Implemented"); - } - - @Override - public Object[] toArray() - { - throw new RuntimeException("Not Implemented"); - } - - public boolean pushHead(E o) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Adding item(" + o + ") to head of queue"); - } - - if (_messageHead.offer(o)) - { - _messageHeadSize.incrementAndGet(); - - return true; - } - - return false; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java deleted file mode 100644 index c4d7683a02..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - -public class ConcurrentLinkedQueueAtomicSize<E> extends ConcurrentLinkedQueue<E> -{ - AtomicInteger _size = new AtomicInteger(0); - - public int size() - { - return _size.get(); - } - - public boolean offer(E o) - { - - if (super.offer(o)) - { - _size.incrementAndGet(); - return true; - } - - return false; - } - - public E poll() - { - E e = super.poll(); - - if (e != null) - { - _size.decrementAndGet(); - } - - return e; - } - - @Override - public boolean remove(Object o) - { - if (super.remove(o)) - { - _size.decrementAndGet(); - return true; - } - - return false; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java deleted file mode 100644 index 1f168345a1..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import java.util.concurrent.ConcurrentLinkedQueue; - -public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E> -{ - public int size() - { - if (isEmpty()) - { - return 0; - } - else - { - return 1; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index ac8e3da3c2..2d3e321812 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -46,6 +46,10 @@ import java.util.List; */ public class FileUtils { + private FileUtils() + { + } + /** * Reads a text file as a string. * diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java deleted file mode 100644 index b5efaa61b6..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.util; - -import java.util.Queue; - -/** - * Defines a queue that has a push operation to add an element to the head of the queue. - * - * @todo Seems like this may be pointless, the implementation uses this method to increment the message count - * then calls offer. Why not simply override offer and drop this interface? - */ -public interface MessageQueue<E> extends Queue<E> -{ - /** - * Inserts the specified element into this queue, if possible. When using queues that may impose insertion - * restrictions (for example capacity bounds), method offer is generally preferable to method Collection.add(E), - * which can fail to insert an element only by throwing an exception. - * - * @param o The element to insert. - * - * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt> - */ - boolean pushHead(E o); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java index 4c653e6ca0..971dd3fe2a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java @@ -29,14 +29,20 @@ public class NetMatcher public void initInetNetworks(final Collection nets) { networks = new ArrayList(); - for (Iterator iter = nets.iterator(); iter.hasNext(); ) try + for (Iterator iter = nets.iterator(); iter.hasNext(); ) { - InetNetwork net = InetNetwork.getFromString((String) iter.next()); - if (!networks.contains(net)) networks.add(net); - } - catch (java.net.UnknownHostException uhe) - { - log("Cannot resolve address: " + uhe.getMessage()); + try + { + InetNetwork net = InetNetwork.getFromString((String) iter.next()); + if (!networks.contains(net)) + { + networks.add(net); + } + } + catch (java.net.UnknownHostException uhe) + { + log("Cannot resolve address: " + uhe.getMessage()); + } } networks.trimToSize(); } @@ -44,14 +50,20 @@ public class NetMatcher public void initInetNetworks(final String[] nets) { networks = new ArrayList(); - for (int i = 0; i < nets.length; i++) try - { - InetNetwork net = InetNetwork.getFromString(nets[i]); - if (!networks.contains(net)) networks.add(net); - } - catch (java.net.UnknownHostException uhe) + for (int i = 0; i < nets.length; i++) { - log("Cannot resolve address: " + uhe.getMessage()); + try + { + InetNetwork net = InetNetwork.getFromString(nets[i]); + if (!networks.contains(net)) + { + networks.add(net); + } + } + catch (java.net.UnknownHostException uhe) + { + log("Cannot resolve address: " + uhe.getMessage()); + } } networks.trimToSize(); } @@ -71,10 +83,13 @@ public class NetMatcher boolean sameNet = false; - if (ip != null) for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); ) + if (ip != null) { - InetNetwork network = (InetNetwork) iter.next(); - sameNet = network.contains(ip); + for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); ) + { + InetNetwork network = (InetNetwork) iter.next(); + sameNet = network.contains(ip); + } } return sameNet; } @@ -156,12 +171,21 @@ class InetNetwork public static InetNetwork getFromString(String netspec) throws java.net.UnknownHostException { - if (netspec.endsWith("*")) netspec = normalizeFromAsterisk(netspec); + if (netspec.endsWith("*")) + { + netspec = normalizeFromAsterisk(netspec); + } else { int iSlash = netspec.indexOf('/'); - if (iSlash == -1) netspec += "/255.255.255.255"; - else if (netspec.indexOf('.', iSlash) == -1) netspec = normalizeFromCIDR(netspec); + if (iSlash == -1) + { + netspec += "/255.255.255.255"; + } + else if (netspec.indexOf('.', iSlash) == -1) + { + netspec = normalizeFromCIDR(netspec); + } } return new InetNetwork(InetAddress.getByName(netspec.substring(0, netspec.indexOf('/'))), @@ -205,8 +229,12 @@ class InetNetwork String[] masks = { "0.0.0.0/0.0.0.0", "0.0.0/255.0.0.0", "0.0/255.255.0.0", "0/255.255.255.0" }; char[] srcb = netspec.toCharArray(); int octets = 0; - for (int i = 1; i < netspec.length(); i++) { - if (srcb[i] == '.') octets++; + for (int i = 1; i < netspec.length(); i++) + { + if (srcb[i] == '.') + { + octets++; + } } return (octets == 0) ? masks[0] : netspec.substring(0, netspec.length() -1 ).concat(masks[octets]); } @@ -244,10 +272,18 @@ class InetNetwork private static InetAddress getByAddress(byte[] ip) throws java.net.UnknownHostException { InetAddress addr = null; - if (getByAddress != null) try { - addr = (InetAddress) getByAddress.invoke(null, new Object[] { ip }); - } catch (IllegalAccessException e) { - } catch (java.lang.reflect.InvocationTargetException e) { + if (getByAddress != null) + { + try + { + addr = (InetAddress) getByAddress.invoke(null, new Object[] { ip }); + } + catch (IllegalAccessException e) + { + } + catch (java.lang.reflect.InvocationTargetException e) + { + } } if (addr == null) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java deleted file mode 100644 index 93266f2486..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -/** - * Contains pretty printing convenienve methods for producing formatted logging output, mostly for debugging purposes. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * </table> - * - * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays. - */ -public class PrettyPrintingUtils -{ - /** - * Pretty prints an array of ints as a string. - * - * @param array The array to pretty print. - * - * @return The pretty printed string. - */ - public static String printArray(int[] array) - { - StringBuilder result = new StringBuilder("["); - for (int i = 0; i < array.length; i++) - { - result.append(array[i]) - .append((i < (array.length - 1)) ? ", " : ""); - } - - result.append(']'); - - return result.toString(); - } - - /** - * Pretty prints an array of strings as a string. - * - * @param array The array to pretty print. - * - * @return The pretty printed string. - */ - public static String printArray(String[] array) - { - String result = "["; - for (int i = 0; i < array.length; i++) - { - result += array[i]; - result += (i < (array.length - 1)) ? ", " : ""; - } - - result += "]"; - - return result; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java index 8ad9d00f54..451d5d60eb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java @@ -23,8 +23,6 @@ package org.apache.qpid.util; import java.util.Comparator; -import org.apache.qpid.SerialException; - /** * This class provides basic serial number comparisons as defined in * RFC 1982. @@ -32,6 +30,9 @@ import org.apache.qpid.SerialException; public class Serial { + private Serial() + { + } public static final Comparator<Integer> COMPARATOR = new Comparator<Integer>() { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java index fe1a300479..f2d51ccfde 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -21,7 +21,6 @@ package org.apache.qpid.util; import java.io.UnsupportedEncodingException; - import java.util.Arrays; import java.util.Map; import java.util.Properties; @@ -37,6 +36,9 @@ import java.util.regex.Pattern; public final class Strings { + private Strings() + { + } private static final byte[] EMPTY = new byte[0]; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java index 4bf6b7f0a2..d9b2dd8413 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java @@ -28,6 +28,9 @@ package org.apache.qpid.util; public final class UUIDs { + private UUIDs() + { + } public static final UUIDGen newGenerator() { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java deleted file mode 100644 index e0c0337898..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * Used to signal that a data element and its producer cannot be requeued or sent an error message when using a - * {@link BatchSynchQueue} because the producer has already been unblocked by an unblocking take on the queue. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Signal that an unblocking take has already occurred. - * </table> - */ -public class AlreadyUnblockedException extends RuntimeException -{ } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java deleted file mode 100644 index 63d8f77edb..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java +++ /dev/null @@ -1,122 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.Collection; -import java.util.concurrent.BlockingQueue; - -/** - * BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this - * pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform usefull - * work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is - * consumed or until a consumer chooses to release the producer some time after consuming the data from the queue. - * - * <p>There are a number of possible advantages to using this technique when compared with having the producers - * processing their own data: - * - * <ul> - * <li>Data may be deposited asynchronously in the buffer allowing the producers to continue running.</li> - * <li>Data may be deposited synchronously in the buffer so that producers wait until their data has been processed - * before being allowed to continue.</li> - * <li>Variable rates of production/consumption can be smoothed over by the buffer as it provides space in memory to - * hold data between production and consumption.</li> - * <li>Consumers may be able to batch data as they consume it leading to more efficient consumption over - * individual data item consumption where latency associated with the consume operation can be ammortized. - * For example, it may be possibly to ammortize the cost of a disk seek over many producers.</li> - * <li>Data from seperate threads can be combined together in the buffer, providing a convenient way of spreading work - * amongst many workers and gathering the results together again.</li> - * <li>Different types of queue can be used to hold the buffer, resulting in different processing orders. For example, - * lifo, fifo, priority heap, etc.</li> - * </ul> - * - * <p/>The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package - * (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the - * blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional - * take methods that can be used to take data from a queue without releasing producers, so that consumers have an - * opportunity to confirm correct processing of the data before producers are released. It also adds a put method with - * exceptions so that consumers can signal exception cases back to producers where there are errors in the data. - * - * <p/>This type of queue is usefull in situations where consumers can obtain an efficiency gain by batching data - * from many threads but where synchronous handling of that data is neccessary because producers need to know that - * their data has been processed before they continue. For example, sending a bundle of messages together, or writing - * many records to disk at once, may result in improved performance but the originators of the messages or disk records - * need confirmation that their data has really been sent or saved to disk. - * - * <p/>The consumer can put an element back onto the queue or send an error message to the elements producer using the - * {@link SynchRecord} interface. - * - * <p/>The {@link #take()}, {@link #drainTo(java.util.Collection<? super E>)} and - * {@link #drainTo(java.util.Collection<? super E>, int)} methods from {@link BlockingQueue} should behave as if they - * have been called with unblock set to false. That is they take elements from the queue but leave the producers - * blocked. These methods do not return collections of {@link SynchRecord}s so they do not supply an interface through - * which errors or re-queuings can be applied. If these methods are used then the consumer must succesfully process - * all the records it takes. - * - * <p/>The {@link #put} method should silently swallow any exceptions that consumers attempt to return to the caller. - * In order to handle exceptions the {@link #tryPut} method must be used. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Handle synchronous puts, with possible exceptions. - * <tr><td> Allow consumers to take many records from a queue in a batch. - * <tr><td> Allow consumers to decide when to unblock synchronous producers. - * </table> - */ -public interface BatchSynchQueue<E> extends BlockingQueue<E> -{ - /** - * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the - * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. - * - * @param e The data element to put into the queue. - * - * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting - * on its entry in the queue being consumed. - * @throws SynchException If a consumer encounters an error whilst processing the data element. - */ - public void tryPut(E e) throws InterruptedException, SynchException; - - /** - * Takes all available data items from the queue or blocks until some become available. The returned items - * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param c The collection to drain the data items into. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock); - - /** - * Takes up to maxElements available data items from the queue or blocks until some become available. The returned - * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param c The collection to drain the data items into. - * @param maxElements The maximum number of elements to drain. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> c, int maxElements, boolean unblock); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java deleted file mode 100644 index 4564b1d686..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java +++ /dev/null @@ -1,834 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data. - * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being - * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and - * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is - * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch. - * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls. - * - * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete - * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract}, - * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement - * the buffer other than by a queue, for example, by using an array. - * - * <p/>Normal queue methods to work asynchronously. - * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately - * when their data is taken. - * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the - * option to keep producers blocked until the consumer decides to release them. - * - * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to - * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency - * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io) - * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the - * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an - * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to - * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * </table> - */ -public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E> -{ - /** Used for logging. */ - private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class); - - /** Holds a reference to the queue implementation that holds the buffer. */ - Queue<SynchRecordImpl<E>> buffer; - - /** Holds the number of items in the queue */ - private int count; - - /** Main lock guarding all access */ - private ReentrantLock lock; - - /** Condition for waiting takes */ - private Condition notEmpty; - - /** Condition for waiting puts */ - private Condition notFull; - - /** - * Creates a batch synch queue without fair thread scheduling. - */ - public BatchSynchQueueBase() - { - this(false); - } - - /** - * Ensures that the underlying buffer implementation is created. - * - * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer. - */ - public BatchSynchQueueBase(boolean fair) - { - buffer = this.createQueue(); - - // Create the buffer lock with the fairness flag set accordingly. - lock = new ReentrantLock(fair); - - // Create the non-empty and non-full condition monitors on the buffer lock. - notEmpty = lock.newCondition(); - notFull = lock.newCondition(); - } - - /** - * Returns an iterator over the elements contained in this collection. - * - * @return An iterator over the elements contained in this collection. - */ - public Iterator<E> iterator() - { - throw new RuntimeException("Not implemented."); - } - - /** - * Returns the number of elements in this collection. If the collection contains more than - * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>. - * - * @return The number of elements in this collection. - */ - public int size() - { - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return count; - } - finally - { - lock.unlock(); - } - } - - /** - * Inserts the specified element into this queue, if possible. When using queues that may impose insertion - * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method - * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception. - * - * @param e The element to insert. - * - * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt> - */ - public boolean offer(E e) - { - if (e == null) - { - throw new NullPointerException(); - } - - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return insert(e, false); - } - finally - { - lock.unlock(); - } - } - - /** - * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to - * become available. - * - * @param e The element to add. - * @param timeout How long to wait before giving up, in units of <tt>unit</tt> - * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter. - * - * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is - * available. - * - * @throws InterruptedException If interrupted while waiting. - * @throws NullPointerException If the specified element is <tt>null</tt>. - */ - public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException - { - if (e == null) - { - throw new NullPointerException(); - } - - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - - long nanos = unit.toNanos(timeout); - - try - { - do - { - if (insert(e, false)) - { - return true; - } - - try - { - nanos = notFull.awaitNanos(nanos); - } - catch (InterruptedException ie) - { - notFull.signal(); // propagate to non-interrupted thread - throw ie; - } - } - while (nanos > 0); - - return false; - } - finally - { - lock.unlock(); - } - } - - /** - * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty. - * - * @return The head of this queue, or <tt>null</tt> if this queue is empty. - */ - public E poll() - { - final ReentrantLock lock = this.lock; - - lock.lock(); - try - { - if (count == 0) - { - return null; - } - - E x = extract(true, true).getElement(); - - return x; - } - finally - { - lock.unlock(); - } - } - - /** - * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements - * are present on this queue. - * - * @param timeout How long to wait before giving up, in units of <tt>unit</tt>. - * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter. - * - * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present. - * - * @throws InterruptedException If interrupted while waiting. - */ - public E poll(long timeout, TimeUnit unit) throws InterruptedException - { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try - { - long nanos = unit.toNanos(timeout); - - do - { - if (count != 0) - { - E x = extract(true, true).getElement(); - - return x; - } - - try - { - nanos = notEmpty.awaitNanos(nanos); - } - catch (InterruptedException ie) - { - notEmpty.signal(); // propagate to non-interrupted thread - throw ie; - } - } - while (nanos > 0); - - return null; - } - finally - { - lock.unlock(); - } - } - - /** - * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty. - * - * @return The head of this queue, or <tt>null</tt> if this queue is empty. - */ - public E peek() - { - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return peekAtBufferHead(); - } - finally - { - lock.unlock(); - } - } - - /** - * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) - * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit. - * - * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by - * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt> - * or <tt>take</tt> an element. - * - * @return The remaining capacity. - */ - public int remainingCapacity() - { - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return getBufferCapacity() - count; - } - finally - { - lock.unlock(); - } - } - - /** - * Adds the specified element to this queue, waiting if necessary for space to become available. - * - * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised - * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these - * exceptions. - * - * @param e The element to add. - * - * @throws InterruptedException If interrupted while waiting. - */ - public void put(E e) throws InterruptedException - { - try - { - tryPut(e); - } - catch (SynchException ex) - { - // This exception is deliberately ignored. See the method comment for information about this. - } - } - - /** - * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the - * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. - * - * @param e The data element to put into the queue. Cannot be null. - * - * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting - * on its entry in the queue being consumed. - * @throws SynchException If a consumer encounters an error whilst processing the data element. - */ - public void tryPut(E e) throws InterruptedException, SynchException - { - if (e == null) - { - throw new NullPointerException(); - } - - // final Queue<E> items = this.buffer; - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - - try - { - while (count == getBufferCapacity()) - { - // Release the lock and wait until the queue is not full. - notFull.await(); - } - } - catch (InterruptedException ie) - { - notFull.signal(); // propagate to non-interrupted thread - throw ie; - } - - // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block - // the producer until its data is taken. - insert(e, true); - } - - /** - * Retrieves and removes the head of this queue, waiting if no elements are present on this queue. - * Any producer that has its data element taken by this call will be immediately unblocked. To keep the - * producer blocked whilst taking just a single item, use the - * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)} - * method. There is no take method to do that because there is not usually any advantage in a synchronous hand - * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption - * latencies accross many producers where possible. - * - * @return The head of this queue. - * - * @throws InterruptedException if interrupted while waiting. - */ - public E take() throws InterruptedException - { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - - try - { - try - { - while (count == 0) - { - // Release the lock and wait until the queue becomes non-empty. - notEmpty.await(); - } - } - catch (InterruptedException ie) - { - notEmpty.signal(); // propagate to non-interrupted thread - throw ie; - } - - // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is - // not full, and unblock the producer that owns the data item that is taken. - E x = extract(true, true).getElement(); - - return x; - } - finally - { - lock.unlock(); - } - } - - /** - * Removes all available elements from this queue and adds them into the given collection. This operation may be - * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements - * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated - * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further, - * the behavior of this operation is undefined if the specified collection is modified while the operation is in - * progress. - * - * @param objects The collection to transfer elements into. - * - * @return The number of elements transferred. - * - * @throws NullPointerException If objects is null. - * @throws IllegalArgumentException If objects is this queue. - */ - public int drainTo(Collection<? super E> objects) - { - return drainTo(objects, -1); - } - - /** - * Removes at most the given number of available elements from this queue and adds them into the given collection. - * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements - * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue - * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if - * the specified collection is modified while the operation is in progress. - * - * @param objects The collection to transfer elements into. - * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning - * all elements. - * - * @return The number of elements transferred. - * - * @throws NullPointerException If c is null. - * @throws IllegalArgumentException If c is this queue. - */ - public int drainTo(Collection<? super E> objects, int maxElements) - { - if (objects == null) - { - throw new NullPointerException(); - } - - if (objects == this) - { - throw new IllegalArgumentException(); - } - - // final Queue<E> items = this.buffer; - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - int n = 0; - - for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) - { - // Take items from the queue, do unblock the producers, but don't send not full signals yet. - objects.add(extract(true, false).getElement()); - } - - if (n > 0) - { - // count -= n; - notFull.signalAll(); - } - - return n; - } - finally - { - lock.unlock(); - } - } - - /** - * Takes all available data items from the queue or blocks until some become available. The returned items - * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param c The collection to drain the data items into. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock) - { - return drainTo(c, -1, unblock); - } - - /** - * Takes up to maxElements available data items from the queue or blocks until some become available. The returned - * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param coll The collection to drain the data items into. - * @param maxElements The maximum number of elements to drain. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock) - { - if (coll == null) - { - throw new NullPointerException(); - } - - // final Queue<E> items = this.buffer; - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - int n = 0; - - for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) - { - // Extract the next record from the queue, don't signall the not full condition yet and release - // producers depending on whether the caller wants to or not. - coll.add(extract(false, unblock)); - } - - if (n > 0) - { - // count -= n; - notFull.signalAll(); - } - - return new SynchRefImpl(n, coll); - } - finally - { - lock.unlock(); - } - } - - /** - * This abstract method should be overriden to return an empty queue. Different implementations of producer - * consumer buffers can control the order in which data is accessed using different queue implementations. - * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete - * implementations. - * - * @return An empty queue. - */ - protected abstract <T> Queue<T> createQueue(); - - /** - * Insert element into the queue, then possibly signal that the queue is not empty and block the producer - * on the element until permission to procede is given. - * - * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process - * will be able to get access to the queue. Hence, unlock and block are always set together. - * - * <p/>Call only when holding the global lock. - * - * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked. - * - * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this - * method may not return straight away, but only after the producer is unblocked by having its data - * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no - * matter what value the unlockAndBlock flag has, leaving the global lock on. - */ - protected boolean insert(E x, boolean unlockAndBlock) - { - // Create a new record for the data item. - SynchRecordImpl<E> record = new SynchRecordImpl<E>(x); - - boolean result = buffer.offer(record); - - if (result) - { - count++; - - // Tell any waiting consumers that the queue is not empty. - notEmpty.signal(); - - if (unlockAndBlock) - { - // Allow other threads to read/write the queue. - lock.unlock(); - - // Wait until a consumer takes this data item. - record.waitForConsumer(); - } - - return true; - } - else - { - return false; - } - } - - /** - * Extract element at current take position, advance, and signal. - * - * <p/>Call only when holding lock. - */ - protected SynchRecordImpl<E> extract(boolean unblock, boolean signal) - { - SynchRecordImpl<E> result = buffer.remove(); - count--; - - if (signal) - { - notFull.signal(); - } - - if (unblock) - { - result.releaseImmediately(); - } - - return result; - } - - /** - * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned. - * - * <p/>Call only when holding lock. - * - * @return The maximum capacity of the buffer. - */ - protected int getBufferCapacity() - { - if (buffer instanceof Capacity) - { - return ((Capacity) buffer).getCapacity(); - } - else - { - return Integer.MAX_VALUE; - } - } - - /** - * Return the head element from the buffer. - * - * <p/>Call only when holding lock. - * - * @return The head element from the buffer. - */ - protected E peekAtBufferHead() - { - return buffer.peek().getElement(); - } - - public class SynchRefImpl implements SynchRef - { - /** Holds the number of synch records associated with this reference. */ - int numRecords; - - /** Holds a reference to the collection of synch records managed by this. */ - Collection<SynchRecord<E>> records; - - public SynchRefImpl(int n, Collection<SynchRecord<E>> records) - { - this.numRecords = n; - this.records = records; - } - - public int getNumRecords() - { - return numRecords; - } - - /** - * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked - * when this method is called. The exception to this is producers that have had their data put back onto the queue - * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked - * but will not return from their put call normally, but with an exception instead. - */ - public void unblockProducers() - { - log.debug("public void unblockProducers(): called"); - - if (records != null) - { - for (SynchRecord<E> record : records) - { - // This call takes account of items that have already been released, are to be requeued or are in - // error. - record.releaseImmediately(); - } - } - - records = null; - } - } - - /** - * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows - * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when - * its data cannot be consumed. - */ - public class SynchRecordImpl<E> implements SynchRecord<E> - { - /** A boolean latch that determines when the producer for this data item will be allowed to continue. */ - BooleanLatch latch = new BooleanLatch(); - - /** The data element associated with this item. */ - E element; - - /** - * Create a new synch record. - * - * @param e The data element that the record encapsulates. - */ - public SynchRecordImpl(E e) - { - // Keep the data element. - element = e; - } - - /** - * Waits until the producer is given permission to proceded by a consumer. - */ - public void waitForConsumer() - { - latch.await(); - } - - /** - * Gets the data element contained by this record. - * - * @return The data element contained by this record. - */ - public E getElement() - { - return element; - } - - /** - * Immediately releases the producer of this data record. Consumers can bring the synchronization time of - * producers to a minimum by using this method to release them at the earliest possible moment when batch - * consuming records from sychronized producers. - */ - public void releaseImmediately() - { - // Check that the record has not already been released, is in error or is to be requeued. - latch.signal(); - - // Propagate errors to the producer. - - // Requeue items to be requeued. - } - - /** - * Tells the synch queue to put this element back onto the queue instead of releasing its producer. - * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or - * the {@link #releaseImmediately()} method. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this - * element has already been unblocked. - */ - public void reQueue() - { - throw new RuntimeException("Not implemented."); - } - - /** - * Tells the synch queue to raise an exception with this elements producer. The exception is not raised - * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the - * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is - * raised on the producer. - * - * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used - * because the exception is to be passed onto a different thread. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this - * element has already been unblocked. - * - * @param e The exception to raise on the producer. - */ - public void inError(Exception e) - { - throw new RuntimeException("Not implemented."); - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java deleted file mode 100644 index 0e4a07594f..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java +++ /dev/null @@ -1,128 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.concurrent.locks.AbstractQueuedSynchronizer; - -/** - * A BooleanLatch is like a set of traffic lights, where threads can wait at a red light until another thread gives - * the green light. When threads arrive at the latch it is initially red. They queue up until the green signal is - * given, at which point they can all acquire the latch in shared mode and continue to run concurrently. Once the latch - * is signalled it cannot be reset to red again. - * - * <p/> The latch uses a {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} to implement its synchronization. - * This has two internal states, 0 which means that the latch is blocked, and 1 which means that the latch is open. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Block threads until a go signal is given. - * </table> - * - * @todo Might be better to use a countdown latch to count down from 1. Its await method can throw interrupted - * exception which makes the possibility of interruption more explicit, and provides a reminder to recheck the - * latch condition before continuing. - */ -public class BooleanLatch -{ - /** Holds the synchronizer that provides the thread queueing synchronization. */ - private final Sync sync = new Sync(); - - /** - * Tests whether or not the latch has been signalled, that is to say that, the light is green. - * - * <p/>This method is non-blocking. - * - * @return <tt>true</tt> if the latch may be acquired; the light is green. - */ - public boolean isSignalled() - { - return sync.isSignalled(); - } - - /** - * Waits on the latch until the signal is given and the light is green. If the light is already green then the - * latch will be acquired and the thread will not have to wait. - * - * <p/>This method will block until the go signal is given or the thread is otherwise interrupted. Before carrying - * out any processing threads that return from this method should confirm that the go signal has really been given - * on this latch by calling the {@link #isSignalled()} method. - */ - public void await() - { - sync.acquireShared(1); - } - - /** - * Releases any threads currently waiting on the latch. This flips the light to green allowing any threads that - * were waiting for this condition to now run. - * - * <p/>This method is non-blocking. - */ - public void signal() - { - sync.releaseShared(1); - } - - /** - * Implements a thread queued synchronizer. The internal state 0 means that the queue is blocked and the internl - * state 1 means that the queue is released and that all waiting threads can acquire the synchronizer in shared - * mode. - */ - private static class Sync extends AbstractQueuedSynchronizer - { - /** - * Attempts to acquire this synchronizer in shared mode. It may be acquired once it has been released. - * - * @param ignore This parameter is ignored. - * - * @return 1 if the shared acquisition succeeds and -1 if it fails. - */ - protected int tryAcquireShared(int ignore) - { - return isSignalled() ? 1 : -1; - } - - /** - * Releases the synchronizer, setting its internal state to 1. - * - * @param ignore This parameter is ignored. - * - * @return <tt>true</tt> always. - */ - protected boolean tryReleaseShared(int ignore) - { - setState(1); - - return true; - } - - /** - * Tests if the synchronizer is signalled. It is signalled when its internal state it 1. - * - * @return <tt>true</tt> if the internal state is 1, <tt>false</tt> otherwise. - */ - boolean isSignalled() - { - return getState() != 0; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java deleted file mode 100644 index a97ce0e172..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * An interface exposed by data structures that have a maximum capacity. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Report the maximum capacity. - * </table> - */ -public interface Capacity -{ - public int getCapacity(); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java deleted file mode 100644 index bc63eb0353..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.Queue; - -/** - * SynchBuffer completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying - * queue as an array. This uses FIFO ordering for the queue but restricts the maximum size of the queue to a fixed - * amount. It also has the advantage that, as the buffer does not grow and shrink dynamically, memory for the buffer - * is allocated up front and does not create garbage during the operation of the queue. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Provide array based FIFO queue to create a batch synched queue around. - * </table> - * - * @todo Write an array based buffer implementation that implements Queue. - */ -public class SynchBuffer<E> extends BatchSynchQueueBase<E> -{ - /** - * Returns an empty queue, implemented as an array. - * - * @return An empty queue, implemented as an array. - */ - protected <T> Queue<T> createQueue() - { - throw new RuntimeException("Not implemented."); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java deleted file mode 100644 index 99a83f96cd..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * SynchException is used to encapsulate exceptions with the data elements that caused them in order to send exceptions - * back from the consumers of a {@link BatchSynchQueue} to producers. The underlying exception should be retrieved from - * the {@link #getCause} method. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Encapsulate a data element and exception. - * </table> - */ -public class SynchException extends Exception -{ - /** Holds the data element that is in error. */ - Object element; - - /** - * Creates a new BaseApplicationException object. - * - * @param message The exception message. - * @param cause The underlying throwable cause. This may be null. - */ - public SynchException(String message, Throwable cause, Object element) - { - super(message, cause); - - // Keep the data element that was in error. - this.element = element; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java deleted file mode 100644 index 95833f398a..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.LinkedList; -import java.util.Queue; - -/** - * SynchQueue completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying - * queue as a linked list. This uses FIFO ordering for the queue and allows the queue to grow to accomodate more - * elements as needed. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Provide linked list FIFO queue to create a batch synched queue around. - * </table> - */ -public class SynchQueue<E> extends BatchSynchQueueBase<E> -{ - /** - * Returns an empty queue, implemented as a linked list. - * - * @return An empty queue, implemented as a linked list. - */ - protected <T> Queue<T> createQueue() - { - return new LinkedList<T>(); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java deleted file mode 100644 index fd740c20cd..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * SynchRecord associates a data item from a {@link BatchSynchQueue} with its producer. This enables the data item data - * item to be put back on the queue without unblocking its producer, or to send exceptions to the producer. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Get the underlying data element. - * <tr><td> Put the data element back on the queue without unblocking its producer. - * <tr><td> Send and exception to the data elements producer. - * </table> - */ -public interface SynchRecord<E> -{ - /** - * Gets the data element contained by this record. - * - * @return The data element contained by this record. - */ - public E getElement(); - - /** - * Tells the synch queue to put this element back onto the queue instead of releasing its producer. - * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element - * has already been unblocked. - */ - public void reQueue(); - - /** - * Immediately releases the producer of this data record. Consumers can bring the synchronization time of - * producers to a minimum by using this method to release them at the earliest possible moment when batch - * consuming records from sychronized producers. - */ - public void releaseImmediately(); - - /** - * Tells the synch queue to raise an exception with this elements producer. The exception is not raised immediately - * but upon calling the {@link SynchRef#unblockProducers()} method. The exception will be wrapped in a - * {@link SynchException} before it is raised on the producer. - * - * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used - * because the exception is to be passed onto a different thread. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element - * has already been unblocked. - * - * @param e The exception to raise on the producer. - */ - public void inError(Exception e); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java deleted file mode 100644 index efe2344c06..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * A SynchRef is an interface which is returned from the synchronous take and drain methods of {@link BatchSynchQueue}, - * allowing call-backs to be made against the synchronizing strucutre. It allows the consumer to communicate when it - * wants producers that have their data taken to be unblocked. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities - * <tr><td> Report number of records returned by a taking operation. - * <tr><td> Provide call-back to release producers of taken records. - * </table> - */ -public interface SynchRef -{ - /** - * Reports the number of records taken by the take or drain operation. - * - * @return The number of records taken by the take or drain operation. - */ - public int getNumRecords(); - - /** - * Any producers that have had their data elements taken from the queue but have not been unblocked are - * unblocked when this method is called. The exception to this is producers that have had their data put back - * onto the queue by a consumer. Producers that have had exceptions for their data items registered by consumers - * will be unblocked but will not return from their put call normally, but with an exception instead. - */ - public void unblockProducers(); -} |