diff options
author | Robert Greig <rgreig@apache.org> | 2006-11-23 23:12:28 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-11-23 23:12:28 +0000 |
commit | 4c8f9badcd32fd9a519184928f90e8a685412eaa (patch) | |
tree | b4485b039e2080520fa2767b054b0de2d9d3b096 | |
parent | 4e7b444e740f8f2bf55dd6a1684932388edca3e3 (diff) | |
download | qpid-python-4c8f9badcd32fd9a519184928f90e8a685412eaa.tar.gz |
Start of merge from trunk - some manual restructuring
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@478706 13f79535-47bb-0310-9956-ffa450edef68
57 files changed, 5412 insertions, 0 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java new file mode 100644 index 0000000000..694fe75cda --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ channel closed exception. + */ +public class AMQChannelClosedException extends AMQException +{ + public AMQChannelClosedException(int errorCode, String msg) + { + super(errorCode, msg); + } +} + + diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java new file mode 100644 index 0000000000..677a4938a0 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQFrame; + +public class AMQChannelException extends AMQException +{ + private final int _classId; + private final int _methodId; + + public AMQChannelException(int errorCode, String msg, int classId, int methodId, Throwable t) + { + super(errorCode, msg, t); + _classId = classId; + _methodId = methodId; + } + + public AMQChannelException(int errorCode, String msg, int classId, int methodId) + { + super(errorCode, msg); + _classId = classId; + _methodId = methodId; + } + + public AMQFrame getCloseFrame(int channel) + { + return ChannelCloseBody.createAMQFrame(channel, getErrorCode(), getMessage(), _classId, _methodId); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java new file mode 100644 index 0000000000..dcf393eb65 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ channel closed exception. + */ +public class AMQConnectionClosedException extends AMQException +{ + public AMQConnectionClosedException(int errorCode, String msg) + { + super(errorCode, msg); + } +} + + diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java new file mode 100644 index 0000000000..171af23500 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -0,0 +1,27 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.qpid; + +public class AMQConnectionException extends AMQException +{ + public AMQConnectionException(String message) + { + super(message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java new file mode 100644 index 0000000000..616a95bd1b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ disconnected exception. + */ +public class AMQDisconnectedException extends AMQException +{ + public AMQDisconnectedException(String msg) + { + super(msg); + } +} + + diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java new file mode 100644 index 0000000000..423cbf8975 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -0,0 +1,74 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +import org.apache.log4j.Logger; + +/** + * Generic AMQ exception. + */ +public class AMQException extends Exception +{ + private int _errorCode; + + public AMQException(String message) + { + super(message); + } + + public AMQException(String msg, Throwable t) + { + super(msg, t); + } + + public AMQException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public AMQException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public AMQException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java new file mode 100644 index 0000000000..9424fe8eb1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * Generic AMQ exception. + */ +public class AMQUndeliveredException extends AMQException +{ + private Object _bounced; + + public AMQUndeliveredException(int errorCode, String msg, Object bounced) + { + super(errorCode, msg); + + _bounced = bounced; + } + + public Object getUndeliveredMessage() + { + return _bounced; + } + +} + + diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java new file mode 100644 index 0000000000..5565958d67 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +public class AMQUnresolvedAddressException extends AMQException +{ + String _broker; + + public AMQUnresolvedAddressException(String message, String broker) + { + super(message); + _broker = broker; + } + + public String toString() + { + return super.toString() + " Broker, \"" + _broker +"\""; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java new file mode 100644 index 0000000000..c62befce6b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.filter.codec.ProtocolCodecFactory; +import org.apache.mina.filter.codec.ProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolEncoder; + +public class AMQCodecFactory implements ProtocolCodecFactory +{ + private AMQEncoder _encoder = new AMQEncoder(); + + private AMQDecoder _frameDecoder; + + /** + * @param expectProtocolInitiation true if the first frame received is going to be + * a protocol initiation frame, false if it is going to be a standard AMQ data block. + * The former case is used for the broker, which always expects to received the + * protocol initiation first from a newly connected client. + */ + public AMQCodecFactory(boolean expectProtocolInitiation) + { + _frameDecoder = new AMQDecoder(expectProtocolInitiation); + } + + public ProtocolEncoder getEncoder() + { + return _encoder; + } + + public ProtocolDecoder getDecoder() + { + return _frameDecoder; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java new file mode 100644 index 0000000000..594ae11233 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -0,0 +1,96 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.CumulativeProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.ProtocolInitiation; + +/** + * There is one instance of this class per session. Any changes or configuration done + * at run time to the encoders or decoders only affects decoding/encoding of the + * protocol session data to which is it bound. + * + */ +public class AMQDecoder extends CumulativeProtocolDecoder +{ + private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); + + private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder(); + + private boolean _expectProtocolInitiation; + + public AMQDecoder(boolean expectProtocolInitiation) + { + _expectProtocolInitiation = expectProtocolInitiation; + } + + protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + if (_expectProtocolInitiation) + { + return doDecodePI(session, in, out); + } + else + { + return doDecodeDataBlock(session, in, out); + } + } + + protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + int pos = in.position(); + boolean enoughData = _dataBlockDecoder.decodable(session, in); + in.position(pos); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + _dataBlockDecoder.decode(session, in, out); + return true; + } + } + + private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + boolean enoughData = _piDecoder.decodable(session, in); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + _piDecoder.decode(session, in, out); + return true; + } + } + + public void setExpectProtocolInitiation(boolean expectProtocolInitiation) + { + _expectProtocolInitiation = expectProtocolInitiation; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java new file mode 100644 index 0000000000..7d5e8182a6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.common.IoSession; +import org.apache.qpid.framing.AMQDataBlockEncoder; + +public class AMQEncoder implements ProtocolEncoder +{ + private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder(); + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception + { + _dataBlockEncoder.encode(session, message, out); + } + + public void dispose(IoSession session) throws Exception + { + + } +} diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Configured.java b/java/common/src/main/java/org/apache/qpid/configuration/Configured.java new file mode 100644 index 0000000000..cb5e8aff1d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/configuration/Configured.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.configuration; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +/** + * Marks a field as being "configured" externally. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface Configured +{ + /** + * The Commons Configuration path to the configuration element + */ + String path(); + + /** + * The default value to use should the path not be found in the configuration source + */ + String defaultValue(); +} diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java new file mode 100644 index 0000000000..f148ffc0b7 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.configuration; + +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +/** + * Indicates an error parsing a property expansion. + */ +public class PropertyException extends AMQException +{ + public PropertyException(String message) + { + super(message); + } + + public PropertyException(String msg, Throwable t) + { + super(msg, t); + } + + public PropertyException(int errorCode, String msg, Throwable t) + { + super(errorCode, msg, t); + } + + public PropertyException(int errorCode, String msg) + { + super(errorCode, msg); + } + + public PropertyException(Logger logger, String msg, Throwable t) + { + super(logger, msg, t); + } + + public PropertyException(Logger logger, String msg) + { + super(logger, msg); + } + + public PropertyException(Logger logger, int errorCode, String msg) + { + super(logger, errorCode, msg); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java new file mode 100644 index 0000000000..bd4000d3c4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java @@ -0,0 +1,153 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.configuration; + +import java.util.ArrayList; +import java.util.Iterator; + +/** + * Based on code in Apache Ant, this utility class handles property expansion. This + * is most useful in config files and so on. + */ +public class PropertyUtils +{ + /** + * Replaces <code>${xxx}</code> style constructions in the given value + * with the string value of the corresponding data types. Replaces only system + * properties + * + * @param value The string to be scanned for property references. + * May be <code>null</code>, in which case this + * method returns immediately with no effect. + * @return the original string with the properties replaced, or + * <code>null</code> if the original string is <code>null</code>. + * @throws PropertyException if the string contains an opening + * <code>${</code> without a closing + * <code>}</code> + */ + public static String replaceProperties(String value) throws PropertyException + { + if (value == null) + { + return null; + } + + ArrayList<String> fragments = new ArrayList<String>(); + ArrayList<String> propertyRefs = new ArrayList<String>(); + parsePropertyString(value, fragments, propertyRefs); + + StringBuffer sb = new StringBuffer(); + Iterator j = propertyRefs.iterator(); + + for (String fragment : fragments) + { + if (fragment == null) + { + String propertyName = (String) j.next(); + + // try to get it from the project or keys + // Backward compatibility + String replacement = System.getProperty(propertyName); + + if (replacement == null) + { + throw new PropertyException("Property ${" + propertyName + + "} has not been set"); + } + fragment = replacement; + } + sb.append(fragment); + } + + return sb.toString(); + } + + /** + * Default parsing method. Parses the supplied value for properties which are specified + * using ${foo} syntax. $X is left as is, and $$ specifies a single $. + * @param value the property string to parse + * @param fragments is populated with the string fragments. A null means "insert a + * property value here. The number of nulls in the list when populated is equal to the + * size of the propertyRefs list + * @param propertyRefs populated with the property names to be added into the final + * String. + */ + private static void parsePropertyString(String value, ArrayList<String> fragments, + ArrayList<String> propertyRefs) + throws PropertyException + { + int prev = 0; + int pos; + //search for the next instance of $ from the 'prev' position + while ((pos = value.indexOf("$", prev)) >= 0) + { + + //if there was any text before this, add it as a fragment + if (pos > 0) + { + fragments.add(value.substring(prev, pos)); + } + //if we are at the end of the string, we tack on a $ + //then move past it + if (pos == (value.length() - 1)) + { + fragments.add("$"); + prev = pos + 1; + } + else if (value.charAt(pos + 1) != '{') + { + //peek ahead to see if the next char is a property or not + //not a property: insert the char as a literal + if (value.charAt(pos + 1) == '$') + { + // two $ map to one $ + fragments.add("$"); + prev = pos + 2; + } + else + { + // $X maps to $X for all values of X!='$' + fragments.add(value.substring(pos, pos + 2)); + prev = pos + 2; + } + } + else + { + // property found, extract its name or bail on a typo + int endName = value.indexOf('}', pos); + if (endName < 0) + { + throw new PropertyException("Syntax error in property: " + + value); + } + String propertyName = value.substring(pos + 2, endName); + fragments.add(null); + propertyRefs.add(propertyName); + prev = endName + 1; + } + } + //no more $ signs found + //if there is any tail to the file, append it + if (prev < value.length()) + { + fragments.add(value.substring(prev)); + } + } + + +} diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java new file mode 100644 index 0000000000..84a5836ff7 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -0,0 +1,33 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.exchange; + +public class ExchangeDefaults +{ + public final static String TOPIC_EXCHANGE_NAME = "amq.topic"; + + public final static String TOPIC_EXCHANGE_CLASS = "topic"; + + public final static String DIRECT_EXCHANGE_NAME = "amq.direct"; + + public final static String DIRECT_EXCHANGE_CLASS = "direct"; + + public final static String HEADERS_EXCHANGE_NAME = "amq.match"; + + public final static String HEADERS_EXCHANGE_CLASS = "headers"; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java new file mode 100644 index 0000000000..fad0450960 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public abstract class AMQBody +{ + protected abstract byte getType(); + + /** + * Get the size of the body + * @return unsigned short + */ + protected abstract int getSize(); + + protected abstract void writePayload(ByteBuffer buffer); + + protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java new file mode 100644 index 0000000000..797df391c3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -0,0 +1,40 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * A data block represents something that has a size in bytes and the ability to write itself to a byte + * buffer (similar to a byte array). + */ +public abstract class AMQDataBlock implements EncodableAMQDataBlock +{ + /** + * Get the size of buffer needed to store the byte representation of this + * frame. + * @return unsigned integer + */ + public abstract long getSize(); + + /** + * Writes the datablock to the specified buffer. + * @param buffer + */ + public abstract void writePayload(ByteBuffer buffer); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java new file mode 100644 index 0000000000..3379cc18e9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -0,0 +1,113 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; + +import java.util.HashMap; +import java.util.Map; + +public class AMQDataBlockDecoder +{ + Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); + + private final Map _supportedBodies = new HashMap(); + + public AMQDataBlockDecoder() + { + _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance()); + _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance()); + _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance()); + _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); + } + + public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + { + // type, channel, body size and end byte + if (in.remaining() < (1 + 2 + 4 + 1)) + { + return false; + } + + final byte type = in.get(); + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if (type <= 0 || channel < 0 || bodySize < 0) + { + throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); + } + + if (in.remaining() < (bodySize + 1)) + { + return false; + } + return true; + } + + private boolean isSupportedFrameType(byte frameType) + { + final boolean result = _supportedBodies.containsKey(new Byte(frameType)); + + if (!result) + { + _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType); + } + + return result; + } + + protected Object createAndPopulateFrame(ByteBuffer in) + throws AMQFrameDecodingException + { + final byte type = in.get(); + if (!isSupportedFrameType(type)) + { + throw new AMQFrameDecodingException("Unsupported frame type: " + type); + } + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type)); + if (bodyFactory == null) + { + throw new AMQFrameDecodingException("Unsupported body type: " + type); + } + AMQFrame frame = new AMQFrame(); + + frame.populateFromBuffer(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type); + } + return frame; + } + + public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + throws Exception + { + out.write(createAndPopulateFrame(in)); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java new file mode 100644 index 0000000000..9c0ca26dcf --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.demux.MessageEncoder; + +import java.util.HashSet; +import java.util.Set; + +public class AMQDataBlockEncoder implements MessageEncoder +{ + Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); + + private Set _messageTypes; + + public AMQDataBlockEncoder() + { + _messageTypes = new HashSet(); + _messageTypes.add(EncodableAMQDataBlock.class); + } + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception + { + final AMQDataBlock frame = (AMQDataBlock) message; + int frameSize = (int)frame.getSize(); + final ByteBuffer buffer = ByteBuffer.allocate(frameSize); + //buffer.setAutoExpand(true); + frame.writePayload(buffer); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); + } + + buffer.flip(); + out.write(buffer); + } + + public Set getMessageTypes() + { + return _messageTypes; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java new file mode 100644 index 0000000000..17f635c06a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -0,0 +1,73 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock +{ + public int channel; + + public AMQBody bodyFrame; + + public AMQFrame() + { + } + + public AMQFrame(int channel, AMQBody bodyFrame) + { + this.channel = channel; + this.bodyFrame = bodyFrame; + } + + public long getSize() + { + return 1 + 2 + 4 + bodyFrame.getSize() + 1; + } + + public void writePayload(ByteBuffer buffer) + { + buffer.put(bodyFrame.getType()); + // TODO: how does channel get populated + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize()); + bodyFrame.writePayload(buffer); + buffer.put((byte) 0xCE); + } + + /** + * + * @param buffer + * @param channel unsigned short + * @param bodySize unsigned integer + * @param bodyFactory + * @throws AMQFrameDecodingException + */ + public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory) + throws AMQFrameDecodingException + { + this.channel = channel; + bodyFrame = bodyFactory.createBody(buffer); + bodyFrame.populateFromBuffer(buffer, bodySize); + } + + public String toString() + { + return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java new file mode 100644 index 0000000000..4e8a8c62b1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + +public class AMQFrameDecodingException extends AMQException +{ + public AMQFrameDecodingException(String message) + { + super(message); + } + + public AMQFrameDecodingException(String message, Throwable t) + { + super(message, t); + } + + public AMQFrameDecodingException(Logger log, String message) + { + super(log, message); + } + + public AMQFrameDecodingException(Logger log, String message, Throwable t) + { + super(log, message, t); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java new file mode 100644 index 0000000000..841295e538 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -0,0 +1,87 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; + +public abstract class AMQMethodBody extends AMQBody +{ + public static final byte TYPE = 1; + + /** unsigned short */ + protected abstract int getBodySize(); + + /** + * @return unsigned short + */ + protected abstract int getClazz(); + + /** + * @return unsigned short + */ + protected abstract int getMethod(); + + protected abstract void writeMethodPayload(ByteBuffer buffer); + + protected byte getType() + { + return TYPE; + } + + protected int getSize() + { + return 2 + 2 + getBodySize(); + } + + protected void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + populateMethodBodyFromBuffer(buffer); + } + + public String toString() + { + StringBuffer buf = new StringBuffer(getClass().toString()); + buf.append(" Class: ").append(getClazz()); + buf.append(" Method: ").append(getMethod()); + return buf.toString(); + } + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception + * should include the class and method ids of the body it resulted from). + */ + public AMQChannelException getChannelException(int code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod()); + } + + public AMQChannelException getChannelException(int code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), cause); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java new file mode 100644 index 0000000000..97f594061e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class AMQMethodBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory(); + + public static AMQMethodBodyFactory getInstance() + { + return _instance; + } + + private AMQMethodBodyFactory() + { + _log.debug("Creating method body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return MethodBodyDecoderRegistry.get(in.getUnsignedShort(), in.getUnsignedShort()); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java new file mode 100644 index 0000000000..8a8175bbc8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +public class AMQProtocolClassException extends AMQProtocolHeaderException +{ + public AMQProtocolClassException(String message) + { + super(message); + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java new file mode 100644 index 0000000000..6809c3d21e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.qpid.AMQException; + +public class AMQProtocolHeaderException extends AMQException +{ + public AMQProtocolHeaderException(String message) + { + super(message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java new file mode 100644 index 0000000000..7f5b26010d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +public class AMQProtocolInstanceException extends AMQProtocolHeaderException +{ + public AMQProtocolInstanceException(String message) + { + super(message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java new file mode 100644 index 0000000000..4f5677b41a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java @@ -0,0 +1,30 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +/** + * Exception that is thrown when the client and server differ on expected protocol version (header) information. + * + */ +public class AMQProtocolVersionException extends AMQProtocolHeaderException +{ + public AMQProtocolVersionException(String message) + { + super(message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java new file mode 100644 index 0000000000..859fdac489 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -0,0 +1,589 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class BasicContentHeaderProperties implements ContentHeaderProperties +{ + private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class); + + /** + * We store the encoded form when we decode the content header so that if we need to + * write it out without modifying it we can do so without incurring the expense of + * reencoding it + */ + private byte[] _encodedForm; + + /** + * Flag indicating whether the entire content header has been decoded yet + */ + private boolean _decoded = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker + * for routing in some cases so we can decode that separately. + */ + private boolean _decodedHeaders = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The content type is used by all + * clients to determine the message type + */ + private boolean _decodedContentType = true; + + private String _contentType; + + private String _encoding; + + private FieldTable _headers; + + private byte _deliveryMode; + + private byte _priority; + + private String _correlationId; + + private String _replyTo; + + private long _expiration; + + private String _messageId; + + private long _timestamp; + + private String _type; + + private String _userId; + + private String _appId; + + private String _clusterId; + + private int _propertyFlags = 0; + + public BasicContentHeaderProperties() + { + } + + public int getPropertyListSize() + { + if (_encodedForm != null) + { + return _encodedForm.length; + } + else + { + int size = 0; + + if ((_propertyFlags & (1 << 15)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_contentType); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_encoding); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + size += EncodingUtils.encodedFieldTableLength(_headers); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + size += 1; + } + if ((_propertyFlags & (1 << 11)) > 0) + { + size += 1; + } + if ((_propertyFlags & (1 << 10)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_correlationId); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_replyTo); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + size += EncodingUtils.encodedShortStringLength(String.valueOf(_expiration)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_messageId); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + size += 8; + } + if ((_propertyFlags & (1 << 5)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_type); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_userId); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_appId); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_clusterId); + } + return size; + } + } + + private void clearEncodedForm() + { + if (!_decoded && _encodedForm != null) + { + //decode(); + } + _encodedForm = null; + } + + public void setPropertyFlags(int propertyFlags) + { + clearEncodedForm(); + _propertyFlags = propertyFlags; + } + + public int getPropertyFlags() + { + return _propertyFlags; + } + + public void writePropertyListPayload(ByteBuffer buffer) + { + if (_encodedForm != null) + { + buffer.put(_encodedForm); + } + else + { + if ((_propertyFlags & (1 << 15)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _contentType); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _encoding); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + EncodingUtils.writeFieldTableBytes(buffer, _headers); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + buffer.put(_deliveryMode); + } + if ((_propertyFlags & (1 << 11)) > 0) + { + buffer.put(_priority); + } + if ((_propertyFlags & (1 << 10)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _correlationId); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _replyTo); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _messageId); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + EncodingUtils.writeTimestamp(buffer, _timestamp); + } + if ((_propertyFlags & (1 << 5)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _type); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _userId); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _appId); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _clusterId); + } + } + } + + public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) + throws AMQFrameDecodingException + { + _propertyFlags = propertyFlags; + + if (_logger.isDebugEnabled()) + { + _logger.debug("Property flags: " + _propertyFlags); + } + decode(buffer); + /*_encodedForm = new byte[size]; + buffer.get(_encodedForm, 0, size); + _decoded = false; + _decodedHeaders = false; + _decodedContentType = false;*/ + } + + private void decode(ByteBuffer buffer) + { + //ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int pos = buffer.position(); + try + { + if ((_propertyFlags & (1 << 15)) > 0) + { + _contentType = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + _encoding = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + _deliveryMode = buffer.get(); + } + if ((_propertyFlags & (1 << 11)) > 0) + { + _priority = buffer.get(); + } + if ((_propertyFlags & (1 << 10)) > 0) + { + _correlationId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + _replyTo = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + _expiration = Long.parseLong(EncodingUtils.readShortString(buffer)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + _messageId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + _timestamp = EncodingUtils.readTimestamp(buffer); + } + if ((_propertyFlags & (1 << 5)) > 0) + { + _type = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + _userId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + _appId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + _clusterId = EncodingUtils.readShortString(buffer); + } + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e); + } + + final int endPos = buffer.position(); + buffer.position(pos); + final int len = endPos - pos; + _encodedForm = new byte[len]; + final int limit = buffer.limit(); + buffer.limit(endPos); + buffer.get(_encodedForm, 0, len); + buffer.limit(limit); + buffer.position(endPos); + _decoded = true; + } + + + private void decodeUpToHeaders() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + try + { + if ((_propertyFlags & (1 << 15)) > 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + } + _decodedHeaders = true; + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e); + } + } + + private void decodeUpToContentType() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + + if ((_propertyFlags & (1 << 15)) > 0) + { + _contentType = EncodingUtils.readShortString(buffer); + } + + _decodedContentType = true; + } + + private void decodeIfNecessary() + { + if (!_decoded) + { + //decode(); + } + } + + private void decodeHeadersIfNecessary() + { + if (!_decoded && !_decodedHeaders) + { + decodeUpToHeaders(); + } + } + + private void decodeContentTypeIfNecessary() + { + if (!_decoded && !_decodedContentType) + { + decodeUpToContentType(); + } + } + public String getContentType() + { + decodeContentTypeIfNecessary(); + return _contentType; + } + + public void setContentType(String contentType) + { + clearEncodedForm(); + _propertyFlags |= (1 << 15); + _contentType = contentType; + } + + public String getEncoding() + { + decodeIfNecessary(); + return _encoding; + } + + public void setEncoding(String encoding) + { + clearEncodedForm(); + _propertyFlags |= (1 << 14); + _encoding = encoding; + } + + public FieldTable getHeaders() + { + decodeHeadersIfNecessary(); + return _headers; + } + + public void setHeaders(FieldTable headers) + { + clearEncodedForm(); + _propertyFlags |= (1 << 13); + _headers = headers; + } + + public byte getDeliveryMode() + { + decodeIfNecessary(); + return _deliveryMode; + } + + public void setDeliveryMode(byte deliveryMode) + { + clearEncodedForm(); + _propertyFlags |= (1 << 12); + _deliveryMode = deliveryMode; + } + + public byte getPriority() + { + decodeIfNecessary(); + return _priority; + } + + public void setPriority(byte priority) + { + clearEncodedForm(); + _propertyFlags |= (1 << 11); + _priority = priority; + } + + public String getCorrelationId() + { + decodeIfNecessary(); + return _correlationId; + } + + public void setCorrelationId(String correlationId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 10); + _correlationId = correlationId; + } + + public String getReplyTo() + { + decodeIfNecessary(); + return _replyTo; + } + + public void setReplyTo(String replyTo) + { + clearEncodedForm(); + _propertyFlags |= (1 << 9); + _replyTo = replyTo; + } + + public long getExpiration() + { + decodeIfNecessary(); + return _expiration; + } + + public void setExpiration(long expiration) + { + clearEncodedForm(); + _propertyFlags |= (1 << 8); + _expiration = expiration; + } + + + public String getMessageId() + { + decodeIfNecessary(); + return _messageId; + } + + public void setMessageId(String messageId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 7); + _messageId = messageId; + } + + public long getTimestamp() + { + decodeIfNecessary(); + return _timestamp; + } + + public void setTimestamp(long timestamp) + { + clearEncodedForm(); + _propertyFlags |= (1 << 6); + _timestamp = timestamp; + } + + public String getType() + { + decodeIfNecessary(); + return _type; + } + + public void setType(String type) + { + clearEncodedForm(); + _propertyFlags |= (1 << 5); + _type = type; + } + + public String getUserId() + { + decodeIfNecessary(); + return _userId; + } + + public void setUserId(String userId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 4); + _userId = userId; + } + + public String getAppId() + { + decodeIfNecessary(); + return _appId; + } + + public void setAppId(String appId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 3); + _appId = appId; + } + + public String getClusterId() + { + decodeIfNecessary(); + return _clusterId; + } + + public void setClusterId(String clusterId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 2); + _clusterId = clusterId; + } + + public String toString() + { + return "reply-to = " + _replyTo + " propertyFlags = " + _propertyFlags; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java new file mode 100644 index 0000000000..cd40558de8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. + */ +public interface BodyFactory +{ + AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java new file mode 100644 index 0000000000..35e29aa064 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -0,0 +1,100 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock +{ + private ByteBuffer _encodedBlock; + + private AMQDataBlock[] _blocks; + + public CompositeAMQDataBlock(AMQDataBlock[] blocks) + { + _blocks = blocks; + } + + /** + * The encoded block will be logically first before the AMQDataBlocks which are encoded + * into the buffer afterwards. + * @param encodedBlock already-encoded data + * @param blocks some blocks to be encoded. + */ + public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks) + { + this(blocks); + _encodedBlock = encodedBlock; + } + + public AMQDataBlock[] getBlocks() + { + return _blocks; + } + + public ByteBuffer getEncodedBlock() + { + return _encodedBlock; + } + + public long getSize() + { + long frameSize = 0; + for (int i = 0; i < _blocks.length; i++) + { + frameSize += _blocks[i].getSize(); + } + if (_encodedBlock != null) + { + _encodedBlock.rewind(); + frameSize += _encodedBlock.remaining(); + } + return frameSize; + } + + public void writePayload(ByteBuffer buffer) + { + if (_encodedBlock != null) + { + buffer.put(_encodedBlock); + } + for (int i = 0; i < _blocks.length; i++) + { + _blocks[i].writePayload(buffer); + } + } + + public String toString() + { + if (_blocks == null) + { + return "No blocks contained in composite frame"; + } + else + { + StringBuilder buf = new StringBuilder(this.getClass().getName()); + buf.append("{encodedBlock=").append(_encodedBlock); + for (int i = 0 ; i < _blocks.length; i++) + { + buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); + } + buf.append("}"); + return buf.toString(); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java new file mode 100644 index 0000000000..d7b668534c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -0,0 +1,83 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentBody extends AMQBody +{ + public static final byte TYPE = 3; + + public ByteBuffer payload; + + protected byte getType() + { + return TYPE; + } + + public int getSize() + { + return (payload == null ? 0 : payload.limit()); + } + + public void writePayload(ByteBuffer buffer) + { + if (payload != null) + { + ByteBuffer copy = payload.duplicate(); + buffer.put(copy.rewind()); + } + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); + } + + } + + public void reduceBufferToFit() + { + if (payload != null && (payload.remaining() < payload.capacity() / 2)) + { + int size = payload.limit(); + ByteBuffer newPayload = ByteBuffer.allocate(size); + + newPayload.put(payload); + newPayload.position(0); + newPayload.limit(size); + + //reduce reference count on payload + payload.release(); + + payload = newPayload; + } + } + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java new file mode 100644 index 0000000000..1d6b72ce76 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class ContentBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final ContentBodyFactory _instance = new ContentBodyFactory(); + + public static ContentBodyFactory getInstance() + { + return _instance; + } + + private ContentBodyFactory() + { + _log.debug("Creating content body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return new ContentBody(); + } +} + diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java new file mode 100644 index 0000000000..35ce107831 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -0,0 +1,112 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderBody extends AMQBody +{ + public static final byte TYPE = 2; + + public int classId; + + public int weight; + + /** unsigned long but java can't handle that anyway when allocating byte array */ + public long bodySize; + + /** must never be null */ + public ContentHeaderProperties properties; + + public ContentHeaderBody() + { + } + + public ContentHeaderBody(ContentHeaderProperties props, int classId) + { + properties = props; + this.classId = classId; + } + + public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize) + { + this(props, classId); + this.weight = weight; + this.bodySize = bodySize; + } + + protected byte getType() + { + return TYPE; + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + classId = buffer.getUnsignedShort(); + weight = buffer.getUnsignedShort(); + bodySize = buffer.getLong(); + int propertyFlags = buffer.getUnsignedShort(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); + properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + } + + /** + * Helper method that is used currently by the persistence layer (by BDB at the moment). + * @param buffer + * @param size + * @return + * @throws AMQFrameDecodingException + */ + public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + ContentHeaderBody body = new ContentHeaderBody(); + body.populateFromBuffer(buffer, size); + return body; + } + + public int getSize() + { + return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, classId); + EncodingUtils.writeUnsignedShort(buffer, weight); + buffer.putLong(bodySize); + EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); + properties.writePropertyListPayload(buffer); + } + + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + long bodySize) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize); + return frame; + } + + public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java new file mode 100644 index 0000000000..236c5094fc --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory(); + + public static ContentHeaderBodyFactory getInstance() + { + return _instance; + } + + private ContentHeaderBodyFactory() + { + _log.debug("Creating content header body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + // all content headers are the same - it is only the properties that differ. + // the content header body further delegates construction of properties + return new ContentHeaderBody(); + } + + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java new file mode 100644 index 0000000000..65b629bf17 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * There will be an implementation of this interface for each content type. All content types have associated + * header properties and this provides a way to encode and decode them. + */ +public interface ContentHeaderProperties +{ + /** + * Writes the property list to the buffer, in a suitably encoded form. + * @param buffer The buffer to write to + */ + void writePropertyListPayload(ByteBuffer buffer); + + /** + * Populates the properties from buffer. + * @param buffer The buffer to read from. + * @param propertyFlags he property flags. + * @throws AMQFrameDecodingException when the buffer does not contain valid data + */ + void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) + throws AMQFrameDecodingException; + + /** + * @return the size of the encoded property list in bytes. + */ + int getPropertyListSize(); + + /** + * Gets the property flags. Property flags indicate which properties are set in the list. The + * position and meaning of each flag is defined in the protocol specification for the particular + * content type with which these properties are associated. + * @return flags + */ + int getPropertyFlags(); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java new file mode 100644 index 0000000000..ea1b0d6ef5 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -0,0 +1,51 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderPropertiesFactory +{ + private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); + + public static ContentHeaderPropertiesFactory getInstance() + { + return _instance; + } + + private ContentHeaderPropertiesFactory() + { + } + + public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, + ByteBuffer buffer, int size) + throws AMQFrameDecodingException + { + ContentHeaderProperties properties; + switch (classId) + { + case BasicConsumeBody.CLASS_ID: + properties = new BasicContentHeaderProperties(); + break; + default: + throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); + } + properties.populatePropertiesFromBuffer(buffer, propertyFlags, size); + return properties; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java new file mode 100644 index 0000000000..3d493979eb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +/** + * Marker interface to indicate to MINA that a data block should be encoded with the + * single encoder/decoder that we have defined. + * + * Note that due to a bug in MINA all classes must directly implement this interface, even if + * a superclass implements it. + * TODO: fix MINA so that this is not necessary + * + */ +public interface EncodableAMQDataBlock +{ + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java new file mode 100644 index 0000000000..65deb61dfb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -0,0 +1,546 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +import java.nio.charset.Charset; + +public class EncodingUtils +{ + private static final Logger _logger = Logger.getLogger(EncodingUtils.class); + + private static final String STRING_ENCODING = "iso8859-15"; + + private static final Charset _charset = Charset.forName("iso8859-15"); + + public static final int SIZEOF_UNSIGNED_SHORT = 2; + public static final int SIZEOF_UNSIGNED_INT = 4; + + public static int encodedShortStringLength(String s) + { + if (s == null) + { + return 1; + } + else + { + return (short) (1 + s.length()); + } + } + + public static int encodedLongStringLength(String s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length(); + } + } + + public static int encodedLongStringLength(char[] s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length; + } + } + + public static int encodedLongstrLength(byte[] bytes) + { + if (bytes == null) + { + return 4; + } + else + { + return 4 + bytes.length; + } + } + + public static int encodedFieldTableLength(FieldTable table) + { + if (table == null) + { + // size is encoded as 4 octets + return 4; + } + else + { + // size of the table plus 4 octets for the size + return (int) table.getEncodedSize() + 4; + } + } + + public static void writeShortStringBytes(ByteBuffer buffer, String s) + { + if (s != null) + { + byte[] encodedString = new byte[s.length()]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + // TODO: check length fits in an unsigned byte + writeUnsignedByte(buffer, (short) encodedString.length); + buffer.put(encodedString); + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, String s) + { + assert s == null || s.length() <= 0xFFFE; + if (s != null) + { + int len = s.length(); + writeUnsignedInteger(buffer, s.length()); + byte[] encodedString = new byte[len]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, char[] s) + { + assert s == null || s.length <= 0xFFFE; + if (s != null) + { + int len = s.length; + writeUnsignedInteger(buffer, s.length); + byte[] encodedString = new byte[len]; + for (int i = 0; i < s.length; i++) + { + encodedString[i] = (byte) s[i]; + } + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes) + { + assert bytes == null || bytes.length <= 0xFFFE; + if (bytes != null) + { + writeUnsignedInteger(buffer, bytes.length); + buffer.put(bytes); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeUnsignedByte(ByteBuffer buffer, short b) + { + byte bv = (byte) b; + buffer.put(bv); + } + + public static void writeUnsignedShort(ByteBuffer buffer, int s) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (s < Short.MAX_VALUE) + { + buffer.putShort((short) s); + } + else + { + short sv = (short) s; + buffer.put((byte) (0xFF & (sv >> 8))); + buffer.put((byte) (0xFF & sv)); + } + } + + public static void writeUnsignedInteger(ByteBuffer buffer, long l) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (l < Integer.MAX_VALUE) + { + buffer.putInt((int) l); + } + else + { + int iv = (int) l; + + // FIXME: This *may* go faster if we build this into a local 4-byte array and then + // put the array in a single call. + buffer.put((byte) (0xFF & (iv >> 24))); + buffer.put((byte) (0xFF & (iv >> 16))); + buffer.put((byte) (0xFF & (iv >> 8))); + buffer.put((byte) (0xFF & iv)); + } + } + + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) + { + if (table != null) + { + table.writeToBuffer(buffer); + } + else + { + EncodingUtils.writeUnsignedInteger(buffer, 0); + } + } + + public static void writeBooleans(ByteBuffer buffer, boolean[] values) + { + byte packedValue = 0; + for (int i = 0; i < values.length; i++) + { + if (values[i]) + { + packedValue = (byte) (packedValue | (1 << i)); + } + } + + buffer.put(packedValue); + } + + /** + * This is used for writing longstrs. + * @param buffer + * @param data + */ + public static void writeLongstr(ByteBuffer buffer, byte[] data) + { + if (data != null) + { + writeUnsignedInteger(buffer, data.length); + buffer.put(data); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeTimestamp(ByteBuffer buffer, long timestamp) + { + writeUnsignedInteger(buffer, 0/*timestamp msb*/); + writeUnsignedInteger(buffer, timestamp); + } + + public static boolean[] readBooleans(ByteBuffer buffer) + { + byte packedValue = buffer.get(); + boolean[] result = new boolean[8]; + + for (int i = 0; i < 8; i++) + { + result[i] = ((packedValue & (1 << i)) != 0); + } + return result; + } + + public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + return new FieldTable(buffer, length); + } + } + + public static String readShortString(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[length]; + buffer.get(stringBytes, 0, length); + char[] stringChars = new char[length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + + return new String(stringChars); + } + } + + public static String readLongString(ByteBuffer buffer) + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[(int)length]; + buffer.get(stringBytes, 0, (int)length); + char[] stringChars = new char[(int)length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + return new String(stringChars); + } + } + + public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + byte[] result = new byte[(int)length]; + buffer.get(result); + return result; + } + } + + public static long readTimestamp(ByteBuffer buffer) + { + // Discard msb from AMQ timestamp + buffer.getUnsignedInt(); + return buffer.getUnsignedInt(); + } + + // Will barf with a NPE on a null input. Not sure whether it should return null or + // an empty field-table (which would be slower - perhaps unnecessarily). + // + // Some sample input and the result output: + // + // Input: "a=1" "a=2 c=3" "a=3 c=4 d" "a='four' b='five'" "a=bad" + // + // Parsing <a=1>... + // {a=1} + // Parsing <a=2 c=3>... + // {a=2, c=3} + // Parsing <a=3 c=4 d>... + // {a=3, c=4, d=null} + // Parsing <a='four' b='five'>... + // {a=four, b=five} + // Parsing <a=bad>... + // java.lang.IllegalArgumentException: a: Invalid integer in <bad> from <a=bad>. + // + public static FieldTable createFieldTableFromMessageSelector(String selector) + { + boolean debug = _logger.isDebugEnabled(); + + // TODO: Doesn't support embedded quotes properly. + String[] expressions = selector.split(" +"); + + FieldTable result = new FieldTable(); + + for (int i = 0; i < expressions.length; i++) + { + String expr = expressions[i]; + + if (debug) + { + _logger.debug("Expression = <" + expr + ">"); + } + + int equals = expr.indexOf('='); + + if (equals < 0) + { + // Existence check + result.put("S" + expr.trim(), null); + } + else + { + String key = expr.substring(0, equals).trim(); + String value = expr.substring(equals + 1).trim(); + + if (debug) + { + _logger.debug("Key = <" + key + ">, Value = <" + value + ">"); + } + + if (value.charAt(0) == '\'') + { + if (value.charAt(value.length() - 1) != '\'') + { + throw new IllegalArgumentException(key + ": Missing quote in <" + value + "> from <" + selector + ">."); + } + else + { + value = value.substring(1, value.length() - 1); + + result.put("S" + key, value); + } + } + else + { + try + { + int intValue = Integer.parseInt(value); + + result.put("i" + key, value); + } + catch (NumberFormatException e) + { + throw new IllegalArgumentException(key + ": Invalid integer in <" + value + "> from <" + selector + ">."); + + } + } + } + } + + if (debug) + { + _logger.debug("Field-table created from <" + selector + "> is <" + result + ">"); + } + + return (result); + + } + + static byte[] hexToByteArray(String id) + { + // Should check param for null, long enough for this check, upper-case and trailing char + String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x + + int len = s.length(); + int byte_len = len / 2; + byte[] b = new byte[byte_len]; + + for (int i = 0; i < byte_len; i++) + { + // fixme: refine these repetitive subscript calcs. + int ch = i * 2; + + byte b1 = Byte.parseByte(s.substring(ch, ch + 1), 16); + byte b2 = Byte.parseByte(s.substring(ch + 1, ch + 2), 16); + + b[i] = (byte) (b1 * 16 + b2); + } + + return (b); + } + + public static char[] convertToHexCharArray(byte[] from) + { + int length = from.length; + char[] result_buff = new char[length * 2 + 2]; + + result_buff[0] = '0'; + result_buff[1] = 'x'; + + int bite; + int dest = 2; + + for (int i = 0; i < length; i++) + { + bite = from[i]; + + if (bite < 0) + { + bite += 256; + } + + result_buff[dest++] = hex_chars[bite >> 4]; + result_buff[dest++] = hex_chars[bite & 0x0f]; + } + + return (result_buff); + } + + public static String convertToHexString(byte[] from) + { + return (new String(convertToHexCharArray(from))); + } + + public static String convertToHexString(ByteBuffer bb) + { + int size = bb.limit(); + + byte[] from = new byte[size]; + + // Is this not the same. + //bb.get(from, 0, size); + for (int i = 0; i < size; i++) + { + from[i] = bb.get(i); + } + + return (new String(convertToHexCharArray(from))); + } + + public static void main(String[] args) + { + for (int i = 0; i < args.length; i++) + { + String selector = args[i]; + + System.err.println("Parsing <" + selector + ">..."); + + try + { + System.err.println(createFieldTableFromMessageSelector(selector)); + } + catch (IllegalArgumentException e) + { + System.err.println(e); + } + } + } + + private static char hex_chars[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java new file mode 100644 index 0000000000..30f41205dd --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -0,0 +1,319 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +import java.util.*; + +/** + * From the protocol document: + * field-table = short-integer *field-value-pair + * field-value-pair = field-name field-value + * field-name = short-string + * field-value = 'S' long-string + * / 'I' long-integer + * / 'D' decimal-value + * / 'T' long-integer + * decimal-value = decimals long-integer + * decimals = OCTET + */ +public class FieldTable extends LinkedHashMap +{ + private static final Logger _logger = Logger.getLogger(FieldTable.class); + private long _encodedSize = 0; + + public FieldTable() + { + super(); + } + + /** + * Construct a new field table. + * + * @param buffer the buffer from which to read data. The length byte must be read already + * @param length the length of the field table. Must be > 0. + * @throws AMQFrameDecodingException if there is an error decoding the table + */ + public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException + { + super(); + final boolean debug = _logger.isDebugEnabled(); + assert length > 0; + _encodedSize = length; + int sizeRead = 0; + while (sizeRead < _encodedSize) + { + int sizeRemaining = buffer.remaining(); + final String key = EncodingUtils.readShortString(buffer); + // TODO: use proper charset decoder + byte iType = buffer.get(); + final char type = (char) iType; + Object value; + switch (type) + { + case 'S': + value = EncodingUtils.readLongString(buffer); + break; + case 'I': + value = new Long(buffer.getUnsignedInt()); + break; + default: + String msg = "Field '" + key + "' - unsupported field table type: " + type; + //some extra debug information... + msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining; + throw new AMQFrameDecodingException(msg); + } + sizeRead += (sizeRemaining - buffer.remaining()); + + if (debug) + { + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Read type '" + type + "', key '" + key + "', value '" + value + "' (now read " + sizeRead + " of " + length + " encoded bytes)..."); + } + + // we deliberately want to call put in the parent class since we do + // not need to do the size calculations + super.put(key, value); + } + + if (debug) + { + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); + } + } + + public void writeToBuffer(ByteBuffer buffer) + { + final boolean debug = _logger.isDebugEnabled(); + + if (debug) + { + _logger.debug("FieldTable::writeToBuffer: Writing encoded size of " + _encodedSize + "..."); + } + + // write out the total length, which we have kept up to date as data is added + EncodingUtils.writeUnsignedInteger(buffer, _encodedSize); + final Iterator it = this.entrySet().iterator(); + while (it.hasNext()) + { + Map.Entry me = (Map.Entry) it.next(); + String key = (String) me.getKey(); + + EncodingUtils.writeShortStringBytes(buffer, key); + Object value = me.getValue(); + + if (debug) + { + _logger.debug("FieldTable::writeToBuffer: Writing key '" + key + "' of type " + value.getClass() + ", value '" + value + "'..."); + } + + if (value instanceof byte[]) + { + buffer.put((byte) 'S'); + EncodingUtils.writeLongstr(buffer, (byte[]) value); + } + else if (value instanceof String) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'S'); + EncodingUtils.writeLongStringBytes(buffer, (String) value); + } + else if (value instanceof Long) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'I'); + EncodingUtils.writeUnsignedInteger(buffer, ((Long) value).longValue()); + } + else + { + // Should never get here + throw new IllegalArgumentException("Key '" + key + "': Unsupported type in field table, type: " + ((value == null) ? "null-object" : value.getClass())); + } + } + + if (debug) + { + _logger.debug("FieldTable::writeToBuffer: Done."); + } + } + + public byte[] getDataAsBytes() + { + final ByteBuffer buffer = ByteBuffer.allocate((int) _encodedSize); // XXX: Is cast a problem? + final Iterator it = this.entrySet().iterator(); + while (it.hasNext()) + { + Map.Entry me = (Map.Entry) it.next(); + String key = (String) me.getKey(); + EncodingUtils.writeShortStringBytes(buffer, key); + Object value = me.getValue(); + if (value instanceof byte[]) + { + buffer.put((byte) 'S'); + EncodingUtils.writeLongstr(buffer, (byte[]) value); + } + else if (value instanceof String) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'S'); + EncodingUtils.writeLongStringBytes(buffer, (String) value); + } + else if (value instanceof char[]) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'S'); + EncodingUtils.writeLongStringBytes(buffer, (char[]) value); + } + else if (value instanceof Long || value instanceof Integer) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'I'); + EncodingUtils.writeUnsignedInteger(buffer, ((Long) value).longValue()); + } + else + { + // Should never get here + assert false; + } + } + final byte[] result = new byte[(int) _encodedSize]; + buffer.flip(); + buffer.get(result); + buffer.release(); + return result; + } + + public Object put(Object key, Object value) + { + final boolean debug = _logger.isDebugEnabled(); + + if (key == null) + { + throw new IllegalArgumentException("All keys must be Strings - was passed: null"); + } + else if (!(key instanceof String)) + { + throw new IllegalArgumentException("All keys must be Strings - was passed: " + key.getClass()); + } + + Object existing; + + if ((existing = super.remove(key)) != null) + { + if (debug) + { + _logger.debug("Found duplicate of key '" + key + "', previous value '" + existing + "' (" + existing.getClass() + "), to be replaced by '" + value + "', (" + value.getClass() + ") - stack trace of source of duplicate follows...", new Throwable().fillInStackTrace()); + } + + // If we are in effect deleting the value (see comment on null values being deleted + // below) then we also need to remove the name from the encoding length. + if (value == null) + { + _encodedSize -= EncodingUtils.encodedShortStringLength((String) key); + } + + // FIXME: Should be able to short-cut this process if the old and new values are + // the same object and/or type and size... + _encodedSize -= getEncodingSize(existing); + } + else + { + if (value != null) + { + _encodedSize += EncodingUtils.encodedShortStringLength((String) key); + } + } + + // For now: Setting a null value is the equivalent of deleting it. + // This is ambiguous in the JMS spec and needs thrashing out and potentially + // testing against other implementations. + if (value != null) + { + _encodedSize += getEncodingSize(value); + } + + return super.put(key, value); + } + + public Object remove(Object key) + { + if (super.containsKey(key)) + { + final Object value = super.remove(key); + _encodedSize -= EncodingUtils.encodedShortStringLength((String) key); + + // This check is, for now, unnecessary (we don't store null values). + if (value != null) + { + _encodedSize -= getEncodingSize(value); + } + + return value; + } + else + { + return null; + } + } + + /** + * @return unsigned integer + */ + public long getEncodedSize() + { + return _encodedSize; + } + + /** + * @return integer + */ + private static int getEncodingSize(Object value) + { + int encodingSize; + + // the extra byte if for the type indicator that is written out + if (value instanceof String) + { + encodingSize = 1 + EncodingUtils.encodedLongStringLength((String) value); + } + else if (value instanceof char[]) + { + encodingSize = 1 + EncodingUtils.encodedLongStringLength((char[]) value); + } + else if (value instanceof Integer) + { + encodingSize = 1 + 4; + } + else if (value instanceof Long) + { + encodingSize = 1 + 4; + } + else + { + throw new IllegalArgumentException("Unsupported type in field table: " + value.getClass()); + } + + return encodingSize; + } + + public Enumeration keys() + { + return new FieldTableKeyEnumeration(this); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTableKeyEnumeration.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTableKeyEnumeration.java new file mode 100644 index 0000000000..2bc890ebbc --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTableKeyEnumeration.java @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import java.util.Enumeration; +import java.util.Iterator; + + +public class FieldTableKeyEnumeration implements Enumeration +{ + protected FieldTable _table; + protected Iterator _iterator; + + public FieldTableKeyEnumeration(FieldTable ft) + { + _table = ft; + _iterator = ft.keySet().iterator(); + } + + public boolean hasMoreElements() + { + return _iterator.hasNext(); + } + + public Object nextElement() + { + return _iterator.next(); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java new file mode 100644 index 0000000000..4dda794427 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class HeartbeatBody extends AMQBody +{ + public static final byte TYPE = 8; + public static AMQFrame FRAME = new HeartbeatBody().toFrame(); + + protected byte getType() + { + return TYPE; + } + + protected int getSize() + { + return 0;//heartbeats we generate have no payload + } + + protected void writePayload(ByteBuffer buffer) + { + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + + public AMQFrame toFrame() + { + return new AMQFrame(0, this); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java new file mode 100644 index 0000000000..1d63f3827b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class HeartbeatBodyFactory implements BodyFactory +{ + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return new HeartbeatBody(); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java new file mode 100644 index 0000000000..e500a683dc --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -0,0 +1,176 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.AMQException; + +public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock +{ + public char[] header = new char[]{'A','M','Q','P'}; + // TODO: generate these constants automatically from the xml protocol spec file + + private static byte CURRENT_PROTOCOL_CLASS = 1; + private static final int CURRENT_PROTOCOL_INSTANCE = 1; + + public byte protocolClass = CURRENT_PROTOCOL_CLASS; + public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE; + public byte protocolMajor; + public byte protocolMinor; + +// public ProtocolInitiation() {} + + public ProtocolInitiation(byte major, byte minor) + { + protocolMajor = major; + protocolMinor = minor; + } + + public long getSize() + { + return 4 + 1 + 1 + 1 + 1; + } + + public void writePayload(ByteBuffer buffer) + { + for (int i = 0; i < header.length; i++) + { + buffer.put((byte) header[i]); + } + buffer.put(protocolClass); + buffer.put(protocolInstance); + buffer.put(protocolMajor); + buffer.put(protocolMinor); + } + + public void populateFromBuffer(ByteBuffer buffer) throws AMQException + { + throw new AMQException("Method not implemented"); + } + + public boolean equals(Object o) + { + if (!(o instanceof ProtocolInitiation)) + { + return false; + } + + ProtocolInitiation pi = (ProtocolInitiation) o; + if (pi.header == null) + { + return false; + } + + if (header.length != pi.header.length) + { + return false; + } + + for (int i = 0; i < header.length; i++) + { + if (header[i] != pi.header[i]) + { + return false; + } + } + + return (protocolClass == pi.protocolClass && + protocolInstance == pi.protocolInstance && + protocolMajor == pi.protocolMajor && + protocolMinor == pi.protocolMinor); + } + + public static class Decoder //implements MessageDecoder + { + /** + * + * @param session + * @param in + * @return true if we have enough data to decode the PI frame fully, false if more + * data is required + */ + public boolean decodable(IoSession session, ByteBuffer in) + { + return (in.remaining() >= 8); + } + + public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + throws Exception + { + byte[] theHeader = new byte[4]; + in.get(theHeader); + ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0); + pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]}; + String stringHeader = new String(pi.header); + if (!"AMQP".equals(stringHeader)) + { + throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader); + } + pi.protocolClass = in.get(); + pi.protocolInstance = in.get(); + pi.protocolMajor = in.get(); + pi.protocolMinor = in.get(); + out.write(pi); + } + } + + public void checkVersion(ProtocolVersionList pvl) throws AMQException + { + if (protocolClass != CURRENT_PROTOCOL_CLASS) + { + throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + + protocolClass); + } + if (protocolInstance != CURRENT_PROTOCOL_INSTANCE) + { + throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " + + protocolInstance); + } + /* + if (protocolMajor != CURRENT_PROTOCOL_VERSION_MAJOR) + { + throw new AMQProtocolVersionException("Protocol major version " + CURRENT_PROTOCOL_VERSION_MAJOR + + " was expected; received " + protocolMajor); + } + if (protocolMinor != CURRENT_PROTOCOL_VERSION_MINOR) + { + throw new AMQProtocolVersionException("Protocol minor version " + CURRENT_PROTOCOL_VERSION_MINOR + + " was expected; received " + protocolMinor); + } + */ + + /* Look through list of available protocol versions */ + boolean found = false; + for (int i=0; i<pvl.pv.length; i++) + { + if (pvl.pv[i][pvl.PROTOCOL_MAJOR] == protocolMajor && + pvl.pv[i][pvl.PROTOCOL_MINOR] == protocolMinor) + { + found = true; + } + } + if (!found) + { + // TODO: add list of available versions in list to msg... + throw new AMQProtocolVersionException("Protocol version " + + protocolMajor + "." + protocolMinor + " not found in protocol version list."); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java new file mode 100644 index 0000000000..6bf86ffc2e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -0,0 +1,111 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoFilter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IdleStatus; + +/** + * Represents an operation on IoFilter. + */ +enum EventType +{ + OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION +} + +class Event +{ + private static final Logger _log = Logger.getLogger(Event.class); + + private final EventType type; + private final IoFilter.NextFilter nextFilter; + private final Object data; + + public Event(IoFilter.NextFilter nextFilter, EventType type, Object data) + { + this.type = type; + this.nextFilter = nextFilter; + this.data = data; + if (type == EventType.EXCEPTION) + { + _log.error("Exception event constructed: " + data, (Exception) data); + } + } + + public Object getData() + { + return data; + } + + + public IoFilter.NextFilter getNextFilter() + { + return nextFilter; + } + + + public EventType getType() + { + return type; + } + + void process(IoSession session) + { + if (_log.isDebugEnabled()) + { + _log.debug("Processing " + this); + } + if (type == EventType.RECEIVED) + { + nextFilter.messageReceived(session, data); + //ByteBufferUtil.releaseIfPossible( data ); + } + else if (type == EventType.SENT) + { + nextFilter.messageSent(session, data); + //ByteBufferUtil.releaseIfPossible( data ); + } + else if (type == EventType.EXCEPTION) + { + nextFilter.exceptionCaught(session, (Throwable) data); + } + else if (type == EventType.IDLE) + { + nextFilter.sessionIdle(session, (IdleStatus) data); + } + else if (type == EventType.OPENED) + { + nextFilter.sessionOpened(session); + } + else if (type == EventType.WRITE) + { + nextFilter.filterWrite(session, (IoFilter.WriteRequest) data); + } + else if (type == EventType.CLOSED) + { + nextFilter.sessionClosed(session); + } + } + + public String toString() + { + return "Event: type " + type + ", data: " + data; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java new file mode 100644 index 0000000000..45a115bcd3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -0,0 +1,110 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.mina.common.IoSession; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Holds events for a session that will be processed asynchronously by + * the thread pool in PoolingFilter. + */ +class Job implements Runnable +{ + private final int _maxEvents; + private final IoSession _session; + private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final AtomicBoolean _active = new AtomicBoolean(); + private final AtomicInteger _refCount = new AtomicInteger(); + private final JobCompletionHandler _completionHandler; + + Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) + { + _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + } + + void acquire() + { + _refCount.incrementAndGet(); + } + + void release() + { + _refCount.decrementAndGet(); + } + + boolean isReferenced() + { + return _refCount.get() > 0; + } + + void add(Event evt) + { + _eventQueue.add(evt); + } + + void processAll() + { + //limit the number of events processed in one run + for (int i = 0; i < _maxEvents; i++) + { + Event e = _eventQueue.poll(); + if (e == null) + { + break; + } + else + { + e.process(_session); + } + } + } + + boolean isComplete() + { + return _eventQueue.peek() == null; + } + + boolean activate() + { + return _active.compareAndSet(false, true); + } + + void deactivate() + { + _active.set(false); + } + + public void run() + { + processAll(); + deactivate(); + _completionHandler.completed(_session, this); + } + + + static interface JobCompletionHandler + { + public void completed(IoSession session, Job job); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java new file mode 100644 index 0000000000..ecb0bd5048 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -0,0 +1,183 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoSession; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler +{ + private static final Logger _logger = Logger.getLogger(PoolingFilter.class); + public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED)); + public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE)); + + private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); + private final ReferenceCountingExecutorService _poolReference; + private final Set<EventType> _asyncTypes; + + private final String _name; + private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name) + { + _poolReference = refCountingPool; + _asyncTypes = asyncTypes; + _name = name; + } + + private void fireEvent(IoSession session, Event event) + { + if (_asyncTypes.contains(event.getType())) + { + Job job = getJobForSession(session); + job.acquire(); //prevents this job being removed from _jobs + job.add(event); + if (job.activate()) + { + _poolReference.getPool().execute(job); + } + } + else + { + event.process(session); + } + } + + private Job getJobForSession(IoSession session) + { + Job job = _jobs.get(session); + return job == null ? createJobForSession(session) : job; + } + + private Job createJobForSession(IoSession session) + { + return addJobForSession(session, new Job(session, this, _maxEvents)); + } + + private Job addJobForSession(IoSession session, Job job) + { + //atomic so ensures all threads agree on the same job + Job existing = _jobs.putIfAbsent(session, job); + return existing == null ? job : existing; + } + + //Job.JobCompletionHandler + public void completed(IoSession session, Job job) + { + if (job.isComplete()) + { + job.release(); + if (!job.isReferenced()) + { + _jobs.remove(session); + } + } + else + { + if (job.activate()) + { + _poolReference.getPool().execute(job); + } + } + } + + //IoFilter methods that are processed by threads on the pool + + public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.OPENED, null)); + } + + public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.CLOSED, null)); + } + + public void sessionIdle(NextFilter nextFilter, IoSession session, + IdleStatus status) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.IDLE, status)); + } + + public void exceptionCaught(NextFilter nextFilter, IoSession session, + Throwable cause) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause)); + } + + public void messageReceived(NextFilter nextFilter, IoSession session, + Object message) throws Exception + { + //ByteBufferUtil.acquireIfPossible( message ); + fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message)); + } + + public void messageSent(NextFilter nextFilter, IoSession session, + Object message) throws Exception + { + //ByteBufferUtil.acquireIfPossible( message ); + fireEvent(session, new Event(nextFilter, EventType.SENT, message)); + } + + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest)); + } + + //IoFilter methods that are processed on current thread (NOT on pooled thread) + + public void filterClose(NextFilter nextFilter, IoSession session) throws Exception + { + nextFilter.filterClose(session); + } + + public void sessionCreated(NextFilter nextFilter, IoSession session) + { + nextFilter.sessionCreated(session); + } + + public String toString() + { + return _name; + } + + // LifeCycle methods + + public void init() + { + _logger.info("Init called on PoolingFilter " + toString()); + // called when the filter is initialised in the chain. If the reference count is + // zero this acquire will initialise the pool + _poolReference.acquireExecutorService(); + } + + public void destroy() + { + _logger.info("Destroy called on PoolingFilter " + toString()); + // when the reference count gets to zero we release the executor service + _poolReference.releaseExecutorService(); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java new file mode 100644 index 0000000000..bed9ac0b99 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.filter.ReferenceCountingIoFilter; +import org.apache.mina.common.ThreadModel; + +public class ReadWriteThreadModel implements ThreadModel +{ + public void buildFilterChain(IoFilterChain chain) throws Exception + { + ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); + PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS, + "AsynchronousReadFilter"); + PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS, + "AsynchronousWriteFilter"); + + chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); + chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java new file mode 100644 index 0000000000..f048a12b90 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -0,0 +1,95 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * We share the executor service among several PoolingFilters. This class reference counts + * how many filter chains are using the executor service and destroys the service, thus + * freeing up its threads, when the count reaches zero. It recreates the service when + * the count is incremented. + * + * This is particularly important on the client where failing to destroy the executor + * service prevents the JVM from shutting down due to the existence of non-daemon threads. + * + */ +public class ReferenceCountingExecutorService +{ + private static final int MINIMUM_POOL_SIZE = 4; + private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE); + + /** + * We need to be able to check the current reference count and if necessary + * create the executor service atomically. + */ + private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService(); + + private final Object _lock = new Object(); + + private ExecutorService _pool; + + private int _refCount = 0; + + private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + + public static ReferenceCountingExecutorService getInstance() + { + return _instance; + } + + private ReferenceCountingExecutorService() + { + } + + ExecutorService acquireExecutorService() + { + synchronized (_lock) + { + if (_refCount++ == 0) + { + _pool = Executors.newFixedThreadPool(_poolSize); + } + return _pool; + } + } + + void releaseExecutorService() + { + synchronized (_lock) + { + if (--_refCount == 0) + { + _pool.shutdownNow(); + } + } + } + + /** + * The filters that use the executor service should call this method to get access + * to the service. Note that this method does not alter the reference count. + * + * @return the underlying executor service + */ + public ExecutorService getPool() + { + return _pool; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java new file mode 100644 index 0000000000..0716104688 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.protocol; + +import java.util.Map; +import java.util.HashMap; + +public final class AMQConstant +{ + private int _code; + + private String _name; + + private static Map _codeMap = new HashMap(); + + private AMQConstant(int code, String name, boolean map) + { + _code = code; + _name = name; + if (map) + { + _codeMap.put(new Integer(code), this); + } + } + + public String toString() + { + return _code + ": " + _name; + } + + public int getCode() + { + return _code; + } + + public String getName() + { + return _name; + } + + public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true); + + public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true); + + public static final AMQConstant REPLY_SUCCESS = new AMQConstant(200, "reply success", true); + + public static final AMQConstant NOT_DELIVERED = new AMQConstant(310, "not delivered", true); + + public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true); + + public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true); + + public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true); + + public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true); + + public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true); + + public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); + + public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true); + + public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true); + + public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true); + + public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true); + + public static final AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true); + + public static final AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error", true); + + public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); + + public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + + public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); + + public static final AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal error", true); + + public static AMQConstant getConstant(int code) + { + AMQConstant c = (AMQConstant) _codeMap.get(new Integer(code)); + if (c == null) + { + c = new AMQConstant(code, "unknown code", false); + } + return c; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java new file mode 100644 index 0000000000..c066fd0370 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java @@ -0,0 +1,156 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.security.GeneralSecurityException; +import java.security.KeyStore; + +/** + * Factory to create a bogus SSLContext. This means that it is easy to test SSL but this + * cannot be used in a production environment. + * <p/> + * This is based on the sample that comes with MINA, written by Trustin Lee + */ +public class BogusSSLContextFactory +{ + /** + * Protocol to use. + */ + private static final String PROTOCOL = "TLS"; + + /** + * Bougus Server certificate keystore file name. + */ + private static final String BOGUS_KEYSTORE = "qpid.cert"; + + // NOTE: The keystore was generated using keytool: + // keytool -genkey -alias qpid -keysize 512 -validity 3650 + // -keyalg RSA -dname "CN=amqp.org" -keypass qpidpw + // -storepass qpidpw -keystore qpid.cert + + private static final char[] BOGUS_KEYSTORE_PASSWORD = {'q', 'p', 'i', 'd', 'p', 'w'}; + + private static SSLContext serverInstance = null; + + private static SSLContext clientInstance = null; + + /** + * Get SSLContext singleton. + * + * @return SSLContext + * @throws java.security.GeneralSecurityException + */ + public static SSLContext getInstance(boolean server) + throws GeneralSecurityException + { + SSLContext retInstance; + if (server) + { + // FIXME: looks like double-checking locking + if (serverInstance == null) + { + synchronized (BogusSSLContextFactory.class) + { + if (serverInstance == null) + { + try + { + serverInstance = createBougusServerSSLContext(); + } + catch (Exception ioe) + { + throw new GeneralSecurityException( + "Can't create Server SSLContext:" + ioe); + } + } + } + } + retInstance = serverInstance; + } + else + { + // FIXME: looks like double-checking locking + if (clientInstance == null) + { + synchronized (BogusSSLContextFactory.class) + { + if (clientInstance == null) + { + clientInstance = createBougusClientSSLContext(); + } + } + } + retInstance = clientInstance; + } + return retInstance; + } + + private static SSLContext createBougusServerSSLContext() + throws GeneralSecurityException, IOException + { + // Create keystore + KeyStore ks = KeyStore.getInstance("JKS"); + InputStream in = null; + try + { + in = BogusSSLContextFactory.class.getResourceAsStream(BOGUS_KEYSTORE); + if (in == null) + { + throw new IOException("Unable to load keystore resource: " + BOGUS_KEYSTORE); + } + ks.load(in, BOGUS_KEYSTORE_PASSWORD); + } + finally + { + if (in != null) + { + //noinspection EmptyCatchBlock + try + { + in.close(); + } + catch (IOException ignored) + { + } + } + } + + // Set up key manager factory to use our key store + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, BOGUS_KEYSTORE_PASSWORD); + + // Initialize the SSLContext to work with our key managers. + SSLContext sslContext = SSLContext.getInstance(PROTOCOL); + sslContext.init(kmf.getKeyManagers(), BogusTrustManagerFactory.X509_MANAGERS, null); + + return sslContext; + } + + private static SSLContext createBougusClientSSLContext() + throws GeneralSecurityException + { + SSLContext context = SSLContext.getInstance(PROTOCOL); + context.init(null, BogusTrustManagerFactory.X509_MANAGERS, null); + return context; + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java new file mode 100644 index 0000000000..8a71e3d7c8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java @@ -0,0 +1,79 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ssl.ManagerFactoryParameters; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactorySpi; +import javax.net.ssl.X509TrustManager; +import java.security.InvalidAlgorithmParameterException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +/** + * Bogus trust manager factory. Used to make testing SSL simpler - i.e no need to + * mess about with keystores. + * <p/> + * This is based on the example that comes with MINA, written by Trustin Lee. + */ +class BogusTrustManagerFactory extends TrustManagerFactorySpi +{ + + static final X509TrustManager X509 = new X509TrustManager() + { + public void checkClientTrusted(X509Certificate[] x509Certificates, + String s) throws CertificateException + { + } + + public void checkServerTrusted(X509Certificate[] x509Certificates, + String s) throws CertificateException + { + } + + public X509Certificate[] getAcceptedIssuers() + { + return new X509Certificate[ 0 ]; + } + }; + + static final TrustManager[] X509_MANAGERS = new TrustManager[]{X509}; + + public BogusTrustManagerFactory() + { + } + + protected TrustManager[] engineGetTrustManagers() + { + return X509_MANAGERS; + } + + protected void engineInit(KeyStore keystore) throws KeyStoreException + { + // noop + } + + protected void engineInit( + ManagerFactoryParameters managerFactoryParameters) + throws InvalidAlgorithmParameterException + { + // noop + } +} diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java new file mode 100644 index 0000000000..f2d5396637 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ServerSocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.security.GeneralSecurityException; + +/** + * Simple Server Socket factory to create sockets with or without SSL enabled. + * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes) + * <p/> + * This is based on the example that comes with MINA, written by Trustin Lee. + */ +public class SSLServerSocketFactory extends javax.net.ServerSocketFactory +{ + private static boolean sslEnabled = false; + + private static javax.net.ServerSocketFactory sslFactory = null; + + private static ServerSocketFactory factory = null; + + public SSLServerSocketFactory() + { + super(); + } + + public ServerSocket createServerSocket(int port) throws IOException + { + return new ServerSocket(port); + } + + public ServerSocket createServerSocket(int port, int backlog) + throws IOException + { + return new ServerSocket(port, backlog); + } + + public ServerSocket createServerSocket(int port, int backlog, + InetAddress ifAddress) + throws IOException + { + return new ServerSocket(port, backlog, ifAddress); + } + + public static javax.net.ServerSocketFactory getServerSocketFactory() + throws IOException + { + if (isSslEnabled()) + { + if (sslFactory == null) + { + try + { + sslFactory = BogusSSLContextFactory.getInstance(true) + .getServerSocketFactory(); + } + catch (GeneralSecurityException e) + { + IOException ioe = new IOException( + "could not create SSL socket"); + ioe.initCause(e); + throw ioe; + } + } + return sslFactory; + } + else + { + if (factory == null) + { + factory = new SSLServerSocketFactory(); + } + return factory; + } + + } + + public static boolean isSslEnabled() + { + return sslEnabled; + } + + public static void setSslEnabled(boolean newSslEnabled) + { + sslEnabled = newSslEnabled; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java new file mode 100644 index 0000000000..31dccb593e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java @@ -0,0 +1,135 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.security.GeneralSecurityException; + +/** + * Simple Socket factory to create sockets with or without SSL enabled. + * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes). + * <p/> + * This is based on an example that comes with MINA, written by Trustin Lee. + */ +public class SSLSocketFactory extends SocketFactory +{ + private static boolean sslEnabled = false; + + private static javax.net.ssl.SSLSocketFactory sslFactory = null; + + private static javax.net.SocketFactory factory = null; + + public SSLSocketFactory() + { + super(); + } + + public Socket createSocket(String arg1, int arg2) throws IOException, + UnknownHostException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2); + } + else + { + return new Socket(arg1, arg2); + } + } + + public Socket createSocket(String arg1, int arg2, InetAddress arg3, + int arg4) throws IOException, + UnknownHostException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2, arg3, arg4); + } + else + { + return new Socket(arg1, arg2, arg3, arg4); + } + } + + public Socket createSocket(InetAddress arg1, int arg2) + throws IOException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2); + } + else + { + return new Socket(arg1, arg2); + } + } + + public Socket createSocket(InetAddress arg1, int arg2, InetAddress arg3, + int arg4) throws IOException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2, arg3, arg4); + } + else + { + return new Socket(arg1, arg2, arg3, arg4); + } + } + + public static javax.net.SocketFactory getSocketFactory() + { + if (factory == null) + { + factory = new SSLSocketFactory(); + } + return factory; + } + + private javax.net.ssl.SSLSocketFactory getSSLFactory() + { + if (sslFactory == null) + { + try + { + sslFactory = BogusSSLContextFactory.getInstance(false) + .getSocketFactory(); + } + catch (GeneralSecurityException e) + { + throw new RuntimeException("could not create SSL socket", e); + } + } + return sslFactory; + } + + public static boolean isSslEnabled() + { + return sslEnabled; + } + + public static void setSslEnabled(boolean newSslEnabled) + { + sslEnabled = newSslEnabled; + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java new file mode 100644 index 0000000000..5ea1a55f2a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -0,0 +1,260 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.exchange.ExchangeDefaults; + +import java.util.HashMap; +import java.net.URI; +import java.net.URISyntaxException; + +public class AMQBindingURL implements BindingURL +{ + String _url; + String _exchangeClass; + String _exchangeName; + String _destinationName; + String _queueName; + private HashMap<String, String> _options; + + + public AMQBindingURL(String url) throws URLSyntaxException + { + //format: + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + + _url = url; + _options = new HashMap<String, String>(); + + parseBindingURL(); + } + + private void parseBindingURL() throws URLSyntaxException + { + try + { + URI connection = new URI(_url); + + String exchangeClass = connection.getScheme(); + + if (exchangeClass == null) + { + _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + + ExchangeDefaults.DIRECT_EXCHANGE_NAME + "//" + _url; + //URLHelper.parseError(-1, "Exchange Class not specified.", _url); + parseBindingURL(); + return; + } + else + { + setExchangeClass(exchangeClass); + } + + String exchangeName = connection.getHost(); + + if (exchangeName == null) + { + URLHelper.parseError(-1, "Exchange Name not specified.", _url); + } + else + { + setExchangeName(exchangeName); + } + + if (connection.getPath() == null || + connection.getPath().equals("")) + { + URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination or Queue requried", _url); + } + else + { + int slash = connection.getPath().indexOf("/", 1); + if (slash == -1) + { + URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination requried", _url); + } + else + { + String path = connection.getPath(); + setDestinationName(path.substring(1, slash)); + + setQueueName(path.substring(slash + 1)); + + } + } + + URLHelper.parseOptions(_options, connection.getQuery()); + + processOptions(); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + + } + catch (URISyntaxException uris) + { + + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + + } + } + + private void processOptions() + { + //this is where we would parse any options that needed more than just storage. + } + + public String getURL() + { + return _url; + } + + public String getExchangeClass() + { + return _exchangeClass; + } + + public void setExchangeClass(String exchangeClass) + { + _exchangeClass = exchangeClass; + } + + public String getExchangeName() + { + return _exchangeName; + } + + public void setExchangeName(String name) + { + _exchangeName = name; + + if (name.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) + { + setOption(BindingURL.OPTION_EXCLUSIVE, "true"); + } + } + + public String getDestinationName() + { + return _destinationName; + } + + public void setDestinationName(String name) + { + _destinationName = name; + } + + public String getQueueName() + { + if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) + { + if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) + { + return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + } + else + { + return getDestinationName(); + } + } + else + { + return getDestinationName(); + } + } + else + { + return _queueName; + } + } + + public void setQueueName(String name) + { + _queueName = name; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public boolean containsOption(String key) + { + return _options.containsKey(key); + } + + public String getRoutingKey() + { + if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return getQueueName(); + } + + if (containsOption(BindingURL.OPTION_ROUTING_KEY)) + { + return getOption(OPTION_ROUTING_KEY); + } + + return getDestinationName(); + } + + public void setRoutingKey(String key) + { + setOption(OPTION_ROUTING_KEY, key); + } + + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_exchangeClass); + sb.append("://"); + sb.append(_exchangeName); + sb.append('/'); + sb.append(_destinationName); + sb.append('/'); + sb.append(_queueName); + + sb.append(URLHelper.printOptions(_options)); + return sb.toString(); + } + + public static void main(String args[]) throws URLSyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?option='value',option2='value2'"; + + AMQBindingURL dest = new AMQBindingURL(url); + + System.out.println(url); + System.out.println(dest); + + } + +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java new file mode 100644 index 0000000000..77802b0e17 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import java.util.List; + +/* + Binding URL format: + <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* +*/ +public interface BindingURL +{ + public static final String OPTION_EXCLUSIVE = "exclusive"; + public static final String OPTION_AUTODELETE = "autodelete"; + public static final String OPTION_DURABLE = "durable"; + public static final String OPTION_CLIENTID = "clientid"; + public static final String OPTION_SUBSCRIPTION = "subscription"; + public static final String OPTION_ROUTING_KEY = "routingkey"; + + + String getURL(); + + String getExchangeClass(); + + void setExchangeClass(String exchangeClass); + + String getExchangeName(); + + void setExchangeName(String name); + + String getDestinationName(); + + void setDestinationName(String name); + + String getQueueName(); + + void setQueueName(String name); + + String getOption(String key); + + void setOption(String key, String value); + + boolean containsOption(String key); + + String getRoutingKey(); + + void setRoutingKey(String key); + + String toString(); +} diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java new file mode 100644 index 0000000000..959735d438 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java @@ -0,0 +1,173 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import java.util.HashMap; + +public class URLHelper +{ + public static char DEFAULT_OPTION_SEPERATOR = '&'; + public static char ALTERNATIVE_OPTION_SEPARATOR = ','; + public static char BROKER_SEPARATOR = ';'; + + public static void parseOptions(HashMap<String, String> optionMap, String options) throws URLSyntaxException + { + //options looks like this + //brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'' + + if (options == null || options.indexOf('=') == -1) + { + return; + } + + int optionIndex = options.indexOf('='); + + String option = options.substring(0, optionIndex); + + int length = options.length(); + + int nestedQuotes = 0; + + // to store index of final "'" + int valueIndex = optionIndex; + + //Walk remainder of url. + while (nestedQuotes > 0 || valueIndex < length) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options.charAt(valueIndex) == '\'') + { + if (valueIndex + 1 < options.length()) + { + if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR || + options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR || + options.charAt(valueIndex + 1) == BROKER_SEPARATOR || + options.charAt(valueIndex + 1) == '\'') + { + nestedQuotes--; +// System.out.println( +// options + "\n" + "-" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + if (nestedQuotes == 0) + { + //We've found the value of an option + break; + } + } + else + { + nestedQuotes++; +// System.out.println( +// options + "\n" + "+" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options.charAt(valueIndex) == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + + if (nestedQuotes != 0 || valueIndex < (optionIndex + 2)) + { + int sepIndex = 0; + + //Try and identify illegal separator character + if (nestedQuotes > 1) + { + for (int i = 0; i < nestedQuotes; i++) + { + sepIndex = options.indexOf('\'', sepIndex); + sepIndex++; + } + } + + if (sepIndex >= options.length() || sepIndex == 0) + { + parseError(valueIndex, "Unterminated option", options); + } + else + { + parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + + options.charAt(sepIndex) + "'", options); + } + } + + // optionIndex +2 to skip "='" + String value = options.substring(optionIndex + 2, valueIndex); + + optionMap.put(option, value); + + if (valueIndex < (options.length() - 1)) + { + //Recurse to get remaining options + parseOptions(optionMap, options.substring(valueIndex + 2)); + } + } + + + public static void parseError(int index, String error, String url) throws URLSyntaxException + { + parseError(index, 1, error, url); + } + + public static void parseError(int index, int length, String error, String url) throws URLSyntaxException + { + throw new URLSyntaxException(url, error, index, length); + } + + public static String printOptions(HashMap<String, String> options) + { + if (options.isEmpty()) + { + return ""; + } + else + { + StringBuffer sb = new StringBuffer(); + sb.append('?'); + for (String key : options.keySet()) + { + sb.append(key); + + sb.append("='"); + + sb.append(options.get(key)); + + sb.append("'"); + sb.append(DEFAULT_OPTION_SEPERATOR); + } + + sb.deleteCharAt(sb.length() - 1); + + return sb.toString(); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java new file mode 100644 index 0000000000..b454069826 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java @@ -0,0 +1,94 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import java.net.URISyntaxException; + +public class URLSyntaxException extends URISyntaxException +{ + private int _length; + + public URLSyntaxException(String url, String error, int index, int length) + { + super(url, error, index); + + _length = length; + } + + private static String getPositionString(int index, int length) + { + StringBuffer sb = new StringBuffer(index + 1); + + for (int i = 0; i < index; i++) + { + sb.append(" "); + } + + if (length > -1) + { + for (int i = 0; i < length; i++) + { + sb.append('^'); + } + } + + return sb.toString(); + } + + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(getReason()); + + if (getIndex() > -1) + { + if (_length != -1) + { + sb.append(" between indicies "); + sb.append(getIndex()); + sb.append(" and "); + sb.append(_length); + } + else + { + sb.append(" at index "); + sb.append(getIndex()); + } + } + + sb.append(" "); + if (getIndex() != -1) + { + sb.append("\n"); + } + + sb.append(getInput()); + + if (getIndex() != -1) + { + sb.append("\n"); + sb.append(getPositionString(getIndex(), _length)); + } + + return sb.toString(); + } + + +} |