summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java374
1 files changed, 300 insertions, 74 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 57622b5054..c7d89a9927 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +35,27 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
private static final AMQShortString ZERO_STRING = null;
+ /**
+ * We store the encoded form when we decode the content header so that if we need to write it out without modifying
+ * it we can do so without incurring the expense of reencoding it
+ */
+ private byte[] _encodedForm;
+
+ /** Flag indicating whether the entire content header has been decoded yet */
+ private boolean _decoded = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker for
+ * routing in some cases so we can decode that separately.
+ */
+ private boolean _decodedHeaders = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The content type is used by all clients
+ * to determine the message type
+ */
+ private boolean _decodedContentType = true;
+
private AMQShortString _contentType;
private AMQShortString _encoding;
@@ -67,10 +86,10 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
private int _propertyFlags = 0;
private static final int CONTENT_TYPE_MASK = 1 << 15;
- private static final int ENCODING_MASK = 1 << 14;
+ private static final int ENCONDING_MASK = 1 << 14;
private static final int HEADERS_MASK = 1 << 13;
private static final int DELIVERY_MODE_MASK = 1 << 12;
- private static final int PRIORITY_MASK = 1 << 11;
+ private static final int PROPRITY_MASK = 1 << 11;
private static final int CORRELATION_ID_MASK = 1 << 10;
private static final int REPLY_TO_MASK = 1 << 9;
private static final int EXPIRATION_MASK = 1 << 8;
@@ -82,11 +101,34 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
private static final int CLUSTER_ID_MASK = 1 << 2;
+ /**
+ * This is 0_10 specific. We use this property to check if some message properties have been changed.
+ */
+ private boolean _hasBeenUpdated = false;
+
+ public boolean reset()
+ {
+ boolean result = _hasBeenUpdated;
+ _hasBeenUpdated = false;
+ return result;
+ }
+
+ public void updated()
+ {
+ _hasBeenUpdated = true;
+ }
+
public BasicContentHeaderProperties()
{ }
public int getPropertyListSize()
{
+ if (_encodedForm != null)
+ {
+ return _encodedForm.length;
+ }
+ else
+ {
int size = 0;
if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0)
@@ -94,7 +136,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
size += EncodingUtils.encodedShortStringLength(_contentType);
}
- if ((_propertyFlags & ENCODING_MASK) > 0)
+ if ((_propertyFlags & ENCONDING_MASK) > 0)
{
size += EncodingUtils.encodedShortStringLength(_encoding);
}
@@ -109,7 +151,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
size += 1;
}
- if ((_propertyFlags & PRIORITY_MASK) > 0)
+ if ((_propertyFlags & PROPRITY_MASK) > 0)
{
size += 1;
}
@@ -167,10 +209,23 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
}
return size;
+ }
+ }
+
+ private void clearEncodedForm()
+ {
+ if (!_decoded && (_encodedForm != null))
+ {
+ // decode();
+ }
+
+ _encodedForm = null;
}
public void setPropertyFlags(int propertyFlags)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags = propertyFlags;
}
@@ -179,87 +234,94 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
return _propertyFlags;
}
- public void writePropertyListPayload(DataOutputStream buffer) throws IOException
+ public void writePropertyListPayload(ByteBuffer buffer)
{
- if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ if (_encodedForm != null)
{
- EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ buffer.put(_encodedForm);
}
-
- if ((_propertyFlags & ENCODING_MASK) != 0)
+ else
{
- EncodingUtils.writeShortStringBytes(buffer, _encoding);
- }
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ }
- if ((_propertyFlags & HEADERS_MASK) != 0)
- {
- EncodingUtils.writeFieldTableBytes(buffer, _headers);
- }
+ if ((_propertyFlags & ENCONDING_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _encoding);
+ }
- if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
- {
- buffer.writeByte(_deliveryMode);
- }
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, _headers);
+ }
- if ((_propertyFlags & PRIORITY_MASK) != 0)
- {
- buffer.writeByte(_priority);
- }
+ if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+ {
+ buffer.put(_deliveryMode);
+ }
- if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _correlationId);
- }
+ if ((_propertyFlags & PROPRITY_MASK) != 0)
+ {
+ buffer.put(_priority);
+ }
- if ((_propertyFlags & REPLY_TO_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _replyTo);
- }
+ if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+ }
- if ((_propertyFlags & EXPIRATION_MASK) != 0)
- {
- if (_expiration == 0L)
+ if ((_propertyFlags & REPLY_TO_MASK) != 0)
{
- EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+ EncodingUtils.writeShortStringBytes(buffer, _replyTo);
}
- else
+
+ if ((_propertyFlags & EXPIRATION_MASK) != 0)
{
- EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ if (_expiration == 0L)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+ }
+ else
+ {
+ EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ }
}
- }
- if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _messageId);
- }
+ if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _messageId);
+ }
- if ((_propertyFlags & TIMESTAMP_MASK) != 0)
- {
- EncodingUtils.writeTimestamp(buffer, _timestamp);
- }
+ if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+ {
+ EncodingUtils.writeTimestamp(buffer, _timestamp);
+ }
- if ((_propertyFlags & TYPE_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _type);
- }
+ if ((_propertyFlags & TYPE_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _type);
+ }
- if ((_propertyFlags & USER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _userId);
- }
+ if ((_propertyFlags & USER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _userId);
+ }
- if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _appId);
- }
+ if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _appId);
+ }
- if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ }
}
}
- public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+ public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
{
_propertyFlags = propertyFlags;
@@ -269,18 +331,25 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
}
decode(buffer);
+ /*_encodedForm = new byte[size];
+ buffer.get(_encodedForm, 0, size);
+ _decoded = false;
+ _decodedHeaders = false;
+ _decodedContentType = false;*/
}
- private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException
+ private void decode(ByteBuffer buffer)
{
// ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
-
+ int pos = buffer.position();
+ try
+ {
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
_contentType = EncodingUtils.readAMQShortString(buffer);
}
- if ((_propertyFlags & ENCODING_MASK) != 0)
+ if ((_propertyFlags & ENCONDING_MASK) != 0)
{
_encoding = EncodingUtils.readAMQShortString(buffer);
}
@@ -292,12 +361,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
{
- _deliveryMode = buffer.readByte();
+ _deliveryMode = buffer.get();
}
- if ((_propertyFlags & PRIORITY_MASK) != 0)
+ if ((_propertyFlags & PROPRITY_MASK) != 0)
{
- _priority = buffer.readByte();
+ _priority = buffer.get();
}
if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
@@ -344,29 +413,116 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
{
_clusterId = EncodingUtils.readAMQShortString(buffer);
}
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e, e);
+ }
+ final int endPos = buffer.position();
+ buffer.position(pos);
+ final int len = endPos - pos;
+ _encodedForm = new byte[len];
+ final int limit = buffer.limit();
+ buffer.limit(endPos);
+ buffer.get(_encodedForm, 0, len);
+ buffer.limit(limit);
+ buffer.position(endPos);
+ _decoded = true;
+ }
+ private void decodeUpToHeaders()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ try
+ {
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+
+ if ((_propertyFlags & ENCONDING_MASK) != 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ _headers = EncodingUtils.readFieldTable(buffer);
+
+ }
+
+ _decodedHeaders = true;
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e, e);
+ }
}
+ private void decodeUpToContentType()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ _contentType = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ _decodedContentType = true;
+ }
+
+ private void decodeIfNecessary()
+ {
+ if (!_decoded)
+ {
+ // decode();
+ }
+ }
+
+ private void decodeHeadersIfNecessary()
+ {
+ if (!_decoded && !_decodedHeaders)
+ {
+ decodeUpToHeaders();
+ }
+ }
+
+ private void decodeContentTypeIfNecessary()
+ {
+ if (!_decoded && !_decodedContentType)
+ {
+ decodeUpToContentType();
+ }
+ }
public AMQShortString getContentType()
{
+ decodeContentTypeIfNecessary();
+
return _contentType;
}
public String getContentTypeAsString()
{
+ decodeContentTypeIfNecessary();
+
return (_contentType == null) ? null : _contentType.toString();
}
public void setContentType(AMQShortString contentType)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= (CONTENT_TYPE_MASK);
_contentType = contentType;
}
public void setContentType(String contentType)
{
+ _hasBeenUpdated = true;
setContentType((contentType == null) ? null : new AMQShortString(contentType));
}
@@ -378,23 +534,31 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public AMQShortString getEncoding()
{
+ decodeIfNecessary();
+
return _encoding;
}
public void setEncoding(String encoding)
{
- _propertyFlags |= ENCODING_MASK;
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= ENCONDING_MASK;
_encoding = (encoding == null) ? null : new AMQShortString(encoding);
}
public void setEncoding(AMQShortString encoding)
{
- _propertyFlags |= ENCODING_MASK;
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= ENCONDING_MASK;
_encoding = encoding;
}
public FieldTable getHeaders()
{
+ decodeHeadersIfNecessary();
+
if (_headers == null)
{
setHeaders(FieldTableFactory.newFieldTable());
@@ -405,146 +569,191 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setHeaders(FieldTable headers)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= HEADERS_MASK;
_headers = headers;
}
public byte getDeliveryMode()
{
+ decodeIfNecessary();
+
return _deliveryMode;
}
public void setDeliveryMode(byte deliveryMode)
{
+ clearEncodedForm();
_propertyFlags |= DELIVERY_MODE_MASK;
_deliveryMode = deliveryMode;
}
public byte getPriority()
{
+ decodeIfNecessary();
+
return _priority;
}
public void setPriority(byte priority)
{
- _propertyFlags |= PRIORITY_MASK;
+ clearEncodedForm();
+ _propertyFlags |= PROPRITY_MASK;
_priority = priority;
}
public AMQShortString getCorrelationId()
{
+ decodeIfNecessary();
+
return _correlationId;
}
public String getCorrelationIdAsString()
{
+ decodeIfNecessary();
+
return (_correlationId == null) ? null : _correlationId.toString();
}
public void setCorrelationId(String correlationId)
{
+ _hasBeenUpdated = true;
setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId));
}
public void setCorrelationId(AMQShortString correlationId)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= CORRELATION_ID_MASK;
_correlationId = correlationId;
}
public String getReplyToAsString()
{
+ decodeIfNecessary();
+
return (_replyTo == null) ? null : _replyTo.toString();
}
public AMQShortString getReplyTo()
{
+ decodeIfNecessary();
+
return _replyTo;
}
public void setReplyTo(String replyTo)
{
+ _hasBeenUpdated = true;
setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo));
}
public void setReplyTo(AMQShortString replyTo)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= REPLY_TO_MASK;
_replyTo = replyTo;
}
public long getExpiration()
{
+ decodeIfNecessary();
return _expiration;
}
public void setExpiration(long expiration)
{
+ clearEncodedForm();
_propertyFlags |= EXPIRATION_MASK;
_expiration = expiration;
}
public AMQShortString getMessageId()
{
+ decodeIfNecessary();
+
return _messageId;
}
public String getMessageIdAsString()
{
+ decodeIfNecessary();
+
return (_messageId == null) ? null : _messageId.toString();
}
public void setMessageId(String messageId)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = (messageId == null) ? null : new AMQShortString(messageId);
}
public void setMessageId(AMQShortString messageId)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = messageId;
}
public long getTimestamp()
{
+ decodeIfNecessary();
return _timestamp;
}
public void setTimestamp(long timestamp)
{
+ clearEncodedForm();
_propertyFlags |= TIMESTAMP_MASK;
_timestamp = timestamp;
}
public String getTypeAsString()
{
+ decodeIfNecessary();
+
return (_type == null) ? null : _type.toString();
}
public AMQShortString getType()
{
+ decodeIfNecessary();
+
return _type;
}
public void setType(String type)
{
+ _hasBeenUpdated = true;
setType((type == null) ? null : new AMQShortString(type));
}
public void setType(AMQShortString type)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= TYPE_MASK;
_type = type;
}
public String getUserIdAsString()
{
+ decodeIfNecessary();
+
return (_userId == null) ? null : _userId.toString();
}
public AMQShortString getUserId()
{
+ decodeIfNecessary();
+
return _userId;
}
@@ -555,48 +764,65 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
public void setUserId(AMQShortString userId)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= USER_ID_MASK;
_userId = userId;
}
public String getAppIdAsString()
{
+ decodeIfNecessary();
+
return (_appId == null) ? null : _appId.toString();
}
public AMQShortString getAppId()
{
+ decodeIfNecessary();
+
return _appId;
}
public void setAppId(String appId)
{
+ _hasBeenUpdated = true;
setAppId((appId == null) ? null : new AMQShortString(appId));
}
public void setAppId(AMQShortString appId)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= APPLICATION_ID_MASK;
_appId = appId;
+ _hasBeenUpdated = true;
}
public String getClusterIdAsString()
{
+ _hasBeenUpdated = true;
+ decodeIfNecessary();
return (_clusterId == null) ? null : _clusterId.toString();
}
public AMQShortString getClusterId()
{
+ _hasBeenUpdated = true;
+ decodeIfNecessary();
return _clusterId;
}
public void setClusterId(String clusterId)
{
+ _hasBeenUpdated = true;
setClusterId((clusterId == null) ? null : new AMQShortString(clusterId));
}
public void setClusterId(AMQShortString clusterId)
{
+ _hasBeenUpdated = true;
+ clearEncodedForm();
_propertyFlags |= CLUSTER_ID_MASK;
_clusterId = clusterId;
}