summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-11-23 23:12:28 +0000
committerRobert Greig <rgreig@apache.org>2006-11-23 23:12:28 +0000
commit4c8f9badcd32fd9a519184928f90e8a685412eaa (patch)
treeb4485b039e2080520fa2767b054b0de2d9d3b096
parent4e7b444e740f8f2bf55dd6a1684932388edca3e3 (diff)
downloadqpid-python-4c8f9badcd32fd9a519184928f90e8a685412eaa.tar.gz
Start of merge from trunk - some manual restructuring
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@478706 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java46
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionException.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQException.java74
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java41
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java34
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java50
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java96
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java38
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/Configured.java41
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java62
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java153
-rw-r--r--java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java33
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java35
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java40
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java113
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java62
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java73
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java45
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java87
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java43
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java28
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java589
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java28
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java100
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java83
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java44
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java112
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java47
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java55
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java546
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java319
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTableKeyEnumeration.java44
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java54
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java28
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java176
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Event.java111
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java110
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java183
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java37
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java95
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java105
-rw-r--r--java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java156
-rw-r--r--java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java79
-rw-r--r--java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java105
-rw-r--r--java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java135
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java260
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/BindingURL.java65
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/URLHelper.java173
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java94
57 files changed, 5412 insertions, 0 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
new file mode 100644
index 0000000000..694fe75cda
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+/**
+ * AMQ channel closed exception.
+ */
+public class AMQChannelClosedException extends AMQException
+{
+ public AMQChannelClosedException(int errorCode, String msg)
+ {
+ super(errorCode, msg);
+ }
+}
+
+
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
new file mode 100644
index 0000000000..677a4938a0
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQFrame;
+
+public class AMQChannelException extends AMQException
+{
+ private final int _classId;
+ private final int _methodId;
+
+ public AMQChannelException(int errorCode, String msg, int classId, int methodId, Throwable t)
+ {
+ super(errorCode, msg, t);
+ _classId = classId;
+ _methodId = methodId;
+ }
+
+ public AMQChannelException(int errorCode, String msg, int classId, int methodId)
+ {
+ super(errorCode, msg);
+ _classId = classId;
+ _methodId = methodId;
+ }
+
+ public AMQFrame getCloseFrame(int channel)
+ {
+ return ChannelCloseBody.createAMQFrame(channel, getErrorCode(), getMessage(), _classId, _methodId);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
new file mode 100644
index 0000000000..dcf393eb65
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+/**
+ * AMQ channel closed exception.
+ */
+public class AMQConnectionClosedException extends AMQException
+{
+ public AMQConnectionClosedException(int errorCode, String msg)
+ {
+ super(errorCode, msg);
+ }
+}
+
+
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
new file mode 100644
index 0000000000..171af23500
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid;
+
+public class AMQConnectionException extends AMQException
+{
+ public AMQConnectionException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
new file mode 100644
index 0000000000..616a95bd1b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+/**
+ * AMQ disconnected exception.
+ */
+public class AMQDisconnectedException extends AMQException
+{
+ public AMQDisconnectedException(String msg)
+ {
+ super(msg);
+ }
+}
+
+
diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java
new file mode 100644
index 0000000000..423cbf8975
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQException.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Generic AMQ exception.
+ */
+public class AMQException extends Exception
+{
+ private int _errorCode;
+
+ public AMQException(String message)
+ {
+ super(message);
+ }
+
+ public AMQException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+
+ public AMQException(int errorCode, String msg, Throwable t)
+ {
+ super(msg + " [error code " + errorCode + ']', t);
+ _errorCode = errorCode;
+ }
+
+ public AMQException(int errorCode, String msg)
+ {
+ super(msg + " [error code " + errorCode + ']');
+ _errorCode = errorCode;
+ }
+
+ public AMQException(Logger logger, String msg, Throwable t)
+ {
+ this(msg, t);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQException(Logger logger, String msg)
+ {
+ this(msg);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQException(Logger logger, int errorCode, String msg)
+ {
+ this(errorCode, msg);
+ logger.error(getMessage(), this);
+ }
+
+ public int getErrorCode()
+ {
+ return _errorCode;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
new file mode 100644
index 0000000000..9424fe8eb1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+/**
+ * Generic AMQ exception.
+ */
+public class AMQUndeliveredException extends AMQException
+{
+ private Object _bounced;
+
+ public AMQUndeliveredException(int errorCode, String msg, Object bounced)
+ {
+ super(errorCode, msg);
+
+ _bounced = bounced;
+ }
+
+ public Object getUndeliveredMessage()
+ {
+ return _bounced;
+ }
+
+}
+
+
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java
new file mode 100644
index 0000000000..5565958d67
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+public class AMQUnresolvedAddressException extends AMQException
+{
+ String _broker;
+
+ public AMQUnresolvedAddressException(String message, String broker)
+ {
+ super(message);
+ _broker = broker;
+ }
+
+ public String toString()
+ {
+ return super.toString() + " Broker, \"" + _broker +"\"";
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
new file mode 100644
index 0000000000..c62befce6b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+
+public class AMQCodecFactory implements ProtocolCodecFactory
+{
+ private AMQEncoder _encoder = new AMQEncoder();
+
+ private AMQDecoder _frameDecoder;
+
+ /**
+ * @param expectProtocolInitiation true if the first frame received is going to be
+ * a protocol initiation frame, false if it is going to be a standard AMQ data block.
+ * The former case is used for the broker, which always expects to received the
+ * protocol initiation first from a newly connected client.
+ */
+ public AMQCodecFactory(boolean expectProtocolInitiation)
+ {
+ _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+ }
+
+ public ProtocolEncoder getEncoder()
+ {
+ return _encoder;
+ }
+
+ public ProtocolDecoder getDecoder()
+ {
+ return _frameDecoder;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
new file mode 100644
index 0000000000..594ae11233
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.ProtocolInitiation;
+
+/**
+ * There is one instance of this class per session. Any changes or configuration done
+ * at run time to the encoders or decoders only affects decoding/encoding of the
+ * protocol session data to which is it bound.
+ *
+ */
+public class AMQDecoder extends CumulativeProtocolDecoder
+{
+ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+
+ private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
+
+ private boolean _expectProtocolInitiation;
+
+ public AMQDecoder(boolean expectProtocolInitiation)
+ {
+ _expectProtocolInitiation = expectProtocolInitiation;
+ }
+
+ protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ if (_expectProtocolInitiation)
+ {
+ return doDecodePI(session, in, out);
+ }
+ else
+ {
+ return doDecodeDataBlock(session, in, out);
+ }
+ }
+
+ protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ int pos = in.position();
+ boolean enoughData = _dataBlockDecoder.decodable(session, in);
+ in.position(pos);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _dataBlockDecoder.decode(session, in, out);
+ return true;
+ }
+ }
+
+ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ boolean enoughData = _piDecoder.decodable(session, in);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _piDecoder.decode(session, in, out);
+ return true;
+ }
+ }
+
+ public void setExpectProtocolInitiation(boolean expectProtocolInitiation)
+ {
+ _expectProtocolInitiation = expectProtocolInitiation;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java
new file mode 100644
index 0000000000..7d5e8182a6
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.framing.AMQDataBlockEncoder;
+
+public class AMQEncoder implements ProtocolEncoder
+{
+ private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder();
+
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+ {
+ _dataBlockEncoder.encode(session, message, out);
+ }
+
+ public void dispose(IoSession session) throws Exception
+ {
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Configured.java b/java/common/src/main/java/org/apache/qpid/configuration/Configured.java
new file mode 100644
index 0000000000..cb5e8aff1d
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/configuration/Configured.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a field as being "configured" externally.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface Configured
+{
+ /**
+ * The Commons Configuration path to the configuration element
+ */
+ String path();
+
+ /**
+ * The default value to use should the path not be found in the configuration source
+ */
+ String defaultValue();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
new file mode 100644
index 0000000000..f148ffc0b7
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+/**
+ * Indicates an error parsing a property expansion.
+ */
+public class PropertyException extends AMQException
+{
+ public PropertyException(String message)
+ {
+ super(message);
+ }
+
+ public PropertyException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+
+ public PropertyException(int errorCode, String msg, Throwable t)
+ {
+ super(errorCode, msg, t);
+ }
+
+ public PropertyException(int errorCode, String msg)
+ {
+ super(errorCode, msg);
+ }
+
+ public PropertyException(Logger logger, String msg, Throwable t)
+ {
+ super(logger, msg, t);
+ }
+
+ public PropertyException(Logger logger, String msg)
+ {
+ super(logger, msg);
+ }
+
+ public PropertyException(Logger logger, int errorCode, String msg)
+ {
+ super(logger, errorCode, msg);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
new file mode 100644
index 0000000000..bd4000d3c4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Based on code in Apache Ant, this utility class handles property expansion. This
+ * is most useful in config files and so on.
+ */
+public class PropertyUtils
+{
+ /**
+ * Replaces <code>${xxx}</code> style constructions in the given value
+ * with the string value of the corresponding data types. Replaces only system
+ * properties
+ *
+ * @param value The string to be scanned for property references.
+ * May be <code>null</code>, in which case this
+ * method returns immediately with no effect.
+ * @return the original string with the properties replaced, or
+ * <code>null</code> if the original string is <code>null</code>.
+ * @throws PropertyException if the string contains an opening
+ * <code>${</code> without a closing
+ * <code>}</code>
+ */
+ public static String replaceProperties(String value) throws PropertyException
+ {
+ if (value == null)
+ {
+ return null;
+ }
+
+ ArrayList<String> fragments = new ArrayList<String>();
+ ArrayList<String> propertyRefs = new ArrayList<String>();
+ parsePropertyString(value, fragments, propertyRefs);
+
+ StringBuffer sb = new StringBuffer();
+ Iterator j = propertyRefs.iterator();
+
+ for (String fragment : fragments)
+ {
+ if (fragment == null)
+ {
+ String propertyName = (String) j.next();
+
+ // try to get it from the project or keys
+ // Backward compatibility
+ String replacement = System.getProperty(propertyName);
+
+ if (replacement == null)
+ {
+ throw new PropertyException("Property ${" + propertyName +
+ "} has not been set");
+ }
+ fragment = replacement;
+ }
+ sb.append(fragment);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Default parsing method. Parses the supplied value for properties which are specified
+ * using ${foo} syntax. $X is left as is, and $$ specifies a single $.
+ * @param value the property string to parse
+ * @param fragments is populated with the string fragments. A null means "insert a
+ * property value here. The number of nulls in the list when populated is equal to the
+ * size of the propertyRefs list
+ * @param propertyRefs populated with the property names to be added into the final
+ * String.
+ */
+ private static void parsePropertyString(String value, ArrayList<String> fragments,
+ ArrayList<String> propertyRefs)
+ throws PropertyException
+ {
+ int prev = 0;
+ int pos;
+ //search for the next instance of $ from the 'prev' position
+ while ((pos = value.indexOf("$", prev)) >= 0)
+ {
+
+ //if there was any text before this, add it as a fragment
+ if (pos > 0)
+ {
+ fragments.add(value.substring(prev, pos));
+ }
+ //if we are at the end of the string, we tack on a $
+ //then move past it
+ if (pos == (value.length() - 1))
+ {
+ fragments.add("$");
+ prev = pos + 1;
+ }
+ else if (value.charAt(pos + 1) != '{')
+ {
+ //peek ahead to see if the next char is a property or not
+ //not a property: insert the char as a literal
+ if (value.charAt(pos + 1) == '$')
+ {
+ // two $ map to one $
+ fragments.add("$");
+ prev = pos + 2;
+ }
+ else
+ {
+ // $X maps to $X for all values of X!='$'
+ fragments.add(value.substring(pos, pos + 2));
+ prev = pos + 2;
+ }
+ }
+ else
+ {
+ // property found, extract its name or bail on a typo
+ int endName = value.indexOf('}', pos);
+ if (endName < 0)
+ {
+ throw new PropertyException("Syntax error in property: " +
+ value);
+ }
+ String propertyName = value.substring(pos + 2, endName);
+ fragments.add(null);
+ propertyRefs.add(propertyName);
+ prev = endName + 1;
+ }
+ }
+ //no more $ signs found
+ //if there is any tail to the file, append it
+ if (prev < value.length())
+ {
+ fragments.add(value.substring(prev));
+ }
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
new file mode 100644
index 0000000000..84a5836ff7
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.exchange;
+
+public class ExchangeDefaults
+{
+ public final static String TOPIC_EXCHANGE_NAME = "amq.topic";
+
+ public final static String TOPIC_EXCHANGE_CLASS = "topic";
+
+ public final static String DIRECT_EXCHANGE_NAME = "amq.direct";
+
+ public final static String DIRECT_EXCHANGE_CLASS = "direct";
+
+ public final static String HEADERS_EXCHANGE_NAME = "amq.match";
+
+ public final static String HEADERS_EXCHANGE_CLASS = "headers";
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
new file mode 100644
index 0000000000..fad0450960
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public abstract class AMQBody
+{
+ protected abstract byte getType();
+
+ /**
+ * Get the size of the body
+ * @return unsigned short
+ */
+ protected abstract int getSize();
+
+ protected abstract void writePayload(ByteBuffer buffer);
+
+ protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
new file mode 100644
index 0000000000..797df391c3
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A data block represents something that has a size in bytes and the ability to write itself to a byte
+ * buffer (similar to a byte array).
+ */
+public abstract class AMQDataBlock implements EncodableAMQDataBlock
+{
+ /**
+ * Get the size of buffer needed to store the byte representation of this
+ * frame.
+ * @return unsigned integer
+ */
+ public abstract long getSize();
+
+ /**
+ * Writes the datablock to the specified buffer.
+ * @param buffer
+ */
+ public abstract void writePayload(ByteBuffer buffer);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
new file mode 100644
index 0000000000..3379cc18e9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AMQDataBlockDecoder
+{
+ Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+
+ private final Map _supportedBodies = new HashMap();
+
+ public AMQDataBlockDecoder()
+ {
+ _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance());
+ _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance());
+ _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance());
+ _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory());
+ }
+
+ public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+ {
+ // type, channel, body size and end byte
+ if (in.remaining() < (1 + 2 + 4 + 1))
+ {
+ return false;
+ }
+
+ final byte type = in.get();
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if (type <= 0 || channel < 0 || bodySize < 0)
+ {
+ throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
+ " bodySize = " + bodySize);
+ }
+
+ if (in.remaining() < (bodySize + 1))
+ {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean isSupportedFrameType(byte frameType)
+ {
+ final boolean result = _supportedBodies.containsKey(new Byte(frameType));
+
+ if (!result)
+ {
+ _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType);
+ }
+
+ return result;
+ }
+
+ protected Object createAndPopulateFrame(ByteBuffer in)
+ throws AMQFrameDecodingException
+ {
+ final byte type = in.get();
+ if (!isSupportedFrameType(type))
+ {
+ throw new AMQFrameDecodingException("Unsupported frame type: " + type);
+ }
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type));
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException("Unsupported body type: " + type);
+ }
+ AMQFrame frame = new AMQFrame();
+
+ frame.populateFromBuffer(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type);
+ }
+ return frame;
+ }
+
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
+ throws Exception
+ {
+ out.write(createAndPopulateFrame(in));
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
new file mode 100644
index 0000000000..9c0ca26dcf
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class AMQDataBlockEncoder implements MessageEncoder
+{
+ Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
+
+ private Set _messageTypes;
+
+ public AMQDataBlockEncoder()
+ {
+ _messageTypes = new HashSet();
+ _messageTypes.add(EncodableAMQDataBlock.class);
+ }
+
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+ {
+ final AMQDataBlock frame = (AMQDataBlock) message;
+ int frameSize = (int)frame.getSize();
+ final ByteBuffer buffer = ByteBuffer.allocate(frameSize);
+ //buffer.setAutoExpand(true);
+ frame.writePayload(buffer);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
+ }
+
+ buffer.flip();
+ out.write(buffer);
+ }
+
+ public Set getMessageTypes()
+ {
+ return _messageTypes;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
new file mode 100644
index 0000000000..17f635c06a
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ public int channel;
+
+ public AMQBody bodyFrame;
+
+ public AMQFrame()
+ {
+ }
+
+ public AMQFrame(int channel, AMQBody bodyFrame)
+ {
+ this.channel = channel;
+ this.bodyFrame = bodyFrame;
+ }
+
+ public long getSize()
+ {
+ return 1 + 2 + 4 + bodyFrame.getSize() + 1;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ buffer.put(bodyFrame.getType());
+ // TODO: how does channel get populated
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
+ bodyFrame.writePayload(buffer);
+ buffer.put((byte) 0xCE);
+ }
+
+ /**
+ *
+ * @param buffer
+ * @param channel unsigned short
+ * @param bodySize unsigned integer
+ * @param bodyFactory
+ * @throws AMQFrameDecodingException
+ */
+ public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
+ throws AMQFrameDecodingException
+ {
+ this.channel = channel;
+ bodyFrame = bodyFactory.createBody(buffer);
+ bodyFrame.populateFromBuffer(buffer, bodySize);
+ }
+
+ public String toString()
+ {
+ return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
new file mode 100644
index 0000000000..4e8a8c62b1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+public class AMQFrameDecodingException extends AMQException
+{
+ public AMQFrameDecodingException(String message)
+ {
+ super(message);
+ }
+
+ public AMQFrameDecodingException(String message, Throwable t)
+ {
+ super(message, t);
+ }
+
+ public AMQFrameDecodingException(Logger log, String message)
+ {
+ super(log, message);
+ }
+
+ public AMQFrameDecodingException(Logger log, String message, Throwable t)
+ {
+ super(log, message, t);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
new file mode 100644
index 0000000000..841295e538
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+
+public abstract class AMQMethodBody extends AMQBody
+{
+ public static final byte TYPE = 1;
+
+ /** unsigned short */
+ protected abstract int getBodySize();
+
+ /**
+ * @return unsigned short
+ */
+ protected abstract int getClazz();
+
+ /**
+ * @return unsigned short
+ */
+ protected abstract int getMethod();
+
+ protected abstract void writeMethodPayload(ByteBuffer buffer);
+
+ protected byte getType()
+ {
+ return TYPE;
+ }
+
+ protected int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ protected void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+ protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ populateMethodBodyFromBuffer(buffer);
+ }
+
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getClass().toString());
+ buf.append(" Class: ").append(getClazz());
+ buf.append(" Method: ").append(getMethod());
+ return buf.toString();
+ }
+
+ /**
+ * Creates an AMQChannelException for the corresponding body type (a channel exception
+ * should include the class and method ids of the body it resulted from).
+ */
+ public AMQChannelException getChannelException(int code, String message)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod());
+ }
+
+ public AMQChannelException getChannelException(int code, String message, Throwable cause)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), cause);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
new file mode 100644
index 0000000000..97f594061e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class AMQMethodBodyFactory implements BodyFactory
+{
+ private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+ private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory();
+
+ public static AMQMethodBodyFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private AMQMethodBodyFactory()
+ {
+ _log.debug("Creating method body factory");
+ }
+
+ public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ {
+ return MethodBodyDecoderRegistry.get(in.getUnsignedShort(), in.getUnsignedShort());
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
new file mode 100644
index 0000000000..8a8175bbc8
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public class AMQProtocolClassException extends AMQProtocolHeaderException
+{
+ public AMQProtocolClassException(String message)
+ {
+ super(message);
+ }
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
new file mode 100644
index 0000000000..6809c3d21e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.qpid.AMQException;
+
+public class AMQProtocolHeaderException extends AMQException
+{
+ public AMQProtocolHeaderException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
new file mode 100644
index 0000000000..7f5b26010d
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public class AMQProtocolInstanceException extends AMQProtocolHeaderException
+{
+ public AMQProtocolInstanceException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
new file mode 100644
index 0000000000..4f5677b41a
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * Exception that is thrown when the client and server differ on expected protocol version (header) information.
+ *
+ */
+public class AMQProtocolVersionException extends AMQProtocolHeaderException
+{
+ public AMQProtocolVersionException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
new file mode 100644
index 0000000000..859fdac489
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -0,0 +1,589 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class BasicContentHeaderProperties implements ContentHeaderProperties
+{
+ private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class);
+
+ /**
+ * We store the encoded form when we decode the content header so that if we need to
+ * write it out without modifying it we can do so without incurring the expense of
+ * reencoding it
+ */
+ private byte[] _encodedForm;
+
+ /**
+ * Flag indicating whether the entire content header has been decoded yet
+ */
+ private boolean _decoded = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker
+ * for routing in some cases so we can decode that separately.
+ */
+ private boolean _decodedHeaders = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The content type is used by all
+ * clients to determine the message type
+ */
+ private boolean _decodedContentType = true;
+
+ private String _contentType;
+
+ private String _encoding;
+
+ private FieldTable _headers;
+
+ private byte _deliveryMode;
+
+ private byte _priority;
+
+ private String _correlationId;
+
+ private String _replyTo;
+
+ private long _expiration;
+
+ private String _messageId;
+
+ private long _timestamp;
+
+ private String _type;
+
+ private String _userId;
+
+ private String _appId;
+
+ private String _clusterId;
+
+ private int _propertyFlags = 0;
+
+ public BasicContentHeaderProperties()
+ {
+ }
+
+ public int getPropertyListSize()
+ {
+ if (_encodedForm != null)
+ {
+ return _encodedForm.length;
+ }
+ else
+ {
+ int size = 0;
+
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_contentType);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_encoding);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ size += EncodingUtils.encodedFieldTableLength(_headers);
+ }
+ if ((_propertyFlags & (1 << 12)) > 0)
+ {
+ size += 1;
+ }
+ if ((_propertyFlags & (1 << 11)) > 0)
+ {
+ size += 1;
+ }
+ if ((_propertyFlags & (1 << 10)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_correlationId);
+ }
+ if ((_propertyFlags & (1 << 9)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_replyTo);
+ }
+ if ((_propertyFlags & (1 << 8)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(String.valueOf(_expiration));
+ }
+ if ((_propertyFlags & (1 << 7)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_messageId);
+ }
+ if ((_propertyFlags & (1 << 6)) > 0)
+ {
+ size += 8;
+ }
+ if ((_propertyFlags & (1 << 5)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_type);
+ }
+ if ((_propertyFlags & (1 << 4)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_userId);
+ }
+ if ((_propertyFlags & (1 << 3)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_appId);
+ }
+ if ((_propertyFlags & (1 << 2)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_clusterId);
+ }
+ return size;
+ }
+ }
+
+ private void clearEncodedForm()
+ {
+ if (!_decoded && _encodedForm != null)
+ {
+ //decode();
+ }
+ _encodedForm = null;
+ }
+
+ public void setPropertyFlags(int propertyFlags)
+ {
+ clearEncodedForm();
+ _propertyFlags = propertyFlags;
+ }
+
+ public int getPropertyFlags()
+ {
+ return _propertyFlags;
+ }
+
+ public void writePropertyListPayload(ByteBuffer buffer)
+ {
+ if (_encodedForm != null)
+ {
+ buffer.put(_encodedForm);
+ }
+ else
+ {
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _encoding);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, _headers);
+ }
+ if ((_propertyFlags & (1 << 12)) > 0)
+ {
+ buffer.put(_deliveryMode);
+ }
+ if ((_propertyFlags & (1 << 11)) > 0)
+ {
+ buffer.put(_priority);
+ }
+ if ((_propertyFlags & (1 << 10)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+ }
+ if ((_propertyFlags & (1 << 9)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _replyTo);
+ }
+ if ((_propertyFlags & (1 << 8)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ }
+ if ((_propertyFlags & (1 << 7)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _messageId);
+ }
+ if ((_propertyFlags & (1 << 6)) > 0)
+ {
+ EncodingUtils.writeTimestamp(buffer, _timestamp);
+ }
+ if ((_propertyFlags & (1 << 5)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _type);
+ }
+ if ((_propertyFlags & (1 << 4)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _userId);
+ }
+ if ((_propertyFlags & (1 << 3)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _appId);
+ }
+ if ((_propertyFlags & (1 << 2)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ }
+ }
+ }
+
+ public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
+ throws AMQFrameDecodingException
+ {
+ _propertyFlags = propertyFlags;
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Property flags: " + _propertyFlags);
+ }
+ decode(buffer);
+ /*_encodedForm = new byte[size];
+ buffer.get(_encodedForm, 0, size);
+ _decoded = false;
+ _decodedHeaders = false;
+ _decodedContentType = false;*/
+ }
+
+ private void decode(ByteBuffer buffer)
+ {
+ //ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ int pos = buffer.position();
+ try
+ {
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ _contentType = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ _encoding = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ _headers = EncodingUtils.readFieldTable(buffer);
+ }
+ if ((_propertyFlags & (1 << 12)) > 0)
+ {
+ _deliveryMode = buffer.get();
+ }
+ if ((_propertyFlags & (1 << 11)) > 0)
+ {
+ _priority = buffer.get();
+ }
+ if ((_propertyFlags & (1 << 10)) > 0)
+ {
+ _correlationId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 9)) > 0)
+ {
+ _replyTo = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 8)) > 0)
+ {
+ _expiration = Long.parseLong(EncodingUtils.readShortString(buffer));
+ }
+ if ((_propertyFlags & (1 << 7)) > 0)
+ {
+ _messageId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 6)) > 0)
+ {
+ _timestamp = EncodingUtils.readTimestamp(buffer);
+ }
+ if ((_propertyFlags & (1 << 5)) > 0)
+ {
+ _type = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 4)) > 0)
+ {
+ _userId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 3)) > 0)
+ {
+ _appId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 2)) > 0)
+ {
+ _clusterId = EncodingUtils.readShortString(buffer);
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e);
+ }
+
+ final int endPos = buffer.position();
+ buffer.position(pos);
+ final int len = endPos - pos;
+ _encodedForm = new byte[len];
+ final int limit = buffer.limit();
+ buffer.limit(endPos);
+ buffer.get(_encodedForm, 0, len);
+ buffer.limit(limit);
+ buffer.position(endPos);
+ _decoded = true;
+ }
+
+
+ private void decodeUpToHeaders()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ try
+ {
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ _headers = EncodingUtils.readFieldTable(buffer);
+ }
+ _decodedHeaders = true;
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e);
+ }
+ }
+
+ private void decodeUpToContentType()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ _contentType = EncodingUtils.readShortString(buffer);
+ }
+
+ _decodedContentType = true;
+ }
+
+ private void decodeIfNecessary()
+ {
+ if (!_decoded)
+ {
+ //decode();
+ }
+ }
+
+ private void decodeHeadersIfNecessary()
+ {
+ if (!_decoded && !_decodedHeaders)
+ {
+ decodeUpToHeaders();
+ }
+ }
+
+ private void decodeContentTypeIfNecessary()
+ {
+ if (!_decoded && !_decodedContentType)
+ {
+ decodeUpToContentType();
+ }
+ }
+ public String getContentType()
+ {
+ decodeContentTypeIfNecessary();
+ return _contentType;
+ }
+
+ public void setContentType(String contentType)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 15);
+ _contentType = contentType;
+ }
+
+ public String getEncoding()
+ {
+ decodeIfNecessary();
+ return _encoding;
+ }
+
+ public void setEncoding(String encoding)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 14);
+ _encoding = encoding;
+ }
+
+ public FieldTable getHeaders()
+ {
+ decodeHeadersIfNecessary();
+ return _headers;
+ }
+
+ public void setHeaders(FieldTable headers)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 13);
+ _headers = headers;
+ }
+
+ public byte getDeliveryMode()
+ {
+ decodeIfNecessary();
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(byte deliveryMode)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 12);
+ _deliveryMode = deliveryMode;
+ }
+
+ public byte getPriority()
+ {
+ decodeIfNecessary();
+ return _priority;
+ }
+
+ public void setPriority(byte priority)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 11);
+ _priority = priority;
+ }
+
+ public String getCorrelationId()
+ {
+ decodeIfNecessary();
+ return _correlationId;
+ }
+
+ public void setCorrelationId(String correlationId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 10);
+ _correlationId = correlationId;
+ }
+
+ public String getReplyTo()
+ {
+ decodeIfNecessary();
+ return _replyTo;
+ }
+
+ public void setReplyTo(String replyTo)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 9);
+ _replyTo = replyTo;
+ }
+
+ public long getExpiration()
+ {
+ decodeIfNecessary();
+ return _expiration;
+ }
+
+ public void setExpiration(long expiration)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 8);
+ _expiration = expiration;
+ }
+
+
+ public String getMessageId()
+ {
+ decodeIfNecessary();
+ return _messageId;
+ }
+
+ public void setMessageId(String messageId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 7);
+ _messageId = messageId;
+ }
+
+ public long getTimestamp()
+ {
+ decodeIfNecessary();
+ return _timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 6);
+ _timestamp = timestamp;
+ }
+
+ public String getType()
+ {
+ decodeIfNecessary();
+ return _type;
+ }
+
+ public void setType(String type)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 5);
+ _type = type;
+ }
+
+ public String getUserId()
+ {
+ decodeIfNecessary();
+ return _userId;
+ }
+
+ public void setUserId(String userId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 4);
+ _userId = userId;
+ }
+
+ public String getAppId()
+ {
+ decodeIfNecessary();
+ return _appId;
+ }
+
+ public void setAppId(String appId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 3);
+ _appId = appId;
+ }
+
+ public String getClusterId()
+ {
+ decodeIfNecessary();
+ return _clusterId;
+ }
+
+ public void setClusterId(String clusterId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 2);
+ _clusterId = clusterId;
+ }
+
+ public String toString()
+ {
+ return "reply-to = " + _replyTo + " propertyFlags = " + _propertyFlags;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
new file mode 100644
index 0000000000..cd40558de8
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface.
+ */
+public interface BodyFactory
+{
+ AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException;
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
new file mode 100644
index 0000000000..35e29aa064
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private ByteBuffer _encodedBlock;
+
+ private AMQDataBlock[] _blocks;
+
+ public CompositeAMQDataBlock(AMQDataBlock[] blocks)
+ {
+ _blocks = blocks;
+ }
+
+ /**
+ * The encoded block will be logically first before the AMQDataBlocks which are encoded
+ * into the buffer afterwards.
+ * @param encodedBlock already-encoded data
+ * @param blocks some blocks to be encoded.
+ */
+ public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks)
+ {
+ this(blocks);
+ _encodedBlock = encodedBlock;
+ }
+
+ public AMQDataBlock[] getBlocks()
+ {
+ return _blocks;
+ }
+
+ public ByteBuffer getEncodedBlock()
+ {
+ return _encodedBlock;
+ }
+
+ public long getSize()
+ {
+ long frameSize = 0;
+ for (int i = 0; i < _blocks.length; i++)
+ {
+ frameSize += _blocks[i].getSize();
+ }
+ if (_encodedBlock != null)
+ {
+ _encodedBlock.rewind();
+ frameSize += _encodedBlock.remaining();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (_encodedBlock != null)
+ {
+ buffer.put(_encodedBlock);
+ }
+ for (int i = 0; i < _blocks.length; i++)
+ {
+ _blocks[i].writePayload(buffer);
+ }
+ }
+
+ public String toString()
+ {
+ if (_blocks == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{encodedBlock=").append(_encodedBlock);
+ for (int i = 0 ; i < _blocks.length; i++)
+ {
+ buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");
+ }
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
new file mode 100644
index 0000000000..d7b668534c
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentBody extends AMQBody
+{
+ public static final byte TYPE = 3;
+
+ public ByteBuffer payload;
+
+ protected byte getType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return (payload == null ? 0 : payload.limit());
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (payload != null)
+ {
+ ByteBuffer copy = payload.duplicate();
+ buffer.put(copy.rewind());
+ }
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ if (size > 0)
+ {
+ payload = buffer.slice();
+ payload.limit((int) size);
+ buffer.skip((int) size);
+ }
+
+ }
+
+ public void reduceBufferToFit()
+ {
+ if (payload != null && (payload.remaining() < payload.capacity() / 2))
+ {
+ int size = payload.limit();
+ ByteBuffer newPayload = ByteBuffer.allocate(size);
+
+ newPayload.put(payload);
+ newPayload.position(0);
+ newPayload.limit(size);
+
+ //reduce reference count on payload
+ payload.release();
+
+ payload = newPayload;
+ }
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, ContentBody body)
+ {
+ final AMQFrame frame = new AMQFrame();
+ frame.channel = channelId;
+ frame.bodyFrame = body;
+ return frame;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
new file mode 100644
index 0000000000..1d6b72ce76
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentBodyFactory implements BodyFactory
+{
+ private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+ private static final ContentBodyFactory _instance = new ContentBodyFactory();
+
+ public static ContentBodyFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private ContentBodyFactory()
+ {
+ _log.debug("Creating content body factory");
+ }
+
+ public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ {
+ return new ContentBody();
+ }
+}
+
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
new file mode 100644
index 0000000000..35ce107831
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentHeaderBody extends AMQBody
+{
+ public static final byte TYPE = 2;
+
+ public int classId;
+
+ public int weight;
+
+ /** unsigned long but java can't handle that anyway when allocating byte array */
+ public long bodySize;
+
+ /** must never be null */
+ public ContentHeaderProperties properties;
+
+ public ContentHeaderBody()
+ {
+ }
+
+ public ContentHeaderBody(ContentHeaderProperties props, int classId)
+ {
+ properties = props;
+ this.classId = classId;
+ }
+
+ public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize)
+ {
+ this(props, classId);
+ this.weight = weight;
+ this.bodySize = bodySize;
+ }
+
+ protected byte getType()
+ {
+ return TYPE;
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ classId = buffer.getUnsignedShort();
+ weight = buffer.getUnsignedShort();
+ bodySize = buffer.getLong();
+ int propertyFlags = buffer.getUnsignedShort();
+ ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
+ properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14);
+ }
+
+ /**
+ * Helper method that is used currently by the persistence layer (by BDB at the moment).
+ * @param buffer
+ * @param size
+ * @return
+ * @throws AMQFrameDecodingException
+ */
+ public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ ContentHeaderBody body = new ContentHeaderBody();
+ body.populateFromBuffer(buffer, size);
+ return body;
+ }
+
+ public int getSize()
+ {
+ return 2 + 2 + 8 + 2 + properties.getPropertyListSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, classId);
+ EncodingUtils.writeUnsignedShort(buffer, weight);
+ buffer.putLong(bodySize);
+ EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
+ properties.writePropertyListPayload(buffer);
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
+ long bodySize)
+ {
+ final AMQFrame frame = new AMQFrame();
+ frame.channel = channelId;
+ frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize);
+ return frame;
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body)
+ {
+ final AMQFrame frame = new AMQFrame();
+ frame.channel = channelId;
+ frame.bodyFrame = body;
+ return frame;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
new file mode 100644
index 0000000000..236c5094fc
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentHeaderBodyFactory implements BodyFactory
+{
+ private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+ private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory();
+
+ public static ContentHeaderBodyFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private ContentHeaderBodyFactory()
+ {
+ _log.debug("Creating content header body factory");
+ }
+
+ public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ {
+ // all content headers are the same - it is only the properties that differ.
+ // the content header body further delegates construction of properties
+ return new ContentHeaderBody();
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
new file mode 100644
index 0000000000..65b629bf17
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * There will be an implementation of this interface for each content type. All content types have associated
+ * header properties and this provides a way to encode and decode them.
+ */
+public interface ContentHeaderProperties
+{
+ /**
+ * Writes the property list to the buffer, in a suitably encoded form.
+ * @param buffer The buffer to write to
+ */
+ void writePropertyListPayload(ByteBuffer buffer);
+
+ /**
+ * Populates the properties from buffer.
+ * @param buffer The buffer to read from.
+ * @param propertyFlags he property flags.
+ * @throws AMQFrameDecodingException when the buffer does not contain valid data
+ */
+ void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
+ throws AMQFrameDecodingException;
+
+ /**
+ * @return the size of the encoded property list in bytes.
+ */
+ int getPropertyListSize();
+
+ /**
+ * Gets the property flags. Property flags indicate which properties are set in the list. The
+ * position and meaning of each flag is defined in the protocol specification for the particular
+ * content type with which these properties are associated.
+ * @return flags
+ */
+ int getPropertyFlags();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
new file mode 100644
index 0000000000..ea1b0d6ef5
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentHeaderPropertiesFactory
+{
+ private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory();
+
+ public static ContentHeaderPropertiesFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private ContentHeaderPropertiesFactory()
+ {
+ }
+
+ public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
+ ByteBuffer buffer, int size)
+ throws AMQFrameDecodingException
+ {
+ ContentHeaderProperties properties;
+ switch (classId)
+ {
+ case BasicConsumeBody.CLASS_ID:
+ properties = new BasicContentHeaderProperties();
+ break;
+ default:
+ throw new AMQFrameDecodingException("Unsupport content header class id: " + classId);
+ }
+ properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
+ return properties;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java
new file mode 100644
index 0000000000..3d493979eb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * Marker interface to indicate to MINA that a data block should be encoded with the
+ * single encoder/decoder that we have defined.
+ *
+ * Note that due to a bug in MINA all classes must directly implement this interface, even if
+ * a superclass implements it.
+ * TODO: fix MINA so that this is not necessary
+ *
+ */
+public interface EncodableAMQDataBlock
+{
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
new file mode 100644
index 0000000000..65deb61dfb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -0,0 +1,546 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import java.nio.charset.Charset;
+
+public class EncodingUtils
+{
+ private static final Logger _logger = Logger.getLogger(EncodingUtils.class);
+
+ private static final String STRING_ENCODING = "iso8859-15";
+
+ private static final Charset _charset = Charset.forName("iso8859-15");
+
+ public static final int SIZEOF_UNSIGNED_SHORT = 2;
+ public static final int SIZEOF_UNSIGNED_INT = 4;
+
+ public static int encodedShortStringLength(String s)
+ {
+ if (s == null)
+ {
+ return 1;
+ }
+ else
+ {
+ return (short) (1 + s.length());
+ }
+ }
+
+ public static int encodedLongStringLength(String s)
+ {
+ if (s == null)
+ {
+ return 4;
+ }
+ else
+ {
+ return 4 + s.length();
+ }
+ }
+
+ public static int encodedLongStringLength(char[] s)
+ {
+ if (s == null)
+ {
+ return 4;
+ }
+ else
+ {
+ return 4 + s.length;
+ }
+ }
+
+ public static int encodedLongstrLength(byte[] bytes)
+ {
+ if (bytes == null)
+ {
+ return 4;
+ }
+ else
+ {
+ return 4 + bytes.length;
+ }
+ }
+
+ public static int encodedFieldTableLength(FieldTable table)
+ {
+ if (table == null)
+ {
+ // size is encoded as 4 octets
+ return 4;
+ }
+ else
+ {
+ // size of the table plus 4 octets for the size
+ return (int) table.getEncodedSize() + 4;
+ }
+ }
+
+ public static void writeShortStringBytes(ByteBuffer buffer, String s)
+ {
+ if (s != null)
+ {
+ byte[] encodedString = new byte[s.length()];
+ char[] cha = s.toCharArray();
+ for (int i = 0; i < cha.length; i++)
+ {
+ encodedString[i] = (byte) cha[i];
+ }
+ // TODO: check length fits in an unsigned byte
+ writeUnsignedByte(buffer, (short) encodedString.length);
+ buffer.put(encodedString);
+ }
+ else
+ {
+ // really writing out unsigned byte
+ buffer.put((byte) 0);
+ }
+ }
+
+ public static void writeLongStringBytes(ByteBuffer buffer, String s)
+ {
+ assert s == null || s.length() <= 0xFFFE;
+ if (s != null)
+ {
+ int len = s.length();
+ writeUnsignedInteger(buffer, s.length());
+ byte[] encodedString = new byte[len];
+ char[] cha = s.toCharArray();
+ for (int i = 0; i < cha.length; i++)
+ {
+ encodedString[i] = (byte) cha[i];
+ }
+ buffer.put(encodedString);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeLongStringBytes(ByteBuffer buffer, char[] s)
+ {
+ assert s == null || s.length <= 0xFFFE;
+ if (s != null)
+ {
+ int len = s.length;
+ writeUnsignedInteger(buffer, s.length);
+ byte[] encodedString = new byte[len];
+ for (int i = 0; i < s.length; i++)
+ {
+ encodedString[i] = (byte) s[i];
+ }
+ buffer.put(encodedString);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes)
+ {
+ assert bytes == null || bytes.length <= 0xFFFE;
+ if (bytes != null)
+ {
+ writeUnsignedInteger(buffer, bytes.length);
+ buffer.put(bytes);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeUnsignedByte(ByteBuffer buffer, short b)
+ {
+ byte bv = (byte) b;
+ buffer.put(bv);
+ }
+
+ public static void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (s < Short.MAX_VALUE)
+ {
+ buffer.putShort((short) s);
+ }
+ else
+ {
+ short sv = (short) s;
+ buffer.put((byte) (0xFF & (sv >> 8)));
+ buffer.put((byte) (0xFF & sv));
+ }
+ }
+
+ public static void writeUnsignedInteger(ByteBuffer buffer, long l)
+ {
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (l < Integer.MAX_VALUE)
+ {
+ buffer.putInt((int) l);
+ }
+ else
+ {
+ int iv = (int) l;
+
+ // FIXME: This *may* go faster if we build this into a local 4-byte array and then
+ // put the array in a single call.
+ buffer.put((byte) (0xFF & (iv >> 24)));
+ buffer.put((byte) (0xFF & (iv >> 16)));
+ buffer.put((byte) (0xFF & (iv >> 8)));
+ buffer.put((byte) (0xFF & iv));
+ }
+ }
+
+ public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
+ {
+ if (table != null)
+ {
+ table.writeToBuffer(buffer);
+ }
+ else
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean[] values)
+ {
+ byte packedValue = 0;
+ for (int i = 0; i < values.length; i++)
+ {
+ if (values[i])
+ {
+ packedValue = (byte) (packedValue | (1 << i));
+ }
+ }
+
+ buffer.put(packedValue);
+ }
+
+ /**
+ * This is used for writing longstrs.
+ * @param buffer
+ * @param data
+ */
+ public static void writeLongstr(ByteBuffer buffer, byte[] data)
+ {
+ if (data != null)
+ {
+ writeUnsignedInteger(buffer, data.length);
+ buffer.put(data);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeTimestamp(ByteBuffer buffer, long timestamp)
+ {
+ writeUnsignedInteger(buffer, 0/*timestamp msb*/);
+ writeUnsignedInteger(buffer, timestamp);
+ }
+
+ public static boolean[] readBooleans(ByteBuffer buffer)
+ {
+ byte packedValue = buffer.get();
+ boolean[] result = new boolean[8];
+
+ for (int i = 0; i < 8; i++)
+ {
+ result[i] = ((packedValue & (1 << i)) != 0);
+ }
+ return result;
+ }
+
+ public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ long length = buffer.getUnsignedInt();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return new FieldTable(buffer, length);
+ }
+ }
+
+ public static String readShortString(ByteBuffer buffer)
+ {
+ short length = buffer.getUnsigned();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ // this may seem rather odd to declare two array but testing has shown
+ // that constructing a string from a byte array is 5 (five) times slower
+ // than constructing one from a char array.
+ // this approach here is valid since we know that all the chars are
+ // ASCII (0-127)
+ byte[] stringBytes = new byte[length];
+ buffer.get(stringBytes, 0, length);
+ char[] stringChars = new char[length];
+ for (int i = 0; i < stringChars.length; i++)
+ {
+ stringChars[i] = (char) stringBytes[i];
+ }
+
+ return new String(stringChars);
+ }
+ }
+
+ public static String readLongString(ByteBuffer buffer)
+ {
+ long length = buffer.getUnsignedInt();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ // this may seem rather odd to declare two array but testing has shown
+ // that constructing a string from a byte array is 5 (five) times slower
+ // than constructing one from a char array.
+ // this approach here is valid since we know that all the chars are
+ // ASCII (0-127)
+ byte[] stringBytes = new byte[(int)length];
+ buffer.get(stringBytes, 0, (int)length);
+ char[] stringChars = new char[(int)length];
+ for (int i = 0; i < stringChars.length; i++)
+ {
+ stringChars[i] = (char) stringBytes[i];
+ }
+ return new String(stringChars);
+ }
+ }
+
+ public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ long length = buffer.getUnsignedInt();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] result = new byte[(int)length];
+ buffer.get(result);
+ return result;
+ }
+ }
+
+ public static long readTimestamp(ByteBuffer buffer)
+ {
+ // Discard msb from AMQ timestamp
+ buffer.getUnsignedInt();
+ return buffer.getUnsignedInt();
+ }
+
+ // Will barf with a NPE on a null input. Not sure whether it should return null or
+ // an empty field-table (which would be slower - perhaps unnecessarily).
+ //
+ // Some sample input and the result output:
+ //
+ // Input: "a=1" "a=2 c=3" "a=3 c=4 d" "a='four' b='five'" "a=bad"
+ //
+ // Parsing <a=1>...
+ // {a=1}
+ // Parsing <a=2 c=3>...
+ // {a=2, c=3}
+ // Parsing <a=3 c=4 d>...
+ // {a=3, c=4, d=null}
+ // Parsing <a='four' b='five'>...
+ // {a=four, b=five}
+ // Parsing <a=bad>...
+ // java.lang.IllegalArgumentException: a: Invalid integer in <bad> from <a=bad>.
+ //
+ public static FieldTable createFieldTableFromMessageSelector(String selector)
+ {
+ boolean debug = _logger.isDebugEnabled();
+
+ // TODO: Doesn't support embedded quotes properly.
+ String[] expressions = selector.split(" +");
+
+ FieldTable result = new FieldTable();
+
+ for (int i = 0; i < expressions.length; i++)
+ {
+ String expr = expressions[i];
+
+ if (debug)
+ {
+ _logger.debug("Expression = <" + expr + ">");
+ }
+
+ int equals = expr.indexOf('=');
+
+ if (equals < 0)
+ {
+ // Existence check
+ result.put("S" + expr.trim(), null);
+ }
+ else
+ {
+ String key = expr.substring(0, equals).trim();
+ String value = expr.substring(equals + 1).trim();
+
+ if (debug)
+ {
+ _logger.debug("Key = <" + key + ">, Value = <" + value + ">");
+ }
+
+ if (value.charAt(0) == '\'')
+ {
+ if (value.charAt(value.length() - 1) != '\'')
+ {
+ throw new IllegalArgumentException(key + ": Missing quote in <" + value + "> from <" + selector + ">.");
+ }
+ else
+ {
+ value = value.substring(1, value.length() - 1);
+
+ result.put("S" + key, value);
+ }
+ }
+ else
+ {
+ try
+ {
+ int intValue = Integer.parseInt(value);
+
+ result.put("i" + key, value);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new IllegalArgumentException(key + ": Invalid integer in <" + value + "> from <" + selector + ">.");
+
+ }
+ }
+ }
+ }
+
+ if (debug)
+ {
+ _logger.debug("Field-table created from <" + selector + "> is <" + result + ">");
+ }
+
+ return (result);
+
+ }
+
+ static byte[] hexToByteArray(String id)
+ {
+ // Should check param for null, long enough for this check, upper-case and trailing char
+ String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x
+
+ int len = s.length();
+ int byte_len = len / 2;
+ byte[] b = new byte[byte_len];
+
+ for (int i = 0; i < byte_len; i++)
+ {
+ // fixme: refine these repetitive subscript calcs.
+ int ch = i * 2;
+
+ byte b1 = Byte.parseByte(s.substring(ch, ch + 1), 16);
+ byte b2 = Byte.parseByte(s.substring(ch + 1, ch + 2), 16);
+
+ b[i] = (byte) (b1 * 16 + b2);
+ }
+
+ return (b);
+ }
+
+ public static char[] convertToHexCharArray(byte[] from)
+ {
+ int length = from.length;
+ char[] result_buff = new char[length * 2 + 2];
+
+ result_buff[0] = '0';
+ result_buff[1] = 'x';
+
+ int bite;
+ int dest = 2;
+
+ for (int i = 0; i < length; i++)
+ {
+ bite = from[i];
+
+ if (bite < 0)
+ {
+ bite += 256;
+ }
+
+ result_buff[dest++] = hex_chars[bite >> 4];
+ result_buff[dest++] = hex_chars[bite & 0x0f];
+ }
+
+ return (result_buff);
+ }
+
+ public static String convertToHexString(byte[] from)
+ {
+ return (new String(convertToHexCharArray(from)));
+ }
+
+ public static String convertToHexString(ByteBuffer bb)
+ {
+ int size = bb.limit();
+
+ byte[] from = new byte[size];
+
+ // Is this not the same.
+ //bb.get(from, 0, size);
+ for (int i = 0; i < size; i++)
+ {
+ from[i] = bb.get(i);
+ }
+
+ return (new String(convertToHexCharArray(from)));
+ }
+
+ public static void main(String[] args)
+ {
+ for (int i = 0; i < args.length; i++)
+ {
+ String selector = args[i];
+
+ System.err.println("Parsing <" + selector + ">...");
+
+ try
+ {
+ System.err.println(createFieldTableFromMessageSelector(selector));
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.err.println(e);
+ }
+ }
+ }
+
+ private static char hex_chars[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
new file mode 100644
index 0000000000..30f41205dd
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -0,0 +1,319 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.*;
+
+/**
+ * From the protocol document:
+ * field-table = short-integer *field-value-pair
+ * field-value-pair = field-name field-value
+ * field-name = short-string
+ * field-value = 'S' long-string
+ * / 'I' long-integer
+ * / 'D' decimal-value
+ * / 'T' long-integer
+ * decimal-value = decimals long-integer
+ * decimals = OCTET
+ */
+public class FieldTable extends LinkedHashMap
+{
+ private static final Logger _logger = Logger.getLogger(FieldTable.class);
+ private long _encodedSize = 0;
+
+ public FieldTable()
+ {
+ super();
+ }
+
+ /**
+ * Construct a new field table.
+ *
+ * @param buffer the buffer from which to read data. The length byte must be read already
+ * @param length the length of the field table. Must be > 0.
+ * @throws AMQFrameDecodingException if there is an error decoding the table
+ */
+ public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException
+ {
+ super();
+ final boolean debug = _logger.isDebugEnabled();
+ assert length > 0;
+ _encodedSize = length;
+ int sizeRead = 0;
+ while (sizeRead < _encodedSize)
+ {
+ int sizeRemaining = buffer.remaining();
+ final String key = EncodingUtils.readShortString(buffer);
+ // TODO: use proper charset decoder
+ byte iType = buffer.get();
+ final char type = (char) iType;
+ Object value;
+ switch (type)
+ {
+ case 'S':
+ value = EncodingUtils.readLongString(buffer);
+ break;
+ case 'I':
+ value = new Long(buffer.getUnsignedInt());
+ break;
+ default:
+ String msg = "Field '" + key + "' - unsupported field table type: " + type;
+ //some extra debug information...
+ msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
+ throw new AMQFrameDecodingException(msg);
+ }
+ sizeRead += (sizeRemaining - buffer.remaining());
+
+ if (debug)
+ {
+ _logger.debug("FieldTable::FieldTable(buffer," + length + "): Read type '" + type + "', key '" + key + "', value '" + value + "' (now read " + sizeRead + " of " + length + " encoded bytes)...");
+ }
+
+ // we deliberately want to call put in the parent class since we do
+ // not need to do the size calculations
+ super.put(key, value);
+ }
+
+ if (debug)
+ {
+ _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done.");
+ }
+ }
+
+ public void writeToBuffer(ByteBuffer buffer)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug)
+ {
+ _logger.debug("FieldTable::writeToBuffer: Writing encoded size of " + _encodedSize + "...");
+ }
+
+ // write out the total length, which we have kept up to date as data is added
+ EncodingUtils.writeUnsignedInteger(buffer, _encodedSize);
+ final Iterator it = this.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry me = (Map.Entry) it.next();
+ String key = (String) me.getKey();
+
+ EncodingUtils.writeShortStringBytes(buffer, key);
+ Object value = me.getValue();
+
+ if (debug)
+ {
+ _logger.debug("FieldTable::writeToBuffer: Writing key '" + key + "' of type " + value.getClass() + ", value '" + value + "'...");
+ }
+
+ if (value instanceof byte[])
+ {
+ buffer.put((byte) 'S');
+ EncodingUtils.writeLongstr(buffer, (byte[]) value);
+ }
+ else if (value instanceof String)
+ {
+ // TODO: look at using proper charset encoder
+ buffer.put((byte) 'S');
+ EncodingUtils.writeLongStringBytes(buffer, (String) value);
+ }
+ else if (value instanceof Long)
+ {
+ // TODO: look at using proper charset encoder
+ buffer.put((byte) 'I');
+ EncodingUtils.writeUnsignedInteger(buffer, ((Long) value).longValue());
+ }
+ else
+ {
+ // Should never get here
+ throw new IllegalArgumentException("Key '" + key + "': Unsupported type in field table, type: " + ((value == null) ? "null-object" : value.getClass()));
+ }
+ }
+
+ if (debug)
+ {
+ _logger.debug("FieldTable::writeToBuffer: Done.");
+ }
+ }
+
+ public byte[] getDataAsBytes()
+ {
+ final ByteBuffer buffer = ByteBuffer.allocate((int) _encodedSize); // XXX: Is cast a problem?
+ final Iterator it = this.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry me = (Map.Entry) it.next();
+ String key = (String) me.getKey();
+ EncodingUtils.writeShortStringBytes(buffer, key);
+ Object value = me.getValue();
+ if (value instanceof byte[])
+ {
+ buffer.put((byte) 'S');
+ EncodingUtils.writeLongstr(buffer, (byte[]) value);
+ }
+ else if (value instanceof String)
+ {
+ // TODO: look at using proper charset encoder
+ buffer.put((byte) 'S');
+ EncodingUtils.writeLongStringBytes(buffer, (String) value);
+ }
+ else if (value instanceof char[])
+ {
+ // TODO: look at using proper charset encoder
+ buffer.put((byte) 'S');
+ EncodingUtils.writeLongStringBytes(buffer, (char[]) value);
+ }
+ else if (value instanceof Long || value instanceof Integer)
+ {
+ // TODO: look at using proper charset encoder
+ buffer.put((byte) 'I');
+ EncodingUtils.writeUnsignedInteger(buffer, ((Long) value).longValue());
+ }
+ else
+ {
+ // Should never get here
+ assert false;
+ }
+ }
+ final byte[] result = new byte[(int) _encodedSize];
+ buffer.flip();
+ buffer.get(result);
+ buffer.release();
+ return result;
+ }
+
+ public Object put(Object key, Object value)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (key == null)
+ {
+ throw new IllegalArgumentException("All keys must be Strings - was passed: null");
+ }
+ else if (!(key instanceof String))
+ {
+ throw new IllegalArgumentException("All keys must be Strings - was passed: " + key.getClass());
+ }
+
+ Object existing;
+
+ if ((existing = super.remove(key)) != null)
+ {
+ if (debug)
+ {
+ _logger.debug("Found duplicate of key '" + key + "', previous value '" + existing + "' (" + existing.getClass() + "), to be replaced by '" + value + "', (" + value.getClass() + ") - stack trace of source of duplicate follows...", new Throwable().fillInStackTrace());
+ }
+
+ // If we are in effect deleting the value (see comment on null values being deleted
+ // below) then we also need to remove the name from the encoding length.
+ if (value == null)
+ {
+ _encodedSize -= EncodingUtils.encodedShortStringLength((String) key);
+ }
+
+ // FIXME: Should be able to short-cut this process if the old and new values are
+ // the same object and/or type and size...
+ _encodedSize -= getEncodingSize(existing);
+ }
+ else
+ {
+ if (value != null)
+ {
+ _encodedSize += EncodingUtils.encodedShortStringLength((String) key);
+ }
+ }
+
+ // For now: Setting a null value is the equivalent of deleting it.
+ // This is ambiguous in the JMS spec and needs thrashing out and potentially
+ // testing against other implementations.
+ if (value != null)
+ {
+ _encodedSize += getEncodingSize(value);
+ }
+
+ return super.put(key, value);
+ }
+
+ public Object remove(Object key)
+ {
+ if (super.containsKey(key))
+ {
+ final Object value = super.remove(key);
+ _encodedSize -= EncodingUtils.encodedShortStringLength((String) key);
+
+ // This check is, for now, unnecessary (we don't store null values).
+ if (value != null)
+ {
+ _encodedSize -= getEncodingSize(value);
+ }
+
+ return value;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @return unsigned integer
+ */
+ public long getEncodedSize()
+ {
+ return _encodedSize;
+ }
+
+ /**
+ * @return integer
+ */
+ private static int getEncodingSize(Object value)
+ {
+ int encodingSize;
+
+ // the extra byte if for the type indicator that is written out
+ if (value instanceof String)
+ {
+ encodingSize = 1 + EncodingUtils.encodedLongStringLength((String) value);
+ }
+ else if (value instanceof char[])
+ {
+ encodingSize = 1 + EncodingUtils.encodedLongStringLength((char[]) value);
+ }
+ else if (value instanceof Integer)
+ {
+ encodingSize = 1 + 4;
+ }
+ else if (value instanceof Long)
+ {
+ encodingSize = 1 + 4;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported type in field table: " + value.getClass());
+ }
+
+ return encodingSize;
+ }
+
+ public Enumeration keys()
+ {
+ return new FieldTableKeyEnumeration(this);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTableKeyEnumeration.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTableKeyEnumeration.java
new file mode 100644
index 0000000000..2bc890ebbc
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTableKeyEnumeration.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import java.util.Enumeration;
+import java.util.Iterator;
+
+
+public class FieldTableKeyEnumeration implements Enumeration
+{
+ protected FieldTable _table;
+ protected Iterator _iterator;
+
+ public FieldTableKeyEnumeration(FieldTable ft)
+ {
+ _table = ft;
+ _iterator = ft.keySet().iterator();
+ }
+
+ public boolean hasMoreElements()
+ {
+ return _iterator.hasNext();
+ }
+
+ public Object nextElement()
+ {
+ return _iterator.next();
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
new file mode 100644
index 0000000000..4dda794427
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class HeartbeatBody extends AMQBody
+{
+ public static final byte TYPE = 8;
+ public static AMQFrame FRAME = new HeartbeatBody().toFrame();
+
+ protected byte getType()
+ {
+ return TYPE;
+ }
+
+ protected int getSize()
+ {
+ return 0;//heartbeats we generate have no payload
+ }
+
+ protected void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ if(size > 0)
+ {
+ //allow other implementations to have a payload, but ignore it:
+ buffer.skip((int) size);
+ }
+ }
+
+ public AMQFrame toFrame()
+ {
+ return new AMQFrame(0, this);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
new file mode 100644
index 0000000000..1d63f3827b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class HeartbeatBodyFactory implements BodyFactory
+{
+ public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ {
+ return new HeartbeatBody();
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
new file mode 100644
index 0000000000..e500a683dc
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -0,0 +1,176 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.AMQException;
+
+public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ public char[] header = new char[]{'A','M','Q','P'};
+ // TODO: generate these constants automatically from the xml protocol spec file
+
+ private static byte CURRENT_PROTOCOL_CLASS = 1;
+ private static final int CURRENT_PROTOCOL_INSTANCE = 1;
+
+ public byte protocolClass = CURRENT_PROTOCOL_CLASS;
+ public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE;
+ public byte protocolMajor;
+ public byte protocolMinor;
+
+// public ProtocolInitiation() {}
+
+ public ProtocolInitiation(byte major, byte minor)
+ {
+ protocolMajor = major;
+ protocolMinor = minor;
+ }
+
+ public long getSize()
+ {
+ return 4 + 1 + 1 + 1 + 1;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ for (int i = 0; i < header.length; i++)
+ {
+ buffer.put((byte) header[i]);
+ }
+ buffer.put(protocolClass);
+ buffer.put(protocolInstance);
+ buffer.put(protocolMajor);
+ buffer.put(protocolMinor);
+ }
+
+ public void populateFromBuffer(ByteBuffer buffer) throws AMQException
+ {
+ throw new AMQException("Method not implemented");
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ProtocolInitiation))
+ {
+ return false;
+ }
+
+ ProtocolInitiation pi = (ProtocolInitiation) o;
+ if (pi.header == null)
+ {
+ return false;
+ }
+
+ if (header.length != pi.header.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < header.length; i++)
+ {
+ if (header[i] != pi.header[i])
+ {
+ return false;
+ }
+ }
+
+ return (protocolClass == pi.protocolClass &&
+ protocolInstance == pi.protocolInstance &&
+ protocolMajor == pi.protocolMajor &&
+ protocolMinor == pi.protocolMinor);
+ }
+
+ public static class Decoder //implements MessageDecoder
+ {
+ /**
+ *
+ * @param session
+ * @param in
+ * @return true if we have enough data to decode the PI frame fully, false if more
+ * data is required
+ */
+ public boolean decodable(IoSession session, ByteBuffer in)
+ {
+ return (in.remaining() >= 8);
+ }
+
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
+ throws Exception
+ {
+ byte[] theHeader = new byte[4];
+ in.get(theHeader);
+ ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0);
+ pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]};
+ String stringHeader = new String(pi.header);
+ if (!"AMQP".equals(stringHeader))
+ {
+ throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader);
+ }
+ pi.protocolClass = in.get();
+ pi.protocolInstance = in.get();
+ pi.protocolMajor = in.get();
+ pi.protocolMinor = in.get();
+ out.write(pi);
+ }
+ }
+
+ public void checkVersion(ProtocolVersionList pvl) throws AMQException
+ {
+ if (protocolClass != CURRENT_PROTOCOL_CLASS)
+ {
+ throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
+ protocolClass);
+ }
+ if (protocolInstance != CURRENT_PROTOCOL_INSTANCE)
+ {
+ throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " +
+ protocolInstance);
+ }
+ /*
+ if (protocolMajor != CURRENT_PROTOCOL_VERSION_MAJOR)
+ {
+ throw new AMQProtocolVersionException("Protocol major version " + CURRENT_PROTOCOL_VERSION_MAJOR +
+ " was expected; received " + protocolMajor);
+ }
+ if (protocolMinor != CURRENT_PROTOCOL_VERSION_MINOR)
+ {
+ throw new AMQProtocolVersionException("Protocol minor version " + CURRENT_PROTOCOL_VERSION_MINOR +
+ " was expected; received " + protocolMinor);
+ }
+ */
+
+ /* Look through list of available protocol versions */
+ boolean found = false;
+ for (int i=0; i<pvl.pv.length; i++)
+ {
+ if (pvl.pv[i][pvl.PROTOCOL_MAJOR] == protocolMajor &&
+ pvl.pv[i][pvl.PROTOCOL_MINOR] == protocolMinor)
+ {
+ found = true;
+ }
+ }
+ if (!found)
+ {
+ // TODO: add list of available versions in list to msg...
+ throw new AMQProtocolVersionException("Protocol version " +
+ protocolMajor + "." + protocolMinor + " not found in protocol version list.");
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java
new file mode 100644
index 0000000000..6bf86ffc2e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.pool;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoFilter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IdleStatus;
+
+/**
+ * Represents an operation on IoFilter.
+ */
+enum EventType
+{
+ OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION
+}
+
+class Event
+{
+ private static final Logger _log = Logger.getLogger(Event.class);
+
+ private final EventType type;
+ private final IoFilter.NextFilter nextFilter;
+ private final Object data;
+
+ public Event(IoFilter.NextFilter nextFilter, EventType type, Object data)
+ {
+ this.type = type;
+ this.nextFilter = nextFilter;
+ this.data = data;
+ if (type == EventType.EXCEPTION)
+ {
+ _log.error("Exception event constructed: " + data, (Exception) data);
+ }
+ }
+
+ public Object getData()
+ {
+ return data;
+ }
+
+
+ public IoFilter.NextFilter getNextFilter()
+ {
+ return nextFilter;
+ }
+
+
+ public EventType getType()
+ {
+ return type;
+ }
+
+ void process(IoSession session)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Processing " + this);
+ }
+ if (type == EventType.RECEIVED)
+ {
+ nextFilter.messageReceived(session, data);
+ //ByteBufferUtil.releaseIfPossible( data );
+ }
+ else if (type == EventType.SENT)
+ {
+ nextFilter.messageSent(session, data);
+ //ByteBufferUtil.releaseIfPossible( data );
+ }
+ else if (type == EventType.EXCEPTION)
+ {
+ nextFilter.exceptionCaught(session, (Throwable) data);
+ }
+ else if (type == EventType.IDLE)
+ {
+ nextFilter.sessionIdle(session, (IdleStatus) data);
+ }
+ else if (type == EventType.OPENED)
+ {
+ nextFilter.sessionOpened(session);
+ }
+ else if (type == EventType.WRITE)
+ {
+ nextFilter.filterWrite(session, (IoFilter.WriteRequest) data);
+ }
+ else if (type == EventType.CLOSED)
+ {
+ nextFilter.sessionClosed(session);
+ }
+ }
+
+ public String toString()
+ {
+ return "Event: type " + type + ", data: " + data;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
new file mode 100644
index 0000000000..45a115bcd3
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.pool;
+
+import org.apache.mina.common.IoSession;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Holds events for a session that will be processed asynchronously by
+ * the thread pool in PoolingFilter.
+ */
+class Job implements Runnable
+{
+ private final int _maxEvents;
+ private final IoSession _session;
+ private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+ private final AtomicBoolean _active = new AtomicBoolean();
+ private final AtomicInteger _refCount = new AtomicInteger();
+ private final JobCompletionHandler _completionHandler;
+
+ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
+ {
+ _session = session;
+ _completionHandler = completionHandler;
+ _maxEvents = maxEvents;
+ }
+
+ void acquire()
+ {
+ _refCount.incrementAndGet();
+ }
+
+ void release()
+ {
+ _refCount.decrementAndGet();
+ }
+
+ boolean isReferenced()
+ {
+ return _refCount.get() > 0;
+ }
+
+ void add(Event evt)
+ {
+ _eventQueue.add(evt);
+ }
+
+ void processAll()
+ {
+ //limit the number of events processed in one run
+ for (int i = 0; i < _maxEvents; i++)
+ {
+ Event e = _eventQueue.poll();
+ if (e == null)
+ {
+ break;
+ }
+ else
+ {
+ e.process(_session);
+ }
+ }
+ }
+
+ boolean isComplete()
+ {
+ return _eventQueue.peek() == null;
+ }
+
+ boolean activate()
+ {
+ return _active.compareAndSet(false, true);
+ }
+
+ void deactivate()
+ {
+ _active.set(false);
+ }
+
+ public void run()
+ {
+ processAll();
+ deactivate();
+ _completionHandler.completed(_session, this);
+ }
+
+
+ static interface JobCompletionHandler
+ {
+ public void completed(IoSession session, Job job);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
new file mode 100644
index 0000000000..ecb0bd5048
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.pool;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
+{
+ private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
+ public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED));
+ public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE));
+
+ private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
+ private final ReferenceCountingExecutorService _poolReference;
+ private final Set<EventType> _asyncTypes;
+
+ private final String _name;
+ private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name)
+ {
+ _poolReference = refCountingPool;
+ _asyncTypes = asyncTypes;
+ _name = name;
+ }
+
+ private void fireEvent(IoSession session, Event event)
+ {
+ if (_asyncTypes.contains(event.getType()))
+ {
+ Job job = getJobForSession(session);
+ job.acquire(); //prevents this job being removed from _jobs
+ job.add(event);
+ if (job.activate())
+ {
+ _poolReference.getPool().execute(job);
+ }
+ }
+ else
+ {
+ event.process(session);
+ }
+ }
+
+ private Job getJobForSession(IoSession session)
+ {
+ Job job = _jobs.get(session);
+ return job == null ? createJobForSession(session) : job;
+ }
+
+ private Job createJobForSession(IoSession session)
+ {
+ return addJobForSession(session, new Job(session, this, _maxEvents));
+ }
+
+ private Job addJobForSession(IoSession session, Job job)
+ {
+ //atomic so ensures all threads agree on the same job
+ Job existing = _jobs.putIfAbsent(session, job);
+ return existing == null ? job : existing;
+ }
+
+ //Job.JobCompletionHandler
+ public void completed(IoSession session, Job job)
+ {
+ if (job.isComplete())
+ {
+ job.release();
+ if (!job.isReferenced())
+ {
+ _jobs.remove(session);
+ }
+ }
+ else
+ {
+ if (job.activate())
+ {
+ _poolReference.getPool().execute(job);
+ }
+ }
+ }
+
+ //IoFilter methods that are processed by threads on the pool
+
+ public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception
+ {
+ fireEvent(session, new Event(nextFilter, EventType.OPENED, null));
+ }
+
+ public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception
+ {
+ fireEvent(session, new Event(nextFilter, EventType.CLOSED, null));
+ }
+
+ public void sessionIdle(NextFilter nextFilter, IoSession session,
+ IdleStatus status) throws Exception
+ {
+ fireEvent(session, new Event(nextFilter, EventType.IDLE, status));
+ }
+
+ public void exceptionCaught(NextFilter nextFilter, IoSession session,
+ Throwable cause) throws Exception
+ {
+ fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause));
+ }
+
+ public void messageReceived(NextFilter nextFilter, IoSession session,
+ Object message) throws Exception
+ {
+ //ByteBufferUtil.acquireIfPossible( message );
+ fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message));
+ }
+
+ public void messageSent(NextFilter nextFilter, IoSession session,
+ Object message) throws Exception
+ {
+ //ByteBufferUtil.acquireIfPossible( message );
+ fireEvent(session, new Event(nextFilter, EventType.SENT, message));
+ }
+
+ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ {
+ fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest));
+ }
+
+ //IoFilter methods that are processed on current thread (NOT on pooled thread)
+
+ public void filterClose(NextFilter nextFilter, IoSession session) throws Exception
+ {
+ nextFilter.filterClose(session);
+ }
+
+ public void sessionCreated(NextFilter nextFilter, IoSession session)
+ {
+ nextFilter.sessionCreated(session);
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+
+ // LifeCycle methods
+
+ public void init()
+ {
+ _logger.info("Init called on PoolingFilter " + toString());
+ // called when the filter is initialised in the chain. If the reference count is
+ // zero this acquire will initialise the pool
+ _poolReference.acquireExecutorService();
+ }
+
+ public void destroy()
+ {
+ _logger.info("Destroy called on PoolingFilter " + toString());
+ // when the reference count gets to zero we release the executor service
+ _poolReference.releaseExecutorService();
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
new file mode 100644
index 0000000000..bed9ac0b99
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.pool;
+
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.filter.ReferenceCountingIoFilter;
+import org.apache.mina.common.ThreadModel;
+
+public class ReadWriteThreadModel implements ThreadModel
+{
+ public void buildFilterChain(IoFilterChain chain) throws Exception
+ {
+ ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
+ PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS,
+ "AsynchronousReadFilter");
+ PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS,
+ "AsynchronousWriteFilter");
+
+ chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
+ chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
new file mode 100644
index 0000000000..f048a12b90
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.pool;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * We share the executor service among several PoolingFilters. This class reference counts
+ * how many filter chains are using the executor service and destroys the service, thus
+ * freeing up its threads, when the count reaches zero. It recreates the service when
+ * the count is incremented.
+ *
+ * This is particularly important on the client where failing to destroy the executor
+ * service prevents the JVM from shutting down due to the existence of non-daemon threads.
+ *
+ */
+public class ReferenceCountingExecutorService
+{
+ private static final int MINIMUM_POOL_SIZE = 4;
+ private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
+ private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);
+
+ /**
+ * We need to be able to check the current reference count and if necessary
+ * create the executor service atomically.
+ */
+ private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();
+
+ private final Object _lock = new Object();
+
+ private ExecutorService _pool;
+
+ private int _refCount = 0;
+
+ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+
+ public static ReferenceCountingExecutorService getInstance()
+ {
+ return _instance;
+ }
+
+ private ReferenceCountingExecutorService()
+ {
+ }
+
+ ExecutorService acquireExecutorService()
+ {
+ synchronized (_lock)
+ {
+ if (_refCount++ == 0)
+ {
+ _pool = Executors.newFixedThreadPool(_poolSize);
+ }
+ return _pool;
+ }
+ }
+
+ void releaseExecutorService()
+ {
+ synchronized (_lock)
+ {
+ if (--_refCount == 0)
+ {
+ _pool.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * The filters that use the executor service should call this method to get access
+ * to the service. Note that this method does not alter the reference count.
+ *
+ * @return the underlying executor service
+ */
+ public ExecutorService getPool()
+ {
+ return _pool;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
new file mode 100644
index 0000000000..0716104688
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public final class AMQConstant
+{
+ private int _code;
+
+ private String _name;
+
+ private static Map _codeMap = new HashMap();
+
+ private AMQConstant(int code, String name, boolean map)
+ {
+ _code = code;
+ _name = name;
+ if (map)
+ {
+ _codeMap.put(new Integer(code), this);
+ }
+ }
+
+ public String toString()
+ {
+ return _code + ": " + _name;
+ }
+
+ public int getCode()
+ {
+ return _code;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
+
+ public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true);
+
+ public static final AMQConstant REPLY_SUCCESS = new AMQConstant(200, "reply success", true);
+
+ public static final AMQConstant NOT_DELIVERED = new AMQConstant(310, "not delivered", true);
+
+ public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true);
+
+ public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
+
+ public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true);
+
+ public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
+
+ public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
+
+ public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
+
+ public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
+
+ public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+
+ public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
+
+ public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
+
+ public static final AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true);
+
+ public static final AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error", true);
+
+ public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
+
+ public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
+
+ public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
+
+ public static final AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal error", true);
+
+ public static AMQConstant getConstant(int code)
+ {
+ AMQConstant c = (AMQConstant) _codeMap.get(new Integer(code));
+ if (c == null)
+ {
+ c = new AMQConstant(code, "unknown code", false);
+ }
+ return c;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java
new file mode 100644
index 0000000000..c066fd0370
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ssl;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+
+/**
+ * Factory to create a bogus SSLContext. This means that it is easy to test SSL but this
+ * cannot be used in a production environment.
+ * <p/>
+ * This is based on the sample that comes with MINA, written by Trustin Lee
+ */
+public class BogusSSLContextFactory
+{
+ /**
+ * Protocol to use.
+ */
+ private static final String PROTOCOL = "TLS";
+
+ /**
+ * Bougus Server certificate keystore file name.
+ */
+ private static final String BOGUS_KEYSTORE = "qpid.cert";
+
+ // NOTE: The keystore was generated using keytool:
+ // keytool -genkey -alias qpid -keysize 512 -validity 3650
+ // -keyalg RSA -dname "CN=amqp.org" -keypass qpidpw
+ // -storepass qpidpw -keystore qpid.cert
+
+ private static final char[] BOGUS_KEYSTORE_PASSWORD = {'q', 'p', 'i', 'd', 'p', 'w'};
+
+ private static SSLContext serverInstance = null;
+
+ private static SSLContext clientInstance = null;
+
+ /**
+ * Get SSLContext singleton.
+ *
+ * @return SSLContext
+ * @throws java.security.GeneralSecurityException
+ */
+ public static SSLContext getInstance(boolean server)
+ throws GeneralSecurityException
+ {
+ SSLContext retInstance;
+ if (server)
+ {
+ // FIXME: looks like double-checking locking
+ if (serverInstance == null)
+ {
+ synchronized (BogusSSLContextFactory.class)
+ {
+ if (serverInstance == null)
+ {
+ try
+ {
+ serverInstance = createBougusServerSSLContext();
+ }
+ catch (Exception ioe)
+ {
+ throw new GeneralSecurityException(
+ "Can't create Server SSLContext:" + ioe);
+ }
+ }
+ }
+ }
+ retInstance = serverInstance;
+ }
+ else
+ {
+ // FIXME: looks like double-checking locking
+ if (clientInstance == null)
+ {
+ synchronized (BogusSSLContextFactory.class)
+ {
+ if (clientInstance == null)
+ {
+ clientInstance = createBougusClientSSLContext();
+ }
+ }
+ }
+ retInstance = clientInstance;
+ }
+ return retInstance;
+ }
+
+ private static SSLContext createBougusServerSSLContext()
+ throws GeneralSecurityException, IOException
+ {
+ // Create keystore
+ KeyStore ks = KeyStore.getInstance("JKS");
+ InputStream in = null;
+ try
+ {
+ in = BogusSSLContextFactory.class.getResourceAsStream(BOGUS_KEYSTORE);
+ if (in == null)
+ {
+ throw new IOException("Unable to load keystore resource: " + BOGUS_KEYSTORE);
+ }
+ ks.load(in, BOGUS_KEYSTORE_PASSWORD);
+ }
+ finally
+ {
+ if (in != null)
+ {
+ //noinspection EmptyCatchBlock
+ try
+ {
+ in.close();
+ }
+ catch (IOException ignored)
+ {
+ }
+ }
+ }
+
+ // Set up key manager factory to use our key store
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ kmf.init(ks, BOGUS_KEYSTORE_PASSWORD);
+
+ // Initialize the SSLContext to work with our key managers.
+ SSLContext sslContext = SSLContext.getInstance(PROTOCOL);
+ sslContext.init(kmf.getKeyManagers(), BogusTrustManagerFactory.X509_MANAGERS, null);
+
+ return sslContext;
+ }
+
+ private static SSLContext createBougusClientSSLContext()
+ throws GeneralSecurityException
+ {
+ SSLContext context = SSLContext.getInstance(PROTOCOL);
+ context.init(null, BogusTrustManagerFactory.X509_MANAGERS, null);
+ return context;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java
new file mode 100644
index 0000000000..8a71e3d7c8
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ssl;
+
+import javax.net.ssl.ManagerFactoryParameters;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactorySpi;
+import javax.net.ssl.X509TrustManager;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+/**
+ * Bogus trust manager factory. Used to make testing SSL simpler - i.e no need to
+ * mess about with keystores.
+ * <p/>
+ * This is based on the example that comes with MINA, written by Trustin Lee.
+ */
+class BogusTrustManagerFactory extends TrustManagerFactorySpi
+{
+
+ static final X509TrustManager X509 = new X509TrustManager()
+ {
+ public void checkClientTrusted(X509Certificate[] x509Certificates,
+ String s) throws CertificateException
+ {
+ }
+
+ public void checkServerTrusted(X509Certificate[] x509Certificates,
+ String s) throws CertificateException
+ {
+ }
+
+ public X509Certificate[] getAcceptedIssuers()
+ {
+ return new X509Certificate[ 0 ];
+ }
+ };
+
+ static final TrustManager[] X509_MANAGERS = new TrustManager[]{X509};
+
+ public BogusTrustManagerFactory()
+ {
+ }
+
+ protected TrustManager[] engineGetTrustManagers()
+ {
+ return X509_MANAGERS;
+ }
+
+ protected void engineInit(KeyStore keystore) throws KeyStoreException
+ {
+ // noop
+ }
+
+ protected void engineInit(
+ ManagerFactoryParameters managerFactoryParameters)
+ throws InvalidAlgorithmParameterException
+ {
+ // noop
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java
new file mode 100644
index 0000000000..f2d5396637
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ssl;
+
+import javax.net.ServerSocketFactory;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.security.GeneralSecurityException;
+
+/**
+ * Simple Server Socket factory to create sockets with or without SSL enabled.
+ * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes)
+ * <p/>
+ * This is based on the example that comes with MINA, written by Trustin Lee.
+ */
+public class SSLServerSocketFactory extends javax.net.ServerSocketFactory
+{
+ private static boolean sslEnabled = false;
+
+ private static javax.net.ServerSocketFactory sslFactory = null;
+
+ private static ServerSocketFactory factory = null;
+
+ public SSLServerSocketFactory()
+ {
+ super();
+ }
+
+ public ServerSocket createServerSocket(int port) throws IOException
+ {
+ return new ServerSocket(port);
+ }
+
+ public ServerSocket createServerSocket(int port, int backlog)
+ throws IOException
+ {
+ return new ServerSocket(port, backlog);
+ }
+
+ public ServerSocket createServerSocket(int port, int backlog,
+ InetAddress ifAddress)
+ throws IOException
+ {
+ return new ServerSocket(port, backlog, ifAddress);
+ }
+
+ public static javax.net.ServerSocketFactory getServerSocketFactory()
+ throws IOException
+ {
+ if (isSslEnabled())
+ {
+ if (sslFactory == null)
+ {
+ try
+ {
+ sslFactory = BogusSSLContextFactory.getInstance(true)
+ .getServerSocketFactory();
+ }
+ catch (GeneralSecurityException e)
+ {
+ IOException ioe = new IOException(
+ "could not create SSL socket");
+ ioe.initCause(e);
+ throw ioe;
+ }
+ }
+ return sslFactory;
+ }
+ else
+ {
+ if (factory == null)
+ {
+ factory = new SSLServerSocketFactory();
+ }
+ return factory;
+ }
+
+ }
+
+ public static boolean isSslEnabled()
+ {
+ return sslEnabled;
+ }
+
+ public static void setSslEnabled(boolean newSslEnabled)
+ {
+ sslEnabled = newSslEnabled;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java
new file mode 100644
index 0000000000..31dccb593e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ssl;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.GeneralSecurityException;
+
+/**
+ * Simple Socket factory to create sockets with or without SSL enabled.
+ * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes).
+ * <p/>
+ * This is based on an example that comes with MINA, written by Trustin Lee.
+ */
+public class SSLSocketFactory extends SocketFactory
+{
+ private static boolean sslEnabled = false;
+
+ private static javax.net.ssl.SSLSocketFactory sslFactory = null;
+
+ private static javax.net.SocketFactory factory = null;
+
+ public SSLSocketFactory()
+ {
+ super();
+ }
+
+ public Socket createSocket(String arg1, int arg2) throws IOException,
+ UnknownHostException
+ {
+ if (isSslEnabled())
+ {
+ return getSSLFactory().createSocket(arg1, arg2);
+ }
+ else
+ {
+ return new Socket(arg1, arg2);
+ }
+ }
+
+ public Socket createSocket(String arg1, int arg2, InetAddress arg3,
+ int arg4) throws IOException,
+ UnknownHostException
+ {
+ if (isSslEnabled())
+ {
+ return getSSLFactory().createSocket(arg1, arg2, arg3, arg4);
+ }
+ else
+ {
+ return new Socket(arg1, arg2, arg3, arg4);
+ }
+ }
+
+ public Socket createSocket(InetAddress arg1, int arg2)
+ throws IOException
+ {
+ if (isSslEnabled())
+ {
+ return getSSLFactory().createSocket(arg1, arg2);
+ }
+ else
+ {
+ return new Socket(arg1, arg2);
+ }
+ }
+
+ public Socket createSocket(InetAddress arg1, int arg2, InetAddress arg3,
+ int arg4) throws IOException
+ {
+ if (isSslEnabled())
+ {
+ return getSSLFactory().createSocket(arg1, arg2, arg3, arg4);
+ }
+ else
+ {
+ return new Socket(arg1, arg2, arg3, arg4);
+ }
+ }
+
+ public static javax.net.SocketFactory getSocketFactory()
+ {
+ if (factory == null)
+ {
+ factory = new SSLSocketFactory();
+ }
+ return factory;
+ }
+
+ private javax.net.ssl.SSLSocketFactory getSSLFactory()
+ {
+ if (sslFactory == null)
+ {
+ try
+ {
+ sslFactory = BogusSSLContextFactory.getInstance(false)
+ .getSocketFactory();
+ }
+ catch (GeneralSecurityException e)
+ {
+ throw new RuntimeException("could not create SSL socket", e);
+ }
+ }
+ return sslFactory;
+ }
+
+ public static boolean isSslEnabled()
+ {
+ return sslEnabled;
+ }
+
+ public static void setSslEnabled(boolean newSslEnabled)
+ {
+ sslEnabled = newSslEnabled;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
new file mode 100644
index 0000000000..5ea1a55f2a
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
@@ -0,0 +1,260 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.url;
+
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import java.util.HashMap;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class AMQBindingURL implements BindingURL
+{
+ String _url;
+ String _exchangeClass;
+ String _exchangeName;
+ String _destinationName;
+ String _queueName;
+ private HashMap<String, String> _options;
+
+
+ public AMQBindingURL(String url) throws URLSyntaxException
+ {
+ //format:
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+
+ _url = url;
+ _options = new HashMap<String, String>();
+
+ parseBindingURL();
+ }
+
+ private void parseBindingURL() throws URLSyntaxException
+ {
+ try
+ {
+ URI connection = new URI(_url);
+
+ String exchangeClass = connection.getScheme();
+
+ if (exchangeClass == null)
+ {
+ _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" +
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME + "//" + _url;
+ //URLHelper.parseError(-1, "Exchange Class not specified.", _url);
+ parseBindingURL();
+ return;
+ }
+ else
+ {
+ setExchangeClass(exchangeClass);
+ }
+
+ String exchangeName = connection.getHost();
+
+ if (exchangeName == null)
+ {
+ URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+ }
+ else
+ {
+ setExchangeName(exchangeName);
+ }
+
+ if (connection.getPath() == null ||
+ connection.getPath().equals(""))
+ {
+ URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ "Destination or Queue requried", _url);
+ }
+ else
+ {
+ int slash = connection.getPath().indexOf("/", 1);
+ if (slash == -1)
+ {
+ URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ "Destination requried", _url);
+ }
+ else
+ {
+ String path = connection.getPath();
+ setDestinationName(path.substring(1, slash));
+
+ setQueueName(path.substring(slash + 1));
+
+ }
+ }
+
+ URLHelper.parseOptions(_options, connection.getQuery());
+
+ processOptions();
+
+ //Fragment is #string (not used)
+ //System.out.println(connection.getFragment());
+
+ }
+ catch (URISyntaxException uris)
+ {
+
+ URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+
+ }
+ }
+
+ private void processOptions()
+ {
+ //this is where we would parse any options that needed more than just storage.
+ }
+
+ public String getURL()
+ {
+ return _url;
+ }
+
+ public String getExchangeClass()
+ {
+ return _exchangeClass;
+ }
+
+ public void setExchangeClass(String exchangeClass)
+ {
+ _exchangeClass = exchangeClass;
+ }
+
+ public String getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public void setExchangeName(String name)
+ {
+ _exchangeName = name;
+
+ if (name.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
+ {
+ setOption(BindingURL.OPTION_EXCLUSIVE, "true");
+ }
+ }
+
+ public String getDestinationName()
+ {
+ return _destinationName;
+ }
+
+ public void setDestinationName(String name)
+ {
+ _destinationName = name;
+ }
+
+ public String getQueueName()
+ {
+ if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
+ {
+ if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
+ {
+ return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION);
+ }
+ else
+ {
+ return getDestinationName();
+ }
+ }
+ else
+ {
+ return getDestinationName();
+ }
+ }
+ else
+ {
+ return _queueName;
+ }
+ }
+
+ public void setQueueName(String name)
+ {
+ _queueName = name;
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public boolean containsOption(String key)
+ {
+ return _options.containsKey(key);
+ }
+
+ public String getRoutingKey()
+ {
+ if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ return getQueueName();
+ }
+
+ if (containsOption(BindingURL.OPTION_ROUTING_KEY))
+ {
+ return getOption(OPTION_ROUTING_KEY);
+ }
+
+ return getDestinationName();
+ }
+
+ public void setRoutingKey(String key)
+ {
+ setOption(OPTION_ROUTING_KEY, key);
+ }
+
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(_exchangeClass);
+ sb.append("://");
+ sb.append(_exchangeName);
+ sb.append('/');
+ sb.append(_destinationName);
+ sb.append('/');
+ sb.append(_queueName);
+
+ sb.append(URLHelper.printOptions(_options));
+ return sb.toString();
+ }
+
+ public static void main(String args[]) throws URLSyntaxException
+ {
+ String url = "exchangeClass://exchangeName/Destination/Queue?option='value',option2='value2'";
+
+ AMQBindingURL dest = new AMQBindingURL(url);
+
+ System.out.println(url);
+ System.out.println(dest);
+
+ }
+
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
new file mode 100644
index 0000000000..77802b0e17
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.url;
+
+import java.util.List;
+
+/*
+ Binding URL format:
+ <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+*/
+public interface BindingURL
+{
+ public static final String OPTION_EXCLUSIVE = "exclusive";
+ public static final String OPTION_AUTODELETE = "autodelete";
+ public static final String OPTION_DURABLE = "durable";
+ public static final String OPTION_CLIENTID = "clientid";
+ public static final String OPTION_SUBSCRIPTION = "subscription";
+ public static final String OPTION_ROUTING_KEY = "routingkey";
+
+
+ String getURL();
+
+ String getExchangeClass();
+
+ void setExchangeClass(String exchangeClass);
+
+ String getExchangeName();
+
+ void setExchangeName(String name);
+
+ String getDestinationName();
+
+ void setDestinationName(String name);
+
+ String getQueueName();
+
+ void setQueueName(String name);
+
+ String getOption(String key);
+
+ void setOption(String key, String value);
+
+ boolean containsOption(String key);
+
+ String getRoutingKey();
+
+ void setRoutingKey(String key);
+
+ String toString();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
new file mode 100644
index 0000000000..959735d438
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.url;
+
+import java.util.HashMap;
+
+public class URLHelper
+{
+ public static char DEFAULT_OPTION_SEPERATOR = '&';
+ public static char ALTERNATIVE_OPTION_SEPARATOR = ',';
+ public static char BROKER_SEPARATOR = ';';
+
+ public static void parseOptions(HashMap<String, String> optionMap, String options) throws URLSyntaxException
+ {
+ //options looks like this
+ //brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value''
+
+ if (options == null || options.indexOf('=') == -1)
+ {
+ return;
+ }
+
+ int optionIndex = options.indexOf('=');
+
+ String option = options.substring(0, optionIndex);
+
+ int length = options.length();
+
+ int nestedQuotes = 0;
+
+ // to store index of final "'"
+ int valueIndex = optionIndex;
+
+ //Walk remainder of url.
+ while (nestedQuotes > 0 || valueIndex < length)
+ {
+ valueIndex++;
+
+ if (valueIndex >= length)
+ {
+ break;
+ }
+
+ if (options.charAt(valueIndex) == '\'')
+ {
+ if (valueIndex + 1 < options.length())
+ {
+ if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR ||
+ options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR ||
+ options.charAt(valueIndex + 1) == BROKER_SEPARATOR ||
+ options.charAt(valueIndex + 1) == '\'')
+ {
+ nestedQuotes--;
+// System.out.println(
+// options + "\n" + "-" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1));
+ if (nestedQuotes == 0)
+ {
+ //We've found the value of an option
+ break;
+ }
+ }
+ else
+ {
+ nestedQuotes++;
+// System.out.println(
+// options + "\n" + "+" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1));
+ }
+ }
+ else
+ {
+ // We are at the end of the string
+ // Check to see if we are corectly closing quotes
+ if (options.charAt(valueIndex) == '\'')
+ {
+ nestedQuotes--;
+ }
+
+ break;
+ }
+ }
+ }
+
+ if (nestedQuotes != 0 || valueIndex < (optionIndex + 2))
+ {
+ int sepIndex = 0;
+
+ //Try and identify illegal separator character
+ if (nestedQuotes > 1)
+ {
+ for (int i = 0; i < nestedQuotes; i++)
+ {
+ sepIndex = options.indexOf('\'', sepIndex);
+ sepIndex++;
+ }
+ }
+
+ if (sepIndex >= options.length() || sepIndex == 0)
+ {
+ parseError(valueIndex, "Unterminated option", options);
+ }
+ else
+ {
+ parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
+ options.charAt(sepIndex) + "'", options);
+ }
+ }
+
+ // optionIndex +2 to skip "='"
+ String value = options.substring(optionIndex + 2, valueIndex);
+
+ optionMap.put(option, value);
+
+ if (valueIndex < (options.length() - 1))
+ {
+ //Recurse to get remaining options
+ parseOptions(optionMap, options.substring(valueIndex + 2));
+ }
+ }
+
+
+ public static void parseError(int index, String error, String url) throws URLSyntaxException
+ {
+ parseError(index, 1, error, url);
+ }
+
+ public static void parseError(int index, int length, String error, String url) throws URLSyntaxException
+ {
+ throw new URLSyntaxException(url, error, index, length);
+ }
+
+ public static String printOptions(HashMap<String, String> options)
+ {
+ if (options.isEmpty())
+ {
+ return "";
+ }
+ else
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append('?');
+ for (String key : options.keySet())
+ {
+ sb.append(key);
+
+ sb.append("='");
+
+ sb.append(options.get(key));
+
+ sb.append("'");
+ sb.append(DEFAULT_OPTION_SEPERATOR);
+ }
+
+ sb.deleteCharAt(sb.length() - 1);
+
+ return sb.toString();
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
new file mode 100644
index 0000000000..b454069826
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.url;
+
+import java.net.URISyntaxException;
+
+public class URLSyntaxException extends URISyntaxException
+{
+ private int _length;
+
+ public URLSyntaxException(String url, String error, int index, int length)
+ {
+ super(url, error, index);
+
+ _length = length;
+ }
+
+ private static String getPositionString(int index, int length)
+ {
+ StringBuffer sb = new StringBuffer(index + 1);
+
+ for (int i = 0; i < index; i++)
+ {
+ sb.append(" ");
+ }
+
+ if (length > -1)
+ {
+ for (int i = 0; i < length; i++)
+ {
+ sb.append('^');
+ }
+ }
+
+ return sb.toString();
+ }
+
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(getReason());
+
+ if (getIndex() > -1)
+ {
+ if (_length != -1)
+ {
+ sb.append(" between indicies ");
+ sb.append(getIndex());
+ sb.append(" and ");
+ sb.append(_length);
+ }
+ else
+ {
+ sb.append(" at index ");
+ sb.append(getIndex());
+ }
+ }
+
+ sb.append(" ");
+ if (getIndex() != -1)
+ {
+ sb.append("\n");
+ }
+
+ sb.append(getInput());
+
+ if (getIndex() != -1)
+ {
+ sb.append("\n");
+ sb.append(getPositionString(getIndex(), _length));
+ }
+
+ return sb.toString();
+ }
+
+
+}