diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java new file mode 100755 index 0000000000..cf8ae2166c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java @@ -0,0 +1,242 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.BBDecoder; + +import java.nio.ByteBuffer; +import java.lang.ref.SoftReference; + +public class MessageMetaData_0_10 implements StorableMessageMetaData +{ + private Header _header; + private DeliveryProperties _deliveryProps; + private MessageProperties _messageProps; + private MessageTransferHeader _messageHeader; + private long _arrivalTime; + private int _bodySize; + private volatile SoftReference<ByteBuffer> _body; + + private static final int ENCODER_SIZE = 1 << 16; + + public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory(); + + private volatile ByteBuffer _encoded; + + + public MessageMetaData_0_10(MessageTransfer xfr) + { + this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis()); + } + + private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) + { + this(header, bodySize, null, arrivalTime); + } + + private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime) + { + _header = header; + if(_header != null) + { + _deliveryProps = _header.get(DeliveryProperties.class); + _messageProps = _header.get(MessageProperties.class); + } + else + { + _deliveryProps = null; + _messageProps = null; + } + _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps); + _arrivalTime = arrivalTime; + _bodySize = bodySize; + + + + if(xfrBody == null) + { + _body = null; + } + else + { + ByteBuffer body = ByteBuffer.allocate(_bodySize); + body.put(xfrBody); + body.flip(); + _body = new SoftReference<ByteBuffer>(body); + } + + + } + + + + public MessageMetaDataType getType() + { + return MessageMetaDataType.META_DATA_0_10; + } + + public int getStorableSize() + { + ByteBuffer buf = _encoded; + + if(buf == null) + { + buf = encodeAsBuffer(); + _encoded = buf; + } + + //TODO -- need to add stuff + return buf.limit(); + } + + private ByteBuffer encodeAsBuffer() + { + BBEncoder encoder = new BBEncoder(ENCODER_SIZE); + + encoder.writeInt64(_arrivalTime); + encoder.writeInt32(_bodySize); + Struct[] headers = _header == null ? new Struct[0] : _header.getStructs(); + encoder.writeInt32(headers.length); + + + for(Struct header : headers) + { + encoder.writeStruct32(header); + + } + + ByteBuffer buf = encoder.buffer(); + return buf; + } + + public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) + { + ByteBuffer buf = _encoded; + + if(buf == null) + { + buf = encodeAsBuffer(); + _encoded = buf; + } + + buf = buf.duplicate(); + + buf.position(offsetInMetaData); + + if(dest.remaining() < buf.limit()) + { + buf.limit(dest.remaining()); + } + dest.put(buf); + return buf.limit(); + } + + public int getContentSize() + { + return _bodySize; + } + + public boolean isPersistent() + { + return _deliveryProps == null ? false : _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT; + } + + public String getRoutingKey() + { + return _deliveryProps == null ? null : _deliveryProps.getRoutingKey(); + } + + public AMQMessageHeader getMessageHeader() + { + return _messageHeader; + } + + public long getSize() + { + + return _bodySize; + } + + public boolean isImmediate() + { + return _deliveryProps != null && _deliveryProps.getImmediate(); + } + + public long getExpiration() + { + return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); + } + + public long getArrivalTime() + { + return _arrivalTime; + } + + public Header getHeader() + { + return _header; + } + + public ByteBuffer getBody() + { + ByteBuffer body = _body == null ? null : _body.get(); + return body; + } + + public void setBody(ByteBuffer body) + { + _body = new SoftReference<ByteBuffer>(body); + } + + private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10> + { + public MessageMetaData_0_10 createMetaData(ByteBuffer buf) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(buf); + + long arrivalTime = decoder.readInt64(); + int bodySize = decoder.readInt32(); + int headerCount = decoder.readInt32(); + + Struct[] headers = new Struct[headerCount]; + + for(int i = 0 ; i < headerCount; i++) + { + headers[i] = decoder.readStruct32(); + } + + Header header = new Header(headers); + + return new MessageMetaData_0_10(header, bodySize, arrivalTime); + + } + } +} |