diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-09-10 15:00:43 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-09-10 15:00:43 +0000 |
commit | 61f5f631197bd1435b0bc29727465d819d187d83 (patch) | |
tree | d793163278307e5b3617b2b1b0c7c15657aaba2a | |
parent | 9c36740bc0cbede0ce533f137fa048363c4461da (diff) | |
download | qpid-python-61f5f631197bd1435b0bc29727465d819d187d83.tar.gz |
NO-JIRA: 1.0 sandbox merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1167529 13f79535-47bb-0310-9956-ffa450edef68
107 files changed, 3143 insertions, 3002 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java index 0e3a3894fe..d76b163fa1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java @@ -274,132 +274,6 @@ public class HeadersParser } - public static void main(String[] args) throws AMQFrameDecodingException - { - - FieldTable bindingTable = new FieldTable(); - - bindingTable.setString(new AMQShortString("x-match"),"all"); - bindingTable.setInteger("a",1); - bindingTable.setVoid(new AMQShortString("b")); - bindingTable.setString("c",""); - bindingTable.setInteger("d",4); - bindingTable.setInteger("e",1); - - - - FieldTable bindingTable2 = new FieldTable(); - bindingTable2.setString(new AMQShortString("x-match"),"all"); - bindingTable2.setInteger("a",1); - bindingTable2.setVoid(new AMQShortString("b")); - bindingTable2.setString("c",""); - bindingTable2.setInteger("d",4); - bindingTable2.setInteger("e",1); - bindingTable2.setInteger("f",1); - - - FieldTable table = new FieldTable(); - table.setInteger("a",1); - table.setInteger("b",2); - table.setString("c",""); - table.setInteger("d",4); - table.setInteger("e",1); - table.setInteger("f",1); - table.setInteger("h",1); - table.setInteger("i",1); - table.setInteger("j",1); - table.setInteger("k",1); - table.setInteger("l",1); - - org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize()); - EncodingUtils.writeFieldTableBytes(buffer, table); - buffer.flip(); - - FieldTable table2 = EncodingUtils.readFieldTable(buffer); - - - - FieldTable bindingTable3 = new FieldTable(); - bindingTable3.setString(new AMQShortString("x-match"),"any"); - bindingTable3.setInteger("a",1); - bindingTable3.setInteger("b",3); - - - FieldTable bindingTable4 = new FieldTable(); - bindingTable4.setString(new AMQShortString("x-match"),"any"); - bindingTable4.setVoid(new AMQShortString("a")); - - - FieldTable bindingTable5 = new FieldTable(); - bindingTable5.setString(new AMQShortString("x-match"),"all"); - bindingTable5.setString(new AMQShortString("h"),"hello"); - - for(int i = 0; i < 100; i++) - { - printMatches(new FieldTable[] {bindingTable5} , table2); - } - - - - } - - - - private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey) - { - HeadersMatcherDFAState sm = null; - Map<HeaderMatcherResult, String> resultMap = new HashMap<HeaderMatcherResult, String>(); - - HeadersParser parser = new HeadersParser(); - - for(int i = 0; i < bindingKeys.length; i++) - { - HeaderMatcherResult r = new HeaderMatcherResult(); - resultMap.put(r, bindingKeys[i].toString()); - - - if(i==0) - { - sm = parser.createStateMachine(bindingKeys[i], r); - } - else - { - sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r)); - } - } - - Collection<HeaderMatcherResult> results = null; - long beforeTime = System.currentTimeMillis(); - for(int i = 0; i < 1000000; i++) - { - routingKey.size(); - - assert sm != null; - results = sm.match(routingKey); - - } - long elapsed = System.currentTimeMillis() - beforeTime; - System.out.println("1000000 Iterations took: " + elapsed); - Collection<String> resultStrings = new ArrayList<String>(); - - assert results != null; - for(HeaderMatcherResult result : results) - { - resultStrings.add(resultMap.get(result)); - } - - final ArrayList<String> nonMatches = new ArrayList<String>(); - for(FieldTable key : bindingKeys) - { - nonMatches.add(key.toString()); - } - nonMatches.removeAll(resultStrings); - System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches); - - - } - - public final static class KeyValuePair { public final HeaderKey _key; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java index 2825fa1b75..286fc78719 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java @@ -20,11 +20,15 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.RootMessageLogger; +import java.security.AccessController; +import java.security.Principal; import java.text.MessageFormat; +import java.util.Set; + +import javax.management.remote.JMXPrincipal; +import javax.security.auth.Subject; /** * NOTE: This actor is not thread safe. @@ -40,16 +44,23 @@ import java.text.MessageFormat; */ public class ManagementActor extends AbstractActor { + /** + * Holds the principal name to display when principal subject is not available. + * <p> + * This is useful for cases when users invoke JMX operation over JConsole + * attached to the local JVM. + */ + private static final String UNKNOWN_PRINCIPAL = "N/A"; + String _lastThreadName = null; /** * LOG FORMAT for the ManagementActor, - * Uses a MessageFormat call to insert the requried values according to - * these indicies: + * Uses a MessageFormat call to insert the required values according to + * these indices: * - * 0 - Connection ID - * 1 - User ID - * 2 - IP + * 0 - User ID + * 1 - IP */ public static final String MANAGEMENT_FORMAT = "mng:{0}({1})"; @@ -75,19 +86,20 @@ public class ManagementActor extends AbstractActor _lastThreadName = currentName; // Management Thread names have this format. - //RMI TCP Connection(2)-169.24.29.116 + // RMI TCP Connection(2)-169.24.29.116 // This is true for both LocalAPI and JMX Connections // However to be defensive lets test. String[] split = currentName.split("\\("); if (split.length == 2) { - String connectionID = split[1].split("\\)")[0]; String ip = currentName.split("-")[1]; - - actor = MessageFormat.format(MANAGEMENT_FORMAT, - connectionID, - ip); + String principalName = getPrincipalName(); + if (principalName == null) + { + principalName = UNKNOWN_PRINCIPAL; + } + actor = MessageFormat.format(MANAGEMENT_FORMAT, principalName, ip); } else { @@ -105,6 +117,30 @@ public class ManagementActor extends AbstractActor } } + /** + * Returns current JMX principal name. + * + * @return principal name or null if principal can not be found + */ + protected String getPrincipalName() + { + String identity = null; + + // retrieve Subject from current AccessControlContext + final Subject subject = Subject.getSubject(AccessController.getContext()); + if (subject != null) + { + // retrieve JMXPrincipal from Subject + final Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class); + if (principals != null && !principals.isEmpty()) + { + final Principal principal = principals.iterator().next(); + identity = principal.getName(); + } + } + return identity; + } + public String getLogMessage() { updateLogString(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties index ab77476da2..ac77f674f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties @@ -30,4 +30,4 @@ STOPPED = MNG-1005 : Stopped # 0 - Path SSL_KEYSTORE = MNG-1006 : Using SSL Keystore : {0} OPEN = MNG-1007 : Open : User {0} -CLOSE = MNG-1008 : Close
\ No newline at end of file +CLOSE = MNG-1008 : Close : User {0}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java index 68f7689283..169195304c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java @@ -313,7 +313,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler, Notificati else if (notification.getType().equals(JMXConnectionNotification.CLOSED) || notification.getType().equals(JMXConnectionNotification.FAILED)) { - _logActor.message(ManagementConsoleMessages.CLOSE()); + _logActor.message(ManagementConsoleMessages.CLOSE(user)); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java index 66cb7ed83b..5992e42fb7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java @@ -29,7 +29,10 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.AMQException; +import org.apache.qpid.server.util.ByteBufferInputStream; +import org.apache.qpid.server.util.ByteBufferOutputStream; +import java.io.*; import java.nio.ByteBuffer; import java.util.Set; @@ -120,38 +123,38 @@ public class MessageMetaData implements StorableMessageMetaData return size; } + public int writeToBuffer(int offset, ByteBuffer dest) { - ByteBuffer src = ByteBuffer.allocate((int)getStorableSize()); - - org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src); - EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize()); - _contentHeaderBody.writePayload(minaSrc); - EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange()); - EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey()); - byte flags = 0; - if(_messagePublishInfo.isMandatory()) - { - flags |= MANDATORY_FLAG; - } - if(_messagePublishInfo.isImmediate()) + int oldPosition = dest.position(); + try { - flags |= IMMEDIATE_FLAG; + + DataOutputStream dataOutputStream = new DataOutputStream(new ByteBufferOutputStream(dest)); + EncodingUtils.writeInteger(dataOutputStream, _contentHeaderBody.getSize()); + _contentHeaderBody.writePayload(dataOutputStream); + EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getExchange()); + EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getRoutingKey()); + byte flags = 0; + if(_messagePublishInfo.isMandatory()) + { + flags |= MANDATORY_FLAG; + } + if(_messagePublishInfo.isImmediate()) + { + flags |= IMMEDIATE_FLAG; + } + dest.put(flags); + dest.putLong(_arrivalTime); + } - EncodingUtils.writeByte(minaSrc, flags); - EncodingUtils.writeLong(minaSrc,_arrivalTime); - src.position(minaSrc.position()); - src.flip(); - src.position(offset); - src = src.slice(); - if(dest.remaining() < src.limit()) + catch (IOException e) { - src.limit(dest.remaining()); + // This shouldn't happen as we are not actually using anything that can throw an IO Exception + throw new RuntimeException(e); } - dest.put(src); - - return src.limit(); + return dest.position()-oldPosition; } public int getContentSize() @@ -173,14 +176,15 @@ public class MessageMetaData implements StorableMessageMetaData { try { - org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf); - int size = EncodingUtils.readInteger(minaSrc); - ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size); - final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc); - final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc); + ByteBufferInputStream bbis = new ByteBufferInputStream(buf); + DataInputStream dais = new DataInputStream(bbis); + int size = EncodingUtils.readInteger(dais); + ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dais, size); + final AMQShortString exchange = EncodingUtils.readAMQShortString(dais); + final AMQShortString routingKey = EncodingUtils.readAMQShortString(dais); - final byte flags = EncodingUtils.readByte(minaSrc); - long arrivalTime = EncodingUtils.readLong(minaSrc); + final byte flags = EncodingUtils.readByte(dais); + long arrivalTime = EncodingUtils.readLong(dais); MessagePublishInfo publishBody = new MessagePublishInfo() @@ -216,6 +220,10 @@ public class MessageMetaData implements StorableMessageMetaData { throw new RuntimeException(e); } + catch (IOException e) + { + throw new RuntimeException(e); + } } }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 2cebec373e..3970e5a2d4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -26,6 +26,7 @@ */
package org.apache.qpid.server.output.amqp0_8;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -34,22 +35,18 @@ import org.apache.qpid.server.output.HeaderPropertiesConverter; import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
-import java.nio.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
- METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
return new Factory()
@@ -62,6 +59,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter };
}
+
private final AMQProtocolSession _protocolSession;
private ProtocolOutputConverterImpl(AMQProtocolSession session)
@@ -78,10 +76,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
}
+
private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
throws AMQException
{
@@ -93,65 +92,120 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter {
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
+ ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
return chb;
}
}
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
{
- AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
}
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
throws AMQException
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
-
+ int bodySize = (int) message.getSize();
- final int bodySize = (int) message.getSize();
if(bodySize == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
writeFrame(compositeBlock);
}
else
{
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- ByteBuffer buf = ByteBuffer.allocate(capacity);
- int writtenSize = 0;
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
}
+ }
+ }
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
}
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
}
- private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
throws AMQException
{
+
final AMQShortString exchangeName;
final AMQShortString routingKey;
@@ -172,21 +226,58 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final boolean isRedelivered = entry.isRedelivered();
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
}
- private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
final AMQShortString exchangeName;
@@ -215,9 +306,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter exchangeName,
routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -230,31 +320,28 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
+
BasicReturnBody basicReturnBody =
METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- MessageContentSource content,
- int channelId,
- int replyCode,
- AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
- writeMessageDelivery(content, header, channelId, returnFrame);
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+ writeMessageDelivery(message, header, channelId, returnFrame);
}
@@ -266,8 +353,68 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
+
BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 319b5cc7bd..aef3483282 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9; *
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,11 +35,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java index cffbe445ee..10748298bc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9_1; *
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -33,17 +30,16 @@ import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
@@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
-
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 88022ba519..5332031362 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -348,7 +350,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { // Log incomming protocol negotiation request @@ -368,7 +370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr null, mechanisms.getBytes(), locales.getBytes()); - _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.send(asByteBuffer(responseBody.generateFrame(0))); _sender.flush(); } @@ -376,11 +378,43 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()))); _sender.flush(); } } + private ByteBuffer asByteBuffer(AMQDataBlock block) + { + final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + + try + { + block.writePayload(new DataOutputStream(new OutputStream() + { + + + @Override + public void write(int b) throws IOException + { + buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + buf.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + buf.flip(); + return buf; + } + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) { final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); @@ -491,7 +525,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public synchronized void writeFrame(AMQDataBlock frame) { _lastSent = frame; - final ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = asByteBuffer(frame); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); @@ -1020,7 +1054,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void writerIdle() { - _sender.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _sender.send(asByteBuffer(HeartbeatBody.FRAME)); } public void exception(Throwable throwable) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 3e3288404f..a56f5685b8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -139,7 +139,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public int addContentBodyFrame(final ContentChunk contentChunk) throws AMQException { - _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf()); + _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); @@ -263,7 +263,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes int written = 0; for(ContentChunk cb : _contentChunks) { - ByteBuffer data = cb.getData().buf(); + ByteBuffer data = ByteBuffer.wrap(cb.getData()); if(offset+written >= pos && offset < pos + data.limit()) { ByteBuffer src = data.duplicate(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java index 9f56b8521a..dee40e7069 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.security.auth.sasl.amqplain; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import javax.security.auth.callback.Callback; @@ -31,7 +33,6 @@ import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -60,7 +61,7 @@ public class AmqPlainSaslServer implements SaslServer { try { - final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length); + final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length); String username = (String) ft.getString("LOGIN"); // we do not care about the prompt but it throws if null NameCallback nameCb = new NameCallback("prompt", username); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java index b4cce15d88..52d36023c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java @@ -20,21 +20,9 @@ */ package org.apache.qpid.server.security.auth.sasl.anonymous; -import java.io.IOException; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; public class AnonymousSaslServer implements SaslServer { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 2e694b24ea..8b099b62ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.lang.ref.SoftReference; @@ -479,9 +480,15 @@ public class DerbyMessageStore implements MessageStore FieldTable arguments; if(dataAsBytes.length > 0) { - org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes); - arguments = new FieldTable(buffer,buffer.limit()); + try + { + arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length); + } + catch (IOException e) + { + throw new RuntimeException("IO Exception should not be thrown",e); + } } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 0921decda4..d9845c5fa6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -189,7 +189,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public boolean isSuspended() { - return !isActive() || _deleted.get(); // TODO check for Session suspension + return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension } public boolean hasInterest(QueueEntry entry) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 279394e677..d0197ab492 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -677,4 +677,18 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi getChannel()) + "] "; } + + @Override + public void close() + { + // unregister subscriptions in order to prevent sending of new messages + // to subscriptions with closing session + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + unregister(subscription_0_10); + } + + super.close(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java new file mode 100644 index 0000000000..898a667736 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class ByteBufferInputStream extends InputStream +{ + private final ByteBuffer _buffer; + + public ByteBufferInputStream(ByteBuffer buffer) + { + _buffer = buffer; + } + + @Override + public int read() throws IOException + { + return _buffer.get() & 0xFF; + } + + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + if(_buffer.remaining() < len) + { + len = _buffer.remaining(); + } + _buffer.get(b, off, len); + + return len; + } + + @Override + public void mark(int readlimit) + { + _buffer.mark(); + } + + @Override + public void reset() throws IOException + { + _buffer.reset(); + } + + @Override + public boolean markSupported() + { + return true; + } + + @Override + public long skip(long n) throws IOException + { + + _buffer.position(_buffer.position()+(int)n); + + return n; + } + + @Override + public int available() throws IOException + { + return _buffer.remaining(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java index d3b8ecf8bd..ca9a41bc32 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java @@ -18,28 +18,29 @@ * under the License. * */ -package org.apache.qpid.framing; +package org.apache.qpid.server.util; -import org.apache.mina.common.ByteBuffer; +import java.io.OutputStream; +import java.nio.ByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Set; - -public final class AMQDataBlockEncoder +public class ByteBufferOutputStream extends OutputStream { - private static final Logger _logger = LoggerFactory.getLogger(AMQDataBlockEncoder.class); + private final ByteBuffer _buffer; - private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class); - - public AMQDataBlockEncoder() - { } + public ByteBufferOutputStream(ByteBuffer buffer) + { + _buffer = buffer; + } + @Override + public void write(int b) + { + _buffer.put((byte)b); + } - public Set getMessageTypes() + @Override + public void write(byte[] b, int off, int len) { - return _messageTypes; + _buffer.put(b, off, len); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 96a9ac729e..0fd31973b2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -43,7 +43,10 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import org.apache.qpid.server.util.ByteBufferInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -236,7 +239,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa FieldTable argumentsFT = null; if(buf != null) { - argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit()); + try + { + argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit()); + } + catch (IOException e) + { + throw new RuntimeException("IOException should not be thrown here", e); + } } BindingFactory bf = _virtualHost.getBindingFactory(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 904f052a0f..886396acb8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -52,6 +52,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.InternalBrokerBaseCase; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -570,8 +571,8 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase int pos = 0; for(ContentBody body : bodies) { - storedMessage.addContent(pos, body.payload.duplicate().buf()); - pos += body.payload.limit(); + storedMessage.addContent(pos, ByteBuffer.wrap(body._payload)); + pos += body._payload.length; } _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java index 033ae3b4b3..d6b790db01 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.AMQException; - +import java.security.PrivilegedAction; +import java.util.Collections; import java.util.List; +import javax.management.remote.JMXPrincipal; +import javax.security.auth.Subject; + /** * Test : AMQPManagementActorTest * Validate the AMQPManagementActor class. @@ -96,8 +96,40 @@ public class ManagementActorTest extends BaseActorTestCase // Verify that the message has the right values assertTrue("Message contains the [mng: prefix", - logs.get(0).toString().contains("[mng:" + CONNECTION_ID + "(" + IP + ")")); + logs.get(0).toString().contains("[mng:N/A(" + IP + ")")); + } + + /** + * Tests appearance of principal name in log message + */ + public void testSubjectPrincipalNameAppearance() + { + Subject subject = new Subject(true, Collections.singleton(new JMXPrincipal("guest")), Collections.EMPTY_SET, + Collections.EMPTY_SET); + + final String message = Subject.doAs(subject, new PrivilegedAction<String>() + { + public String run() + { + return sendTestLogMessage(_amqpActor); + } + }); + + // Verify that the log message was created + assertNotNull("Test log message is not created!", message); + + List<Object> logs = _rawLogger.getLogMessages(); + + // Verify that at least one log message was added to log + assertEquals("Message log size not as expected.", 1, logs.size()); + + String logMessage = logs.get(0).toString(); + + // Verify that the logged message is present in the output + assertTrue("Message was not found in log message", logMessage.contains(message)); + // Verify that the message has the right principal value + assertTrue("Message contains the [mng: prefix", logMessage.contains("[mng:guest(" + IP + ")")); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 4272c77798..47b8b7eb18 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -312,18 +311,14 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase { messages[i].addContentBodyFrame(new ContentChunk(){ - ByteBuffer _data = ByteBuffer.allocate((int)size); - - { - _data.limit((int)size); - } + byte[] _data = new byte[(int)size]; public int getSize() { return (int) size; } - public ByteBuffer getData() + public byte[] getData() { return _data; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 365353e734..070d105805 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.mina.common.ByteBuffer; import javax.management.JMException; @@ -275,18 +274,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase msg.addContentBodyFrame(new ContentChunk() { - ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE); - - { - _data.limit((int)MESSAGE_SIZE); - } + byte[] _data = new byte[((int)MESSAGE_SIZE)]; public int getSize() { return (int) MESSAGE_SIZE; } - public ByteBuffer getData() + public byte[] getData() { return _data; } @@ -441,8 +436,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase getSession().getMethodRegistry() .getProtocolVersionMethodConverter() .convertToContentChunk( - new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), - MESSAGE_SIZE))); + new ContentBody(new byte[(int) MESSAGE_SIZE]))); AMQMessage m = new AMQMessage(currentMessage.getStoredMessage()); for(BaseQueue q : currentMessage.getDestinationQueues()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index c38188cdc3..d9194a3408 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1183,7 +1183,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (code != null) { - je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause); + je = new JMSException("Exception thrown against " + toString() + ": " + cause, Integer.toString(code.getCode())); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 5821fee7ff..d739903ee6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -201,8 +201,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); } } - - messageProps.setContentLength(message.getContentLength()); + + ByteBuffer data = message.getData(); + messageProps.setContentLength(data.remaining()); // send the message try @@ -221,8 +222,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) && (destination.getLink().getReliability() == Reliability.UNRELIABLE); - org.apache.mina.common.ByteBuffer data = message.getData(); - ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); + + ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice(); ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(), MessageAcceptMode.NONE, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 27f7486890..26e9814e33 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -27,14 +27,13 @@ import javax.jms.Message; import javax.jms.Topic; import javax.jms.Queue; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.AMQMessageDelegate; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.CompositeAMQDataBlock; @@ -186,7 +185,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (frames.length == (offset + 1)) { - frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); + byte[] data = new byte[payload.remaining()]; + payload.get(data); + frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(data)); } else { @@ -198,7 +199,10 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer payload.position((int) framePayloadMax * (i - offset)); int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); + byte[] data = new byte[payload.remaining()]; + payload.get(data); + + frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(data)); remaining -= length; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java index 8c3f2fd08f..e5b95f54f4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java @@ -21,11 +21,6 @@ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.AMQException; - public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate> { public static AMQMessageDelegateFactory DEFAULT_FACTORY = null; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index cec4268a7b..b9ba946a20 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -499,7 +499,6 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); } - _contentHeaderProperties.updated(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java index 7f735e0722..be71c8c657 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -24,11 +24,11 @@ package org.apache.qpid.client.message; import java.util.List; import java.util.Map; import java.util.UUID; +import java.nio.ByteBuffer; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.codec.BBEncoder; @@ -81,18 +81,19 @@ public class AMQPEncodedMapMessage extends JMSMapMessage @ Override public ByteBuffer getData() { - writeMapToData(); - return _data; + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap(_map); + return encoder.segment(); } @ Override - protected void populateMapFromData() throws JMSException + protected void populateMapFromData(ByteBuffer data) throws JMSException { - if (_data != null) + if (data != null) { - _data.rewind(); + data.rewind(); BBDecoder decoder = new BBDecoder(); - decoder.init(_data.buf()); + decoder.init(data); _map = decoder.readMap(); } else @@ -101,14 +102,6 @@ public class AMQPEncodedMapMessage extends JMSMapMessage } } - @ Override - protected void writeMapToData() - { - BBEncoder encoder = new BBEncoder(1024); - encoder.writeMap(_map); - _data = ByteBuffer.wrap(encoder.segment()); - } - // for testing public Map<String,Object> getMap() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java index 4978d1ce85..2c38f153cb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java @@ -1,6 +1,6 @@ package org.apache.qpid.client.message; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,22 +8,23 @@ package org.apache.qpid.client.message; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory @@ -36,7 +37,7 @@ public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory return new AMQPEncodedMapMessage(delegate,data); } - @Override + public AbstractJMSMessage createMessage( AMQMessageDelegateFactory delegateFactory) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java deleted file mode 100644 index 3846ee043d..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import java.io.IOException; -import java.nio.charset.Charset; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.transport.util.Functions; - -/** - * @author Apache Software Foundation - */ -public abstract class AbstractBytesMessage extends AbstractJMSMessage -{ - - /** - * The default initial size of the buffer. The buffer expands automatically. - */ - private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; - - AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory) - { - this(delegateFactory, null); - } - - /** - * Construct a bytes message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) - { - super(delegateFactory, data); // this instanties a content header - setContentType(getMimeType()); - - if (_data == null) - { - allocateInitialBuffer(); - } - } - - protected void allocateInitialBuffer() - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); - _data.setAutoExpand(true); - } - - AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException - { - super(delegate, data); - setContentType(getMimeType()); - } - - - public void clearBodyImpl() throws JMSException - { - allocateInitialBuffer(); - } - - public String toBodyString() throws JMSException - { - try - { - if (_data != null) - { - return Functions.str(_data.buf(), 100,0); - } - else - { - return ""; - } - - } - catch (Exception e) - { - JMSException jmse = new JMSException(e.toString()); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - - } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws javax.jms.MessageEOFException if there are less than len bytes available to read - */ - protected void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 85818dcd2b..ddeb62fbf6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -21,784 +21,96 @@ package org.apache.qpid.client.message; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.ByteBuffer; import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.transport.util.Functions; /** * @author Apache Software Foundation */ -public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage { + protected boolean _readableMessage = false; - protected static final byte BOOLEAN_TYPE = (byte) 1; - - protected static final byte BYTE_TYPE = (byte) 2; - - protected static final byte BYTEARRAY_TYPE = (byte) 3; - - protected static final byte SHORT_TYPE = (byte) 4; - - protected static final byte CHAR_TYPE = (byte) 5; - - protected static final byte INT_TYPE = (byte) 6; - - protected static final byte LONG_TYPE = (byte) 7; - - protected static final byte FLOAT_TYPE = (byte) 8; - - protected static final byte DOUBLE_TYPE = (byte) 9; - - protected static final byte STRING_TYPE = (byte) 10; - - protected static final byte NULL_STRING_TYPE = (byte) 11; - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; - - AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory) - { - - this(delegateFactory, null); - } - - /** - * Construct a stream message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage) { - super(delegateFactory, data); // this instanties a content header + super(delegateFactory, fromReceivedMessage); // this instanties a content header + _readableMessage = fromReceivedMessage; } - AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException { - super(delegate, data); - } + super(delegate, fromReceivedMessage); + _readableMessage = fromReceivedMessage; - - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkReadable(); - checkAvailable(1); - return _data.get(); } - protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + protected void checkReadable() throws MessageNotReadableException { - checkWritable(); - _data.put(type); - _changedData = true; - } - - protected boolean readBoolean() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try + if (!_readableMessage) { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; + throw new MessageNotReadableException("You need to call reset() to make the message readable"); } } - private boolean readBooleanImpl() + @Override + protected void checkWritable() throws MessageNotWriteableException { - return _data.get() != 0; - } - - protected byte readByte() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try + super.checkWritable(); + if(_readableMessage) { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; } - private byte readByteImpl() + public void clearBody() throws JMSException { - return _data.get(); + super.clearBody(); + _readableMessage = false; } - protected short readShort() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - private short readShortImpl() + public String toBodyString() throws JMSException { - return _data.getShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws javax.jms.JMSException - */ - protected char readChar() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); try { - if(wireType == NULL_STRING_TYPE){ - throw new NullPointerException(); + ByteBuffer data = getData(); + if (data != null) + { + return Functions.str(data, 100, 0); + } + else + { + return ""; } - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private char readCharImpl() - { - return _data.getChar(); - } - - protected int readInt() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected int readIntImpl() - { - return _data.getInt(); - } - - protected long readLong() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private long readLongImpl() - { - return _data.getLong(); - } - - protected float readFloat() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private float readFloatImpl() - { - return _data.getFloat(); - } - - protected double readDouble() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private double readDoubleImpl() - { - return _data.getDouble(); - } - - protected String readString() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected String readStringImpl() throws JMSException - { - try - { - return _data.getString(Charset.forName("UTF-8").newDecoder()); } - catch (CharacterCodingException e) + catch (Exception e) { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + JMSException jmse = new JMSException(e.toString()); jmse.setLinkedException(e); jmse.initCause(e); throw jmse; } - } - - protected int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - checkReadable(); - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // length of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + - _data.remaining() + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; - } - - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - protected Object readObject() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - byte[] bytesResult = new byte[size]; - readBytesImpl(bytesResult); - result = bytesResult; - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected void writeBoolean(boolean b) throws JMSException - { - writeTypeDiscriminator(BOOLEAN_TYPE); - _data.put(b ? (byte) 1 : (byte) 0); - } - - protected void writeByte(byte b) throws JMSException - { - writeTypeDiscriminator(BYTE_TYPE); - _data.put(b); - } - - protected void writeShort(short i) throws JMSException - { - writeTypeDiscriminator(SHORT_TYPE); - _data.putShort(i); - } - - protected void writeChar(char c) throws JMSException - { - writeTypeDiscriminator(CHAR_TYPE); - _data.putChar(c); - } - - protected void writeInt(int i) throws JMSException - { - writeTypeDiscriminator(INT_TYPE); - writeIntImpl(i); - } - - protected void writeIntImpl(int i) - { - _data.putInt(i); - } - - protected void writeLong(long l) throws JMSException - { - writeTypeDiscriminator(LONG_TYPE); - _data.putLong(l); - } - protected void writeFloat(float v) throws JMSException - { - writeTypeDiscriminator(FLOAT_TYPE); - _data.putFloat(v); } - protected void writeDouble(double v) throws JMSException - { - writeTypeDiscriminator(DOUBLE_TYPE); - _data.putDouble(v); - } - protected void writeString(String string) throws JMSException - { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - try - { - writeStringImpl(string); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - } - - protected void writeStringImpl(String string) - throws CharacterCodingException - { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte) 0); - } + abstract public void reset(); - protected void writeBytes(byte[] bytes) throws JMSException - { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); - } - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - writeTypeDiscriminator(BYTEARRAY_TYPE); - if (bytes == null) - { - _data.putInt(-1); - } - else - { - _data.putInt(length); - _data.put(bytes, offset, length); - } - } - protected void writeObject(Object object) throws JMSException - { - checkWritable(); - Class clazz; - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 6ba55b207a..f713554bfb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,66 +20,38 @@ */ package org.apache.qpid.client.message; -import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Enumeration; import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message { - - protected ByteBuffer _data; - protected boolean _readableMessage = false; - protected boolean _changedData = true; - /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - - - protected AMQMessageDelegate _delegate; private boolean _redelivered; + private boolean _receivedFromServer; - protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedData) { _delegate = delegateFactory.createDelegate(); - _data = data; - if (_data != null) - { - _data.acquire(); - } - - - _readableMessage = (data != null); - _changedData = (data == null); - + setContentType(getMimeType()); } - protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + protected AbstractJMSMessage(AMQMessageDelegate delegate, boolean fromReceivedData) throws AMQException { _delegate = delegate; - - _data = data; - if (_data != null) - { - _data.acquire(); - } - - _readableMessage = data != null; - + setContentType(getMimeType()); } public String getJMSMessageID() throws JMSException @@ -329,12 +301,9 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message public void clearBody() throws JMSException { - clearBodyImpl(); - _readableMessage = false; - + _receivedFromServer = false; } - public void acknowledgeThis() throws JMSException { _delegate.acknowledgeThis(); @@ -345,14 +314,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message _delegate.acknowledge(); } - /** - * This forces concrete classes to implement clearBody() - * - * @throws JMSException - */ - public abstract void clearBodyImpl() throws JMSException; - - /** + /* * Get a String representation of the body of the message. Used in the toString() method which outputs this before * message properties. */ @@ -413,63 +375,24 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message return _delegate; } - public ByteBuffer getData() - { - // make sure we rewind the data just in case any method has moved the - // position beyond the start - if (_data != null) - { - reset(); - } + abstract public ByteBuffer getData() throws JMSException; - return _data; - } - - protected void checkReadable() throws MessageNotReadableException - { - if (!_readableMessage) - { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); - } - } protected void checkWritable() throws MessageNotWriteableException { - if (_readableMessage) + if (_receivedFromServer) { throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); } } - public void reset() - { - if (!_changedData) - { - _data.rewind(); - } - else - { - _data.flip(); - _changedData = false; - } - } - public int getContentLength() + public void setReceivedFromServer() { - if(_data != null) - { - return _data.remaining(); - } - else - { - return 0; - } + _receivedFromServer = true; } - public void receivedFromServer() - { - _changedData = false; - } + /** * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 40c1df0c5d..967a1fb49f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; @@ -38,6 +36,8 @@ import javax.jms.JMSException; import java.util.Iterator; import java.util.List; +import java.nio.ByteBuffer; + public abstract class AbstractJMSMessageFactory implements MessageFactory { private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); @@ -57,7 +57,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); } - data = ((ContentBody) bodies.get(0)).payload; + data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload); } else if (bodies != null) { @@ -72,7 +72,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = cb.payload; + final ByteBuffer payload = ByteBuffer.wrap(cb._payload); if(payload.isDirect() || payload.isReadOnly()) { data.put(payload); @@ -82,7 +82,6 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory data.put(payload.array(), payload.arrayOffset(), payload.limit()); } - payload.release(); } data.flip(); @@ -109,7 +108,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps, - DeliveryProperties deliveryProps, + DeliveryProperties deliveryProps, java.nio.ByteBuffer body) throws AMQException { ByteBuffer data; @@ -118,7 +117,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory if (body != null) { - data = ByteBuffer.wrap(body); + data = body; } else // body == null { @@ -155,7 +154,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); msg.setJMSRedelivered(redelivered); - msg.receivedFromServer(); + msg.setReceivedFromServer(); return msg; } @@ -166,7 +165,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory final AbstractJMSMessage msg = create010MessageWithBody(messageNbr,msgProps,deliveryProps, body); msg.setJMSRedelivered(redelivered); - msg.receivedFromServer(); + msg.setReceivedFromServer(); return msg; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index b87275a9ce..e252bdb719 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.message; +import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; @@ -28,47 +29,56 @@ import java.nio.charset.CharsetEncoder; import javax.jms.BytesMessage; import javax.jms.JMSException; +import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage +public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage { public static final String MIME_TYPE = "application/octet-stream"; + private TypedBytesContentReader _typedBytesContentReader; + private TypedBytesContentWriter _typedBytesContentWriter; - public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory) - { - this(delegateFactory,null); - } - - /** - * Construct a bytes message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory) { - - super(delegateFactory, data); // this instanties a content header + super(delegateFactory,false); + _typedBytesContentWriter = new TypedBytesContentWriter(); } JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, data); + super(delegate, data!=null); + _typedBytesContentReader = new TypedBytesContentReader(data); } public void reset() { - super.reset(); _readableMessage = true; + + if(_typedBytesContentReader != null) + { + _typedBytesContentReader.reset(); + } + else if (_typedBytesContentWriter != null) + { + _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData()); + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + _typedBytesContentReader = null; + _typedBytesContentWriter = new TypedBytesContentWriter(); + } protected String getMimeType() @@ -76,45 +86,57 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag return MIME_TYPE; } + @Override + public java.nio.ByteBuffer getData() throws JMSException + { + return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData(); + } + public long getBodyLength() throws JMSException { checkReadable(); - return _data.limit(); + return _typedBytesContentReader.size(); } public boolean readBoolean() throws JMSException { checkReadable(); checkAvailable(1); - return _data.get() != 0; + + return _typedBytesContentReader.readBooleanImpl(); + } + + private void checkAvailable(final int i) throws MessageEOFException + { + _typedBytesContentReader.checkAvailable(1); } public byte readByte() throws JMSException { checkReadable(); checkAvailable(1); - return _data.get(); + return _typedBytesContentReader.readByteImpl(); } public int readUnsignedByte() throws JMSException { checkReadable(); checkAvailable(1); - return _data.getUnsigned(); + return _typedBytesContentReader.readByteImpl() & 0xFF; } public short readShort() throws JMSException { checkReadable(); checkAvailable(2); - return _data.getShort(); + return _typedBytesContentReader.readShortImpl(); } public int readUnsignedShort() throws JMSException { checkReadable(); checkAvailable(2); - return _data.getUnsignedShort(); + return _typedBytesContentReader.readShortImpl() & 0xFFFF; } /** @@ -127,35 +149,35 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag { checkReadable(); checkAvailable(2); - return _data.getChar(); + return _typedBytesContentReader.readCharImpl(); } public int readInt() throws JMSException { checkReadable(); checkAvailable(4); - return _data.getInt(); + return _typedBytesContentReader.readIntImpl(); } public long readLong() throws JMSException { checkReadable(); checkAvailable(8); - return _data.getLong(); + return _typedBytesContentReader.readLongImpl(); } public float readFloat() throws JMSException { checkReadable(); checkAvailable(4); - return _data.getFloat(); + return _typedBytesContentReader.readFloatImpl(); } public double readDouble() throws JMSException { checkReadable(); checkAvailable(8); - return _data.getDouble(); + return _typedBytesContentReader.readDoubleImpl(); } public String readUTF() throws JMSException @@ -164,34 +186,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag // we check only for one byte since theoretically the string could be only a // single byte when using UTF-8 encoding - try - { - short length = readShort(); - if(length == 0) - { - return ""; - } - else - { - CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); - ByteBuffer encodedString = _data.slice(); - encodedString.limit(length); - _data.position(_data.position()+length); - CharBuffer string = decoder.decode(encodedString.buf()); - - return string.toString(); - } - - - - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } + return _typedBytesContentReader.readLengthPrefixedUTF(); } public int readBytes(byte[] bytes) throws JMSException @@ -201,14 +196,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag throw new IllegalArgumentException("byte array must not be null"); } checkReadable(); - int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining()); + int count = (_typedBytesContentReader.remaining() >= bytes.length ? bytes.length : _typedBytesContentReader.remaining()); if (count == 0) { return -1; } else { - _data.get(bytes, 0, count); + _typedBytesContentReader.readRawBytes(bytes, 0, count); return count; } } @@ -224,110 +219,82 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag throw new IllegalArgumentException("maxLength must be <= bytes.length"); } checkReadable(); - int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining()); + int count = (_typedBytesContentReader.remaining() >= maxLength ? maxLength : _typedBytesContentReader.remaining()); if (count == 0) { return -1; } else { - _data.get(bytes, 0, count); + _typedBytesContentReader.readRawBytes(bytes, 0, count); return count; } } + public void writeBoolean(boolean b) throws JMSException { checkWritable(); - _changedData = true; - _data.put(b ? (byte) 1 : (byte) 0); + _typedBytesContentWriter.writeBooleanImpl(b); } public void writeByte(byte b) throws JMSException { checkWritable(); - _changedData = true; - _data.put(b); + _typedBytesContentWriter.writeByteImpl(b); } public void writeShort(short i) throws JMSException { checkWritable(); - _changedData = true; - _data.putShort(i); + _typedBytesContentWriter.writeShortImpl(i); } public void writeChar(char c) throws JMSException { checkWritable(); - _changedData = true; - _data.putChar(c); + _typedBytesContentWriter.writeCharImpl(c); } public void writeInt(int i) throws JMSException { checkWritable(); - _changedData = true; - _data.putInt(i); + _typedBytesContentWriter.writeIntImpl(i); } public void writeLong(long l) throws JMSException { checkWritable(); - _changedData = true; - _data.putLong(l); + _typedBytesContentWriter.writeLongImpl(l); } public void writeFloat(float v) throws JMSException { checkWritable(); - _changedData = true; - _data.putFloat(v); + _typedBytesContentWriter.writeFloatImpl(v); } public void writeDouble(double v) throws JMSException { checkWritable(); - _changedData = true; - _data.putDouble(v); + _typedBytesContentWriter.writeDoubleImpl(v); } public void writeUTF(String string) throws JMSException { checkWritable(); - try - { - CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); - java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); - - _data.putShort((short)encodedString.limit()); - _data.put(encodedString); - _changedData = true; - //_data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must add the null terminator manually - //_data.put((byte)0); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } + _typedBytesContentWriter.writeLengthPrefixedUTF(string); } public void writeBytes(byte[] bytes) throws JMSException { - checkWritable(); - _data.put(bytes); - _changedData = true; + writeBytes(bytes, 0, bytes.length); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { checkWritable(); - _data.put(bytes, offset, length); - _changedData = true; + _typedBytesContentWriter.writeBytesRaw(bytes, offset, length); } public void writeObject(Object object) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java index cb04ebee1b..89561b88eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java @@ -22,11 +22,12 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; +import java.nio.ByteBuffer; + public class JMSBytesMessageFactory extends AbstractJMSMessageFactory { protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index e295d4a2a0..52c0eb263b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.client.message; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.Enumeration; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -282,7 +285,7 @@ public final class JMSHeaderAdapter s = String.valueOf(o); } } - }//else return s // null; + }//else return s // null; } return s; @@ -458,9 +461,29 @@ public final class JMSHeaderAdapter return getHeaders().isEmpty(); } - public void writeToBuffer(ByteBuffer data) + public void writeToBuffer(final ByteBuffer data) { - getHeaders().writeToBuffer(data); + try + { + getHeaders().writeToBuffer(new DataOutputStream(new OutputStream() + { + @Override + public void write(final int b) + { + data.put((byte)b); + } + + @Override + public void write(final byte[] b, final int off, final int len) + { + data.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new IllegalArgumentException("Unexpected IO Exception - should never happen", e); + } } public Enumeration getMapNames() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 306ffeeadf..fad24a968e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -20,11 +20,8 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,13 +29,14 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.MessageFormatException; +import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; -public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage +public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage { private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class); @@ -54,10 +52,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException { - super(delegateFactory, data); // this instantiates a content header + super(delegateFactory, data!=null); // this instantiates a content header if(data != null) { - populateMapFromData(); + populateMapFromData(data); } } @@ -65,10 +63,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, data); + super(delegate, data != null); try { - populateMapFromData(); + populateMapFromData(data); } catch (JMSException je) { @@ -89,18 +87,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm return MIME_TYPE; } - public ByteBuffer getData() - { - // What if _data is null? - writeMapToData(); - - return super.getData(); - } - @Override - public void clearBodyImpl() throws JMSException + public void clearBody() throws JMSException { - super.clearBodyImpl(); + super.clearBody(); _map.clear(); } @@ -458,17 +448,18 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm return _map.containsKey(propName); } - protected void populateMapFromData() throws JMSException + protected void populateMapFromData(ByteBuffer data) throws JMSException { - if (_data != null) + TypedBytesContentReader reader = new TypedBytesContentReader(data); + if (data != null) { - _data.rewind(); + data.rewind(); - final int entries = readIntImpl(); + final int entries = reader.readIntImpl(); for (int i = 0; i < entries; i++) { - String propName = readStringImpl(); - Object value = readObject(); + String propName = reader.readStringImpl(); + Object value = reader.readObject(); _map.put(propName, value); } } @@ -478,35 +469,21 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } } - protected void writeMapToData() + public ByteBuffer getData() + throws JMSException { - allocateInitialBuffer(); + TypedBytesContentWriter writer = new TypedBytesContentWriter(); + final int size = _map.size(); - writeIntImpl(size); + writer.writeIntImpl(size); for (Map.Entry<String, Object> entry : _map.entrySet()) { - try - { - writeStringImpl(entry.getKey()); - } - catch (CharacterCodingException e) - { - throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e); - - } + writer.writeNullTerminatedStringImpl(entry.getKey()); - try - { - writeObject(entry.getValue()); - } - catch (JMSException e) - { - Object value = entry.getValue(); - throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value - + " (type: " + value.getClass().getName() + ").", e); - } + writer.writeObject(entry.getValue()); } + return writer.getData(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java index eccb90560b..89408a5c3c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java @@ -14,18 +14,16 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSMapMessageFactory extends AbstractJMSMessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 370e2d6c55..c981c951c3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -20,27 +20,28 @@ */ package org.apache.qpid.client.message; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; +import java.io.*; +import java.nio.ByteBuffer; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQException; import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { public static final String MIME_TYPE = "application/java-object-stream"; + private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256; + + private Serializable _readData; + private ByteBuffer _data; + private Exception _exception; + + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - private static final int DEFAULT_BUFFER_SIZE = 1024; /** * Creates empty, writable message for use by producers @@ -48,41 +49,57 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag */ public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory) { - this(delegateFactory, null); - } - - private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) - { - super(delegateFactory, data); - if (data == null) - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - _data.setAutoExpand(true); - } - - setContentType(getMimeType()); + super(delegateFactory, false); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + JMSObjectMessage(AMQMessageDelegate delegate, final ByteBuffer data) throws AMQException { - super(delegate, data); + super(delegate, data!=null); + + try + { + ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream() + { + + + @Override + public int read() throws IOException + { + return data.get(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + len = data.remaining() < len ? data.remaining() : len; + data.get(b, off, len); + return len; + } + }); + + _readData = (Serializable) in.readObject(); + } + catch (IOException e) + { + _exception = e; + } + catch (ClassNotFoundException e) + { + _exception = e; + } } - public void clearBodyImpl() throws JMSException + public void clearBody() throws JMSException { - if (_data != null) - { - _data.release(); - _data = null; - } - - - + super.clearBody(); + _exception = null; + _readData = null; + _data = null; } public String toBodyString() throws JMSException @@ -95,83 +112,116 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag return MIME_TYPE; } - public void setObject(Serializable serializable) throws JMSException + @Override + public ByteBuffer getData() throws JMSException { - checkWritable(); - - if (_data == null) + if(_exception != null) + { + final MessageFormatException messageFormatException = + new MessageFormatException("Unable to deserialize message"); + messageFormatException.setLinkedException(_exception); + throw messageFormatException; + } + if(_readData == null) { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - _data.setAutoExpand(true); + + return _data == null ? EMPTY_BYTE_BUFFER : _data.duplicate(); } else { - _data.rewind(); + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(_readData); + oos.flush(); + return ByteBuffer.wrap(baos.toByteArray()); + } + catch (IOException e) + { + final JMSException jmsException = new JMSException("Unable to encode object of type: " + + _readData.getClass().getName() + ", value " + _readData); + jmsException.setLinkedException(e); + throw jmsException; + } } + } + + public void setObject(Serializable serializable) throws JMSException + { + checkWritable(); + clearBody(); try { - ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); - out.writeObject(serializable); - out.flush(); - out.close(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(serializable); + oos.flush(); + _data = ByteBuffer.wrap(baos.toByteArray()); } catch (IOException e) { - MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); - mfe.setLinkedException(e); - mfe.initCause(e); - throw mfe; + final JMSException jmsException = new JMSException("Unable to encode object of type: " + + serializable.getClass().getName() + ", value " + serializable); + jmsException.setLinkedException(e); + throw jmsException; } } public Serializable getObject() throws JMSException { - ObjectInputStream in = null; - if (_data == null) + if(_exception != null) { - return null; + final MessageFormatException messageFormatException = new MessageFormatException("Unable to deserialize message"); + messageFormatException.setLinkedException(_exception); + throw messageFormatException; } - - try + else if(_readData != null || _data == null) { - _data.rewind(); - in = new ClassLoadingAwareObjectInputStream(_data.asInputStream()); - - return (Serializable) in.readObject(); + return _readData; } - catch (IOException e) - { - MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); - mfe.setLinkedException(e); - mfe.initCause(e); - throw mfe; - } - catch (ClassNotFoundException e) - { - MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); - mfe.setLinkedException(e); - mfe.initCause(e); - throw mfe; - } - finally + else { - // _data.rewind(); - close(in); - } - } + Exception exception = null; - private static void close(InputStream in) - { - try - { - if (in != null) + final ByteBuffer data = _data.duplicate(); + try + { + ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream() + { + @Override + public int read() throws IOException + { + return data.get(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + len = data.remaining() < len ? data.remaining() : len; + data.get(b, off, len); + return len; + } + }); + + return (Serializable) in.readObject(); + } + catch (ClassNotFoundException e) + { + exception = e; + } + catch (IOException e) { - in.close(); + exception = e; } + + JMSException jmsException = new JMSException("Could not deserialize object"); + jmsException.setLinkedException(exception); + throw jmsException; } - catch (IOException ignore) - { } + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java index 03851dfa01..4660c91c1f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,10 +22,8 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSObjectMessageFactory extends AbstractJMSMessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index ad2620852b..5c93f6b6f0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -23,7 +23,8 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; import javax.jms.StreamMessage; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -36,65 +37,76 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public static final String MIME_TYPE="jms/stream-message"; - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; + private TypedBytesContentReader _typedBytesContentReader; + private TypedBytesContentWriter _typedBytesContentWriter; public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory) { - this(delegateFactory,null); + super(delegateFactory,false); + _typedBytesContentWriter = new TypedBytesContentWriter(); } - /** - * Construct a stream message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) - { - super(delegateFactory, data); // this instanties a content header - } JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - - super(delegate, data); + super(delegate, data!=null); + _typedBytesContentReader = new TypedBytesContentReader(data); } - public void reset() { - super.reset(); _readableMessage = true; + + if(_typedBytesContentReader != null) + { + _typedBytesContentReader.reset(); + } + else if (_typedBytesContentWriter != null) + { + _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData()); + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + _typedBytesContentReader = null; + _typedBytesContentWriter = new TypedBytesContentWriter(); + } + protected String getMimeType() { return MIME_TYPE; } - + @Override + public java.nio.ByteBuffer getData() throws JMSException + { + return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData(); + } public boolean readBoolean() throws JMSException { - return super.readBoolean(); + checkReadable(); + return _typedBytesContentReader.readBoolean(); } public byte readByte() throws JMSException { - return super.readByte(); + checkReadable(); + return _typedBytesContentReader.readByte(); } public short readShort() throws JMSException { - return super.readShort(); + checkReadable(); + return _typedBytesContentReader.readShort(); } /** @@ -105,102 +117,127 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea */ public char readChar() throws JMSException { - return super.readChar(); + checkReadable(); + return _typedBytesContentReader.readChar(); } public int readInt() throws JMSException { - return super.readInt(); + checkReadable(); + return _typedBytesContentReader.readInt(); } public long readLong() throws JMSException { - return super.readLong(); + checkReadable(); + return _typedBytesContentReader.readLong(); } public float readFloat() throws JMSException { - return super.readFloat(); + checkReadable(); + return _typedBytesContentReader.readFloat(); } public double readDouble() throws JMSException { - return super.readDouble(); + checkReadable(); + return _typedBytesContentReader.readDouble(); } public String readString() throws JMSException { - return super.readString(); + checkReadable(); + return _typedBytesContentReader.readString(); } public int readBytes(byte[] bytes) throws JMSException { - return super.readBytes(bytes); + if(bytes == null) + { + throw new IllegalArgumentException("Must provide non-null array to read into"); + } + + checkReadable(); + return _typedBytesContentReader.readBytes(bytes); } public Object readObject() throws JMSException { - return super.readObject(); + checkReadable(); + return _typedBytesContentReader.readObject(); } public void writeBoolean(boolean b) throws JMSException { - super.writeBoolean(b); + checkWritable(); + _typedBytesContentWriter.writeBoolean(b); } public void writeByte(byte b) throws JMSException { - super.writeByte(b); + checkWritable(); + _typedBytesContentWriter.writeByte(b); } public void writeShort(short i) throws JMSException { - super.writeShort(i); + checkWritable(); + _typedBytesContentWriter.writeShort(i); } public void writeChar(char c) throws JMSException { - super.writeChar(c); + checkWritable(); + _typedBytesContentWriter.writeChar(c); } public void writeInt(int i) throws JMSException { - super.writeInt(i); + checkWritable(); + _typedBytesContentWriter.writeInt(i); } public void writeLong(long l) throws JMSException { - super.writeLong(l); + checkWritable(); + _typedBytesContentWriter.writeLong(l); } public void writeFloat(float v) throws JMSException { - super.writeFloat(v); + checkWritable(); + _typedBytesContentWriter.writeFloat(v); } public void writeDouble(double v) throws JMSException { - super.writeDouble(v); + checkWritable(); + _typedBytesContentWriter.writeDouble(v); } public void writeString(String string) throws JMSException { - super.writeString(string); + checkWritable(); + _typedBytesContentWriter.writeString(string); } public void writeBytes(byte[] bytes) throws JMSException { - super.writeBytes(bytes); + checkWritable(); + _typedBytesContentWriter.writeBytes(bytes); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { - super.writeBytes(bytes,offset,length); + checkWritable(); + _typedBytesContentWriter.writeBytes(bytes, offset, length); } public void writeObject(Object object) throws JMSException { - super.writeObject(object); + checkWritable(); + _typedBytesContentWriter.writeObject(object); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java index 5e25db9ae0..359f5157f3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -22,10 +22,9 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSStreamMessageFactory extends AbstractJMSMessageFactory { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index fc2006a119..acf3a0ca14 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -20,15 +20,21 @@ */ package org.apache.qpid.client.message; +import java.io.DataInputStream; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; import javax.jms.JMSException; +import javax.jms.MessageFormatException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.util.Strings; @@ -37,6 +43,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { private static final String MIME_TYPE = "text/plain"; + private Exception _exception; private String _decodedValue; /** @@ -45,36 +52,41 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString(); private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); - public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException - { - this(delegateFactory, null, null); - } + private CharsetDecoder _decoder = DEFAULT_CHARSET.newDecoder(); + private CharsetEncoder _encoder = DEFAULT_CHARSET.newEncoder(); + + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException + public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - super(delegateFactory, data); // this instantiates a content header - setContentType(getMimeType()); - setEncoding(encoding); + super(delegateFactory, false); // this instantiates a content header } JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, data); - setContentType(getMimeType()); - _data = data; - } + super(delegate, data!=null); - - public void clearBodyImpl() throws JMSException - { - if (_data != null) + try { - _data.release(); - _data = null; + if(propertyExists(PAYLOAD_NULL_PROPERTY)) + { + _decodedValue = null; + } + else + { + _decodedValue = _decoder.decode(data).toString(); + } + } + catch (CharacterCodingException e) + { + _exception = e; + } + catch (JMSException e) + { + _exception = e; } - _decodedValue = null; } public String toBodyString() throws JMSException @@ -87,95 +99,62 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text return MIME_TYPE; } - public void setText(String text) throws JMSException + @Override + public ByteBuffer getData() throws JMSException { - checkWritable(); - - clearBody(); + _encoder.reset(); try { - if (text != null) + if(_exception != null) + { + final MessageFormatException messageFormatException = new MessageFormatException("Cannot decode original message"); + messageFormatException.setLinkedException(_exception); + throw messageFormatException; + } + else if(_decodedValue == null) + { + return EMPTY_BYTE_BUFFER; + } + else { - final String encoding = getEncoding(); - if (encoding == null || encoding.equalsIgnoreCase("UTF-8")) - { - _data = ByteBuffer.wrap(Strings.toUTF8(text)); - setEncoding("UTF-8"); - } - else - { - _data = ByteBuffer.wrap(text.getBytes(encoding)); - } - _data.position(_data.limit()); - _changedData=true; + return _encoder.encode(CharBuffer.wrap(_decodedValue)); } - _decodedValue = text; } - catch (UnsupportedEncodingException e) + catch (CharacterCodingException e) { - // should never occur - JMSException jmse = new JMSException("Unable to decode text data"); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + final JMSException jmsException = new JMSException("Cannot encode string in UFT-8: " + _decodedValue); + jmsException.setLinkedException(e); + throw jmsException; } } - public String getText() throws JMSException + @Override + public void clearBody() throws JMSException { - if (_data == null && _decodedValue == null) - { - return null; - } - else if (_decodedValue != null) - { - return _decodedValue; - } - else - { - _data.rewind(); + super.clearBody(); + _decodedValue = null; + _exception = null; + } - if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY)) - { - return null; - } - if (getEncoding() != null) - { - try - { - _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Could not decode string data: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - else - { - try - { - _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Could not decode string data: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - return _decodedValue; - } + public void setText(String text) throws JMSException + { + checkWritable(); + + clearBody(); + _decodedValue = text; + + } + + public String getText() throws JMSException + { + return _decodedValue; } @Override public void prepareForSending() throws JMSException { super.prepareForSending(); - if (_data == null) + if (_decodedValue == null) { setBooleanProperty(PAYLOAD_NULL_PROPERTY, true); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java index 1f4d64c78f..d1af32c10a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java @@ -22,7 +22,7 @@ package org.apache.qpid.client.message; import javax.jms.JMSException; -import org.apache.mina.common.ByteBuffer; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java new file mode 100644 index 0000000000..26a0b41cdc --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +public interface TypedBytesCodes +{ + static final byte BOOLEAN_TYPE = (byte) 1; + + static final byte BYTE_TYPE = (byte) 2; + + static final byte BYTEARRAY_TYPE = (byte) 3; + + static final byte SHORT_TYPE = (byte) 4; + + static final byte CHAR_TYPE = (byte) 5; + + static final byte INT_TYPE = (byte) 6; + + static final byte LONG_TYPE = (byte) 7; + + static final byte FLOAT_TYPE = (byte) 8; + + static final byte DOUBLE_TYPE = (byte) 9; + + static final byte STRING_TYPE = (byte) 10; + + static final byte NULL_STRING_TYPE = (byte) 11; +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java new file mode 100644 index 0000000000..1ae25eb1ed --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java @@ -0,0 +1,674 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +class TypedBytesContentReader implements TypedBytesCodes +{ + + private final ByteBuffer _data; + private final int _position; + private final int _limit; + + + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder(); + + private int _byteArrayRemaining = -1; + + + public TypedBytesContentReader(final ByteBuffer data) + { + _data = data.duplicate(); + _position = _data.position(); + _limit = _data.limit(); + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + protected void checkAvailable(int len) throws MessageEOFException + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkAvailable(1); + return _data.get(); + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + boolean readBooleanImpl() + { + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + byte readByteImpl() + { + return _data.get(); + } + + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if (wireType == NULL_STRING_TYPE) + { + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + _charsetDecoder.reset(); + ByteBuffer dup = _data.duplicate(); + int pos = _data.position(); + byte b; + while((b = _data.get()) != 0); + dup.limit(_data.position()-1); + return _charsetDecoder.decode(dup).toString(); + + } + catch (CharacterCodingException e) + { + JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + + size + + " but message only contains " + + + _data.remaining() + + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public void reset() + { + _byteArrayRemaining = -1; + _data.position(_position); + _data.limit(_limit); + } + + public ByteBuffer getData() + { + ByteBuffer buf = _data.duplicate(); + buf.position(_position); + buf.limit(_limit); + return buf; + } + + public long size() + { + return _limit - _position; + } + + public int remaining() + { + return _data.remaining(); + } + + public void readRawBytes(final byte[] bytes, final int offset, final int count) + { + _data.get(bytes, offset, count); + } + + public String readLengthPrefixedUTF() throws JMSException + { + try + { + short length = readShortImpl(); + if(length == 0) + { + return ""; + } + else + { + _charsetDecoder.reset(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = _charsetDecoder.decode(encodedString); + + return string.toString(); + } + } + catch(CharacterCodingException e) + { + JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java new file mode 100644 index 0000000000..7c91db3a32 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java @@ -0,0 +1,370 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; + +class TypedBytesContentWriter implements TypedBytesCodes +{ + private final ByteArrayOutputStream _baos = new ByteArrayOutputStream(); + private final DataOutputStream _data = new DataOutputStream(_baos); + private static final Charset UTF8 = Charset.forName("UTF-8"); + + protected void writeTypeDiscriminator(byte type) throws JMSException + { + try + { + _data.writeByte(type); + } + catch (IOException e) + { + throw handle(e); + } + } + + private JMSException handle(final IOException e) + { + JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage()); + jmsEx.setLinkedException(e); + return jmsEx; + } + + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + writeBooleanImpl(b); + } + + public void writeBooleanImpl(final boolean b) throws JMSException + { + try + { + _data.writeByte(b ? (byte) 1 : (byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + writeByteImpl(b); + } + + public void writeByteImpl(final byte b) throws JMSException + { + try + { + _data.writeByte(b); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + writeShortImpl(i); + } + + public void writeShortImpl(final short i) throws JMSException + { + try + { + _data.writeShort(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + writeCharImpl(c); + } + + public void writeCharImpl(final char c) throws JMSException + { + try + { + _data.writeChar(c); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) throws JMSException + { + try + { + _data.writeInt(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + writeLongImpl(l); + } + + public void writeLongImpl(final long l) throws JMSException + { + try + { + _data.writeLong(l); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + writeFloatImpl(v); + } + + public void writeFloatImpl(final float v) throws JMSException + { + try + { + _data.writeFloat(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + writeDoubleImpl(v); + } + + public void writeDoubleImpl(final double v) throws JMSException + { + try + { + _data.writeDouble(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + writeNullTerminatedStringImpl(string); + } + } + + protected void writeNullTerminatedStringImpl(String string) + throws JMSException + { + try + { + _data.write(string.getBytes(UTF8)); + _data.writeByte((byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + + } + + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + writeBytesImpl(bytes, offset, length); + } + + public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException + { + try + { + if (bytes == null) + { + _data.writeInt(-1); + } + else + { + _data.writeInt(length); + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException + { + try + { + if (bytes != null) + { + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + + protected void writeObject(Object object) throws JMSException + { + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } + + public ByteBuffer getData() + { + return ByteBuffer.wrap(_baos.toByteArray()); + } + + public void writeLengthPrefixedUTF(final String string) throws JMSException + { + try + { + CharsetEncoder encoder = UTF8.newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + writeShortImpl((short) encodedString.limit()); + while(encodedString.hasRemaining()) + { + _data.writeByte(encodedString.get()); + } + } + catch (CharacterCodingException e) + { + JMSException jmse = new JMSException("Unable to encode string: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + catch (IOException e) + { + throw handle(e); + } + + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java index 685e646d85..ce87a112c9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java @@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public void receiveBody(ContentBody body) { - if (body.payload != null) + if (body._payload != null) { - final long payloadSize = body.payload.remaining(); + final long payloadSize = body._payload.length; if (_bodies == null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 6d6cd9cae5..624cf67593 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.client.protocol; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -524,7 +526,7 @@ public class AMQProtocolHandler implements ProtocolEngine public synchronized void writeFrame(AMQDataBlock frame, boolean wait) { - final ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); _sender.send(buf); _sender.flush(); @@ -547,6 +549,39 @@ public class AMQProtocolHandler implements ProtocolEngine } + private ByteBuffer asByteBuffer(AMQDataBlock block) + { + final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + + try + { + block.writePayload(new DataOutputStream(new OutputStream() + { + + + @Override + public void write(int b) throws IOException + { + buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + buf.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + buf.flip(); + return buf; + } + + /** * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to * calling getProtocolSession().write() then waiting for the response. diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java index 93a4fb39f5..a12e4ce977 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.ObjectOutputStream; -import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.List; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.test.utils.QpidTestCase; public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase @@ -37,16 +37,15 @@ public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase protected void setUp() throws Exception { //Create a viable input stream for instantiating the CLA OIS - ByteBuffer buf = ByteBuffer.allocate(10); - buf.setAutoExpand(true); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(buf.asOutputStream()); + ObjectOutputStream out = new ObjectOutputStream(baos); out.writeObject("testString"); out.flush(); out.close(); - buf.rewind(); - _in = buf.asInputStream(); + + _in = new ByteArrayInputStream(baos.toByteArray()); _claOIS = new ClassLoadingAwareObjectInputStream(_in); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java index 177dccea7e..e37970e9a2 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java @@ -45,8 +45,6 @@ public class ObjectMessageUnitTest extends QpidTestCase _om.setObject(true); //make the message readable - _om.reset(); - Object object = _om.getObject(); assertTrue("Unexpected type returned", object instanceof Boolean); @@ -61,8 +59,6 @@ public class ObjectMessageUnitTest extends QpidTestCase _om.setObject("test string"); //make the message readable - _om.reset(); - Object object = _om.getObject(); assertTrue("Unexpected type returned", object instanceof String); @@ -87,7 +83,6 @@ public class ObjectMessageUnitTest extends QpidTestCase list.add(0); //make the message readable - _om.reset(); //retrieve the Object Object object = _om.getObject(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 7732ff2fd5..69bf73bb49 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.codec; -import java.util.ArrayList; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.SimpleByteBufferAllocator; +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; @@ -61,12 +60,11 @@ public class AMQDecoder /** Flag to indicate whether this decoder needs to handle protocol initiation. */ private boolean _expectProtocolInitiation; - private boolean firstDecode = true; private AMQMethodBodyFactory _bodyFactory; - private ByteBuffer _remainingBuf; - + private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); + /** * Creates a new AMQP decoder. * @@ -92,62 +90,168 @@ public class AMQDecoder _expectProtocolInitiation = expectProtocolInitiation; } + private class RemainingByteArrayInputStream extends InputStream + { + private int _currentListPos; + private int _markPos; - private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); - public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException - { + @Override + public int read() throws IOException + { + ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos); + if(currentStream.available() > 0) + { + return currentStream.read(); + } + else if((_currentListPos == _remainingBufs.size()) + || (++_currentListPos == _remainingBufs.size())) + { + return -1; + } + else + { - // get prior remaining data from accumulator - ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); - ByteBuffer msg; - // if we have a session buffer, append data to that otherwise - // use the buffer read from the network directly - if( _remainingBuf != null ) + ByteArrayInputStream stream = _remainingBufs.get(_currentListPos); + stream.mark(0); + return stream.read(); + } + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { - _remainingBuf.put(buf); - _remainingBuf.flip(); - msg = _remainingBuf; + + if(_currentListPos == _remainingBufs.size()) + { + return -1; + } + else + { + ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos); + final int available = currentStream.available(); + int read = currentStream.read(b, off, len > available ? available : len); + if(read < len) + { + if(_currentListPos++ != _remainingBufs.size()) + { + _remainingBufs.get(_currentListPos).mark(0); + } + int correctRead = read == -1 ? 0 : read; + int subRead = read(b, off+correctRead, len-correctRead); + if(subRead == -1) + { + return read; + } + else + { + return correctRead+subRead; + } + } + else + { + return len; + } + } } - else + + @Override + public int available() throws IOException { - msg = ByteBuffer.wrap(buf); + int total = 0; + for(int i = _currentListPos; i < _remainingBufs.size(); i++) + { + total += _remainingBufs.get(i).available(); + } + return total; } - - if (_expectProtocolInitiation - || (firstDecode - && (msg.remaining() > 0) - && (msg.get(msg.position()) == (byte)'A'))) + + @Override + public void mark(final int readlimit) { - if (_piDecoder.decodable(msg.buf())) + _markPos = _currentListPos; + final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos); + if(stream != null) { - dataBlocks.add(new ProtocolInitiation(msg.buf())); + stream.mark(readlimit); } } - else + + @Override + public void reset() throws IOException { - boolean enoughData = true; - while (enoughData) + _currentListPos = _markPos; + final int size = _remainingBufs.size(); + if(_currentListPos < size) + { + _remainingBufs.get(_currentListPos).reset(); + } + for(int i = _currentListPos+1; i<size; i++) { - int pos = msg.position(); + _remainingBufs.get(i).reset(); + } + } + } + + + public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + { + + // get prior remaining data from accumulator + ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); + DataInputStream msg; + + + ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); + if(!_remainingBufs.isEmpty()) + { + _remainingBufs.add(bais); + msg = new DataInputStream(new RemainingByteArrayInputStream()); + } + else + { + msg = new DataInputStream(bais); + } + boolean enoughData = true; + while (enoughData) + { + if(!_expectProtocolInitiation) + { enoughData = _dataBlockDecoder.decodable(msg); - msg.position(pos); if (enoughData) { dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); } - else + } + else + { + enoughData = _piDecoder.decodable(msg); + if (enoughData) { - _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); - _remainingBuf.setAutoExpand(true); - _remainingBuf.put(msg); + dataBlocks.add(new ProtocolInitiation(msg)); + } + + } + + if(!enoughData) + { + if(!_remainingBufs.isEmpty()) + { + _remainingBufs.remove(_remainingBufs.size()-1); + ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator(); + while(iterator.hasNext() && iterator.next().available() == 0) + { + iterator.remove(); + } + } + if(bais.available()!=0) + { + byte[] remaining = new byte[bais.available()]; + bais.read(remaining); + _remainingBufs.add(new ByteArrayInputStream(remaining)); } } - } - if(firstDecode && dataBlocks.size() > 0) - { - firstDecode = false; } return dataBlocks; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index fe04155bb8..ebdad12178 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -34,7 +36,7 @@ public interface AMQBody */ public abstract int getSize(); - public void writePayload(ByteBuffer buffer); + public void writePayload(DataOutputStream buffer) throws IOException; - void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException; + void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index a2fc3a03ef..00c1f5aae5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + /** * A data block represents something that has a size in bytes and the ability to write itself to a byte @@ -39,25 +42,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock * Writes the datablock to the specified buffer. * @param buffer */ - public abstract void writePayload(ByteBuffer buffer); - - public ByteBuffer toByteBuffer() - { - final ByteBuffer buffer = ByteBuffer.allocate((int)getSize()); - - writePayload(buffer); - buffer.flip(); - return buffer; - } - - public java.nio.ByteBuffer toNioByteBuffer() - { - final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize()); - - ByteBuffer buf = ByteBuffer.wrap(buffer); - writePayload(buf); - buffer.flip(); - return buffer; - } + public abstract void writePayload(DataOutputStream buffer) throws IOException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 0187fa96a9..2165cadd14 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.IOException; + public class AMQDataBlockDecoder { @@ -42,27 +43,32 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { } - public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException { - final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); + final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); // type, channel, body length and end byte if (remainingAfterAttributes < 0) { return false; } - in.position(in.position() + 1 + 2); + in.mark(8); + in.skip(1 + 2); + + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() - final long bodySize = in.getInt() & 0xffffffffL; + final long bodySize = in.readInt() & 0xffffffffL; + + in.reset(); return (remainingAfterAttributes >= bodySize); } - public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) - throws AMQFrameDecodingException, AMQProtocolVersionException + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - final byte type = in.get(); + final byte type = in.readByte(); BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) @@ -79,8 +85,8 @@ public class AMQDataBlockDecoder throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); } - final int channel = in.getUnsignedShort(); - final long bodySize = in.getUnsignedInt(); + final int channel = in.readUnsignedShort(); + final long bodySize = EncodingUtils.readUnsignedInteger(in); // bodySize can be zero if ((channel < 0) || (bodySize < 0)) @@ -91,7 +97,7 @@ public class AMQDataBlockDecoder AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); - byte marker = in.get(); + byte marker = in.readByte(); if ((marker & 0xFF) != 0xCE) { throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize @@ -101,13 +107,4 @@ public class AMQDataBlockDecoder return frame; } - public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException - { - return decodable(msg.buf()); - } - - public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException - { - return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); - } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 02a46f3748..6acf60a5b3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { @@ -36,7 +38,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock _bodyFrame = bodyFrame; } - public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException + public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException { this._channel = channel; this._bodyFrame = bodyFactory.createBody(in,bodySize); @@ -53,13 +55,13 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { - buffer.put(_bodyFrame.getFrameType()); + buffer.writeByte(_bodyFrame.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, _channel); EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); _bodyFrame.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } public final int getChannel() @@ -77,48 +79,48 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } - public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body) + public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException { - buffer.put(body.getFrameType()); + buffer.writeByte(body.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body.getSize()); body.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } - public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2) + public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException { - buffer.put(body1.getFrameType()); + buffer.writeByte(body1.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); body1.writePayload(buffer); - buffer.put(FRAME_END_BYTE); - buffer.put(body2.getFrameType()); + buffer.writeByte(FRAME_END_BYTE); + buffer.writeByte(body2.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); body2.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } - public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) + public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException { - buffer.put(body1.getFrameType()); + buffer.writeByte(body1.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); body1.writePayload(buffer); - buffer.put(FRAME_END_BYTE); - buffer.put(body2.getFrameType()); + buffer.writeByte(FRAME_END_BYTE); + buffer.writeByte(body2.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); body2.writePayload(buffer); - buffer.put(FRAME_END_BYTE); - buffer.put(body3.getFrameType()); + buffer.writeByte(FRAME_END_BYTE); + buffer.writeByte(body3.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body3.getSize()); body3.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 4763b22290..a076d0e5a1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -20,12 +20,14 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import java.io.DataOutputStream; +import java.io.IOException; + public interface AMQMethodBody extends AMQBody { public static final byte TYPE = 1; @@ -43,12 +45,12 @@ public interface AMQMethodBody extends AMQBody /** @return unsigned short */ public int getMethod(); - public void writeMethodPayload(ByteBuffer buffer); + public void writeMethodPayload(DataOutputStream buffer) throws IOException; public int getSize(); - public void writePayload(ByteBuffer buffer); + public void writePayload(DataOutputStream buffer) throws IOException; //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 1a7022c11b..7fceb082ee 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.IOException; + public class AMQMethodBodyFactory implements BodyFactory { private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); @@ -38,7 +39,7 @@ public class AMQMethodBodyFactory implements BodyFactory _protocolSession = protocolSession; } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException { return _protocolSession.getMethodRegistry().convertToBody(in, bodySize); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index cd3d721065..c73c1df701 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -21,13 +21,16 @@ package org.apache.qpid.framing; * */ -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + public abstract class AMQMethodBodyImpl implements AMQMethodBody { public static final byte TYPE = 1; @@ -98,7 +101,7 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return 2 + 2 + getBodySize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { EncodingUtils.writeUnsignedShort(buffer, getClazz()); EncodingUtils.writeUnsignedShort(buffer, getMethod()); @@ -106,12 +109,12 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody } - protected byte readByte(ByteBuffer buffer) + protected byte readByte(DataInputStream buffer) throws IOException { - return buffer.get(); + return buffer.readByte(); } - protected AMQShortString readAMQShortString(ByteBuffer buffer) + protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException { return EncodingUtils.readAMQShortString(buffer); } @@ -121,27 +124,27 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return EncodingUtils.encodedShortStringLength(string); } - protected void writeByte(ByteBuffer buffer, byte b) + protected void writeByte(DataOutputStream buffer, byte b) throws IOException { - buffer.put(b); + buffer.writeByte(b); } - protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) + protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException { EncodingUtils.writeShortStringBytes(buffer, string); } - protected int readInt(ByteBuffer buffer) + protected int readInt(DataInputStream buffer) throws IOException { - return buffer.getInt(); + return buffer.readInt(); } - protected void writeInt(ByteBuffer buffer, int i) + protected void writeInt(DataOutputStream buffer, int i) throws IOException { - buffer.putInt(i); + buffer.writeInt(i); } - protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException { return EncodingUtils.readFieldTable(buffer); } @@ -151,19 +154,19 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. } - protected void writeFieldTable(ByteBuffer buffer, FieldTable table) + protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException { EncodingUtils.writeFieldTableBytes(buffer, table); } - protected long readLong(ByteBuffer buffer) + protected long readLong(DataInputStream buffer) throws IOException { - return buffer.getLong(); + return buffer.readLong(); } - protected void writeLong(ByteBuffer buffer, long l) + protected void writeLong(DataOutputStream buffer, long l) throws IOException { - buffer.putLong(l); + buffer.writeLong(l); } protected int getSizeOf(byte[] response) @@ -171,87 +174,86 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return (response == null) ? 4 : response.length + 4; } - protected void writeBytes(ByteBuffer buffer, byte[] data) + protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException { EncodingUtils.writeBytes(buffer,data); } - protected byte[] readBytes(ByteBuffer buffer) + protected byte[] readBytes(DataInputStream buffer) throws IOException { return EncodingUtils.readBytes(buffer); } - protected short readShort(ByteBuffer buffer) + protected short readShort(DataInputStream buffer) throws IOException { return EncodingUtils.readShort(buffer); } - protected void writeShort(ByteBuffer buffer, short s) + protected void writeShort(DataOutputStream buffer, short s) throws IOException { EncodingUtils.writeShort(buffer, s); } - protected Content readContent(ByteBuffer buffer) + protected Content readContent(DataInputStream buffer) { - return null; //To change body of created methods use File | Settings | File Templates. + return null; } protected int getSizeOf(Content body) { - return 0; //To change body of created methods use File | Settings | File Templates. + return 0; } - protected void writeContent(ByteBuffer buffer, Content body) + protected void writeContent(DataOutputStream buffer, Content body) { - //To change body of created methods use File | Settings | File Templates. } - protected byte readBitfield(ByteBuffer buffer) + protected byte readBitfield(DataInputStream buffer) throws IOException { - return readByte(buffer); //To change body of created methods use File | Settings | File Templates. + return readByte(buffer); } - protected int readUnsignedShort(ByteBuffer buffer) + protected int readUnsignedShort(DataInputStream buffer) throws IOException { - return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. + return buffer.readUnsignedShort(); } - protected void writeBitfield(ByteBuffer buffer, byte bitfield0) + protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException { - buffer.put(bitfield0); + buffer.writeByte(bitfield0); } - protected void writeUnsignedShort(ByteBuffer buffer, int s) + protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException { EncodingUtils.writeUnsignedShort(buffer, s); } - protected long readUnsignedInteger(ByteBuffer buffer) + protected long readUnsignedInteger(DataInputStream buffer) throws IOException { - return buffer.getUnsignedInt(); + return EncodingUtils.readUnsignedInteger(buffer); } - protected void writeUnsignedInteger(ByteBuffer buffer, long i) + protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, i); } - protected short readUnsignedByte(ByteBuffer buffer) + protected short readUnsignedByte(DataInputStream buffer) throws IOException { - return buffer.getUnsigned(); + return (short) buffer.readUnsignedByte(); } - protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) + protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException { EncodingUtils.writeUnsignedByte(buffer, unsignedByte); } - protected long readTimestamp(ByteBuffer buffer) + protected long readTimestamp(DataInputStream buffer) throws IOException { return EncodingUtils.readTimestamp(buffer); } - protected void writeTimestamp(ByteBuffer buffer, long t) + protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException { EncodingUtils.writeTimestamp(buffer, t); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index 0c61d9db3c..df4d8bdcb6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -21,10 +21,11 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; public abstract interface AMQMethodBodyInstanceFactory { - public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java deleted file mode 100644 index bfcc38ad60..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - - -public interface AMQMethodFactory -{ - - // Connection Methods - - ConnectionCloseBody createConnectionClose(); - - // Access Methods - - AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write); - - - // Tx Methods - - TxSelectBody createTxSelect(); - - TxCommitBody createTxCommit(); - - TxRollbackBody createTxRollback(); - - // Channel Methods - - ChannelOpenBody createChannelOpen(); - - ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText); - - ChannelFlowBody createChannelFlow(boolean active); - - - // Exchange Methods - - - ExchangeBoundBody createExchangeBound(AMQShortString exchangeName, - AMQShortString queueName, - AMQShortString routingKey); - - ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket); - - - // Queue Methods - - QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket); - - QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket); - - QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket); - - - // Message Methods - - // In different versions of the protocol we change the class used for message transfer - // abstract this out so the appropriate methods are created - AMQMethodBody createRecover(boolean requeue); - - AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket); - - AMQMethodBody createConsumerCancel(AMQShortString consumerTag); - - AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple); - - AMQMethodBody createRejectBody(long deliveryTag, boolean requeue); - - AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 2b9e2ffaba..cc9a33f4cf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -21,11 +21,12 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.*; import java.lang.ref.WeakReference; @@ -199,27 +200,16 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } - private AMQShortString(ByteBuffer data, final int length) + private AMQShortString(DataInputStream data, final int length) throws IOException { if (length > MAX_LENGTH) { throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); } - if(data.isDirect() || data.isReadOnly()) - { - byte[] dataBytes = new byte[length]; - data.get(dataBytes); - _data = dataBytes; - _offset = 0; - } - else - { - - _data = data.array(); - _offset = data.arrayOffset() + data.position(); - data.skip(length); - - } + byte[] dataBytes = new byte[length]; + data.read(dataBytes); + _data = dataBytes; + _offset = 0; _length = length; } @@ -275,9 +265,9 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return new CharSubSequence(start, end); } - public static AMQShortString readFromBuffer(ByteBuffer buffer) + public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException { - final short length = buffer.getUnsigned(); + final int length = buffer.readUnsignedByte(); if (length == 0) { return null; @@ -303,13 +293,13 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } } - public void writeToBuffer(ByteBuffer buffer) + public void writeToBuffer(DataOutputStream buffer) throws IOException { final int size = length(); //buffer.setAutoExpand(true); - buffer.put((byte) size); - buffer.put(_data, _offset, size); + buffer.write((byte) size); + buffer.write(_data, _offset, size); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index 14fb63da03..f3da64e639 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.math.BigDecimal; /** @@ -60,12 +61,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -106,12 +107,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readUnsignedInteger(buffer); } @@ -137,7 +138,7 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { BigDecimal bd = (BigDecimal) value; @@ -150,7 +151,7 @@ public enum AMQType EncodingUtils.writeInteger(buffer, unscaled); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { byte places = EncodingUtils.readByte(buffer); @@ -182,12 +183,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLong(buffer); } @@ -246,7 +247,7 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { // Ensure that the value is a FieldTable. if (!(value instanceof FieldTable)) @@ -267,7 +268,7 @@ public enum AMQType * * @return An instance of the type. */ - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { try { @@ -301,10 +302,10 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) { } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) { return null; } @@ -330,12 +331,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongstr(buffer, (byte[]) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongstr(buffer); } @@ -360,12 +361,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -391,12 +392,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -426,12 +427,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeBoolean(buffer, (Boolean) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readBoolean(buffer); } @@ -461,12 +462,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeChar(buffer, (Character) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readChar(buffer); } @@ -496,12 +497,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeByte(buffer, (Byte) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readByte(buffer); } @@ -535,12 +536,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeShort(buffer, (Short) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readShort(buffer); } @@ -577,12 +578,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeInteger(buffer, (Integer) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readInteger(buffer); } @@ -624,12 +625,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLong(buffer); } @@ -659,12 +660,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeFloat(buffer, (Float) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readFloat(buffer); } @@ -698,12 +699,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeDouble(buffer, (Double) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readDouble(buffer); } @@ -770,9 +771,9 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - public void writeToBuffer(Object value, ByteBuffer buffer) + public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException { - buffer.put(identifier()); + buffer.writeByte(identifier()); writeValueImpl(value, buffer); } @@ -782,7 +783,7 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - abstract void writeValueImpl(Object value, ByteBuffer buffer); + abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException; /** * Reads an instance of the type from a specified byte buffer. @@ -791,5 +792,5 @@ public enum AMQType * * @return An instance of the type. */ - abstract Object readValueFromBuffer(ByteBuffer buffer); + abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 647d531476..1dbedca362 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Date; import java.util.Map; import java.math.BigDecimal; @@ -60,7 +61,7 @@ public class AMQTypedValue _value = type.toNativeValue(value); } - private AMQTypedValue(AMQType type, ByteBuffer buffer) + private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException { _type = type; _value = type.readValueFromBuffer(buffer); @@ -76,7 +77,7 @@ public class AMQTypedValue return _value; } - public void writeToBuffer(ByteBuffer buffer) + public void writeToBuffer(DataOutputStream buffer) throws IOException { _type.writeToBuffer(_value, buffer); } @@ -86,9 +87,9 @@ public class AMQTypedValue return _type.getEncodingSize(_value); } - public static AMQTypedValue readFromBuffer(ByteBuffer buffer) + public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException { - AMQType type = AMQTypeMap.getType(buffer.get()); + AMQType type = AMQTypeMap.getType(buffer.readByte()); return new AMQTypedValue(type, buffer); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index c7d89a9927..57622b5054 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,27 +37,6 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private static final AMQShortString ZERO_STRING = null; - /** - * We store the encoded form when we decode the content header so that if we need to write it out without modifying - * it we can do so without incurring the expense of reencoding it - */ - private byte[] _encodedForm; - - /** Flag indicating whether the entire content header has been decoded yet */ - private boolean _decoded = true; - - /** - * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker for - * routing in some cases so we can decode that separately. - */ - private boolean _decodedHeaders = true; - - /** - * We have some optimisations for partial decoding for maximum performance. The content type is used by all clients - * to determine the message type - */ - private boolean _decodedContentType = true; - private AMQShortString _contentType; private AMQShortString _encoding; @@ -86,10 +67,10 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private int _propertyFlags = 0; private static final int CONTENT_TYPE_MASK = 1 << 15; - private static final int ENCONDING_MASK = 1 << 14; + private static final int ENCODING_MASK = 1 << 14; private static final int HEADERS_MASK = 1 << 13; private static final int DELIVERY_MODE_MASK = 1 << 12; - private static final int PROPRITY_MASK = 1 << 11; + private static final int PRIORITY_MASK = 1 << 11; private static final int CORRELATION_ID_MASK = 1 << 10; private static final int REPLY_TO_MASK = 1 << 9; private static final int EXPIRATION_MASK = 1 << 8; @@ -101,34 +82,11 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private static final int CLUSTER_ID_MASK = 1 << 2; - /** - * This is 0_10 specific. We use this property to check if some message properties have been changed. - */ - private boolean _hasBeenUpdated = false; - - public boolean reset() - { - boolean result = _hasBeenUpdated; - _hasBeenUpdated = false; - return result; - } - - public void updated() - { - _hasBeenUpdated = true; - } - public BasicContentHeaderProperties() { } public int getPropertyListSize() { - if (_encodedForm != null) - { - return _encodedForm.length; - } - else - { int size = 0; if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0) @@ -136,7 +94,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti size += EncodingUtils.encodedShortStringLength(_contentType); } - if ((_propertyFlags & ENCONDING_MASK) > 0) + if ((_propertyFlags & ENCODING_MASK) > 0) { size += EncodingUtils.encodedShortStringLength(_encoding); } @@ -151,7 +109,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti size += 1; } - if ((_propertyFlags & PROPRITY_MASK) > 0) + if ((_propertyFlags & PRIORITY_MASK) > 0) { size += 1; } @@ -209,23 +167,10 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti } return size; - } - } - - private void clearEncodedForm() - { - if (!_decoded && (_encodedForm != null)) - { - // decode(); - } - - _encodedForm = null; } public void setPropertyFlags(int propertyFlags) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags = propertyFlags; } @@ -234,94 +179,87 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti return _propertyFlags; } - public void writePropertyListPayload(ByteBuffer buffer) + public void writePropertyListPayload(DataOutputStream buffer) throws IOException { - if (_encodedForm != null) + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) { - buffer.put(_encodedForm); + EncodingUtils.writeShortStringBytes(buffer, _contentType); } - else + + if ((_propertyFlags & ENCODING_MASK) != 0) { - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _contentType); - } + EncodingUtils.writeShortStringBytes(buffer, _encoding); + } - if ((_propertyFlags & ENCONDING_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _encoding); - } + if ((_propertyFlags & HEADERS_MASK) != 0) + { + EncodingUtils.writeFieldTableBytes(buffer, _headers); + } - if ((_propertyFlags & HEADERS_MASK) != 0) - { - EncodingUtils.writeFieldTableBytes(buffer, _headers); - } + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + buffer.writeByte(_deliveryMode); + } - if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) - { - buffer.put(_deliveryMode); - } + if ((_propertyFlags & PRIORITY_MASK) != 0) + { + buffer.writeByte(_priority); + } - if ((_propertyFlags & PROPRITY_MASK) != 0) - { - buffer.put(_priority); - } + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _correlationId); + } - if ((_propertyFlags & CORRELATION_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _correlationId); - } + if ((_propertyFlags & REPLY_TO_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _replyTo); + } - if ((_propertyFlags & REPLY_TO_MASK) != 0) + if ((_propertyFlags & EXPIRATION_MASK) != 0) + { + if (_expiration == 0L) { - EncodingUtils.writeShortStringBytes(buffer, _replyTo); + EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING); } - - if ((_propertyFlags & EXPIRATION_MASK) != 0) + else { - if (_expiration == 0L) - { - EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING); - } - else - { - EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); - } + EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); } + } - if ((_propertyFlags & MESSAGE_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _messageId); - } + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _messageId); + } - if ((_propertyFlags & TIMESTAMP_MASK) != 0) - { - EncodingUtils.writeTimestamp(buffer, _timestamp); - } + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + EncodingUtils.writeTimestamp(buffer, _timestamp); + } - if ((_propertyFlags & TYPE_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _type); - } + if ((_propertyFlags & TYPE_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _type); + } - if ((_propertyFlags & USER_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _userId); - } + if ((_propertyFlags & USER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _userId); + } - if ((_propertyFlags & APPLICATION_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _appId); - } + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _appId); + } - if ((_propertyFlags & CLUSTER_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _clusterId); - } + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _clusterId); } } - public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException + public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException { _propertyFlags = propertyFlags; @@ -331,25 +269,18 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti } decode(buffer); - /*_encodedForm = new byte[size]; - buffer.get(_encodedForm, 0, size); - _decoded = false; - _decodedHeaders = false; - _decodedContentType = false;*/ } - private void decode(ByteBuffer buffer) + private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException { // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - int pos = buffer.position(); - try - { + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) { _contentType = EncodingUtils.readAMQShortString(buffer); } - if ((_propertyFlags & ENCONDING_MASK) != 0) + if ((_propertyFlags & ENCODING_MASK) != 0) { _encoding = EncodingUtils.readAMQShortString(buffer); } @@ -361,12 +292,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) { - _deliveryMode = buffer.get(); + _deliveryMode = buffer.readByte(); } - if ((_propertyFlags & PROPRITY_MASK) != 0) + if ((_propertyFlags & PRIORITY_MASK) != 0) { - _priority = buffer.get(); + _priority = buffer.readByte(); } if ((_propertyFlags & CORRELATION_ID_MASK) != 0) @@ -413,116 +344,29 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _clusterId = EncodingUtils.readAMQShortString(buffer); } - } - catch (AMQFrameDecodingException e) - { - throw new RuntimeException("Error in content header data: " + e, e); - } - final int endPos = buffer.position(); - buffer.position(pos); - final int len = endPos - pos; - _encodedForm = new byte[len]; - final int limit = buffer.limit(); - buffer.limit(endPos); - buffer.get(_encodedForm, 0, len); - buffer.limit(limit); - buffer.position(endPos); - _decoded = true; - } - private void decodeUpToHeaders() - { - ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - try - { - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - byte length = buffer.get(); - buffer.skip(length); - } - - if ((_propertyFlags & ENCONDING_MASK) != 0) - { - byte length = buffer.get(); - buffer.skip(length); - } - - if ((_propertyFlags & HEADERS_MASK) != 0) - { - _headers = EncodingUtils.readFieldTable(buffer); - - } - - _decodedHeaders = true; - } - catch (AMQFrameDecodingException e) - { - throw new RuntimeException("Error in content header data: " + e, e); - } } - private void decodeUpToContentType() - { - ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - _contentType = EncodingUtils.readAMQShortString(buffer); - } - - _decodedContentType = true; - } - - private void decodeIfNecessary() - { - if (!_decoded) - { - // decode(); - } - } - - private void decodeHeadersIfNecessary() - { - if (!_decoded && !_decodedHeaders) - { - decodeUpToHeaders(); - } - } - - private void decodeContentTypeIfNecessary() - { - if (!_decoded && !_decodedContentType) - { - decodeUpToContentType(); - } - } public AMQShortString getContentType() { - decodeContentTypeIfNecessary(); - return _contentType; } public String getContentTypeAsString() { - decodeContentTypeIfNecessary(); - return (_contentType == null) ? null : _contentType.toString(); } public void setContentType(AMQShortString contentType) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= (CONTENT_TYPE_MASK); _contentType = contentType; } public void setContentType(String contentType) { - _hasBeenUpdated = true; setContentType((contentType == null) ? null : new AMQShortString(contentType)); } @@ -534,31 +378,23 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public AMQShortString getEncoding() { - decodeIfNecessary(); - return _encoding; } public void setEncoding(String encoding) { - _hasBeenUpdated = true; - clearEncodedForm(); - _propertyFlags |= ENCONDING_MASK; + _propertyFlags |= ENCODING_MASK; _encoding = (encoding == null) ? null : new AMQShortString(encoding); } public void setEncoding(AMQShortString encoding) { - _hasBeenUpdated = true; - clearEncodedForm(); - _propertyFlags |= ENCONDING_MASK; + _propertyFlags |= ENCODING_MASK; _encoding = encoding; } public FieldTable getHeaders() { - decodeHeadersIfNecessary(); - if (_headers == null) { setHeaders(FieldTableFactory.newFieldTable()); @@ -569,191 +405,146 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setHeaders(FieldTable headers) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= HEADERS_MASK; _headers = headers; } public byte getDeliveryMode() { - decodeIfNecessary(); - return _deliveryMode; } public void setDeliveryMode(byte deliveryMode) { - clearEncodedForm(); _propertyFlags |= DELIVERY_MODE_MASK; _deliveryMode = deliveryMode; } public byte getPriority() { - decodeIfNecessary(); - return _priority; } public void setPriority(byte priority) { - clearEncodedForm(); - _propertyFlags |= PROPRITY_MASK; + _propertyFlags |= PRIORITY_MASK; _priority = priority; } public AMQShortString getCorrelationId() { - decodeIfNecessary(); - return _correlationId; } public String getCorrelationIdAsString() { - decodeIfNecessary(); - return (_correlationId == null) ? null : _correlationId.toString(); } public void setCorrelationId(String correlationId) { - _hasBeenUpdated = true; setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId)); } public void setCorrelationId(AMQShortString correlationId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= CORRELATION_ID_MASK; _correlationId = correlationId; } public String getReplyToAsString() { - decodeIfNecessary(); - return (_replyTo == null) ? null : _replyTo.toString(); } public AMQShortString getReplyTo() { - decodeIfNecessary(); - return _replyTo; } public void setReplyTo(String replyTo) { - _hasBeenUpdated = true; setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo)); } public void setReplyTo(AMQShortString replyTo) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= REPLY_TO_MASK; _replyTo = replyTo; } public long getExpiration() { - decodeIfNecessary(); return _expiration; } public void setExpiration(long expiration) { - clearEncodedForm(); _propertyFlags |= EXPIRATION_MASK; _expiration = expiration; } public AMQShortString getMessageId() { - decodeIfNecessary(); - return _messageId; } public String getMessageIdAsString() { - decodeIfNecessary(); - return (_messageId == null) ? null : _messageId.toString(); } public void setMessageId(String messageId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= MESSAGE_ID_MASK; _messageId = (messageId == null) ? null : new AMQShortString(messageId); } public void setMessageId(AMQShortString messageId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= MESSAGE_ID_MASK; _messageId = messageId; } public long getTimestamp() { - decodeIfNecessary(); return _timestamp; } public void setTimestamp(long timestamp) { - clearEncodedForm(); _propertyFlags |= TIMESTAMP_MASK; _timestamp = timestamp; } public String getTypeAsString() { - decodeIfNecessary(); - return (_type == null) ? null : _type.toString(); } public AMQShortString getType() { - decodeIfNecessary(); - return _type; } public void setType(String type) { - _hasBeenUpdated = true; setType((type == null) ? null : new AMQShortString(type)); } public void setType(AMQShortString type) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= TYPE_MASK; _type = type; } public String getUserIdAsString() { - decodeIfNecessary(); - return (_userId == null) ? null : _userId.toString(); } public AMQShortString getUserId() { - decodeIfNecessary(); - return _userId; } @@ -764,65 +555,48 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setUserId(AMQShortString userId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= USER_ID_MASK; _userId = userId; } public String getAppIdAsString() { - decodeIfNecessary(); - return (_appId == null) ? null : _appId.toString(); } public AMQShortString getAppId() { - decodeIfNecessary(); - return _appId; } public void setAppId(String appId) { - _hasBeenUpdated = true; setAppId((appId == null) ? null : new AMQShortString(appId)); } public void setAppId(AMQShortString appId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= APPLICATION_ID_MASK; _appId = appId; - _hasBeenUpdated = true; } public String getClusterIdAsString() { - _hasBeenUpdated = true; - decodeIfNecessary(); return (_clusterId == null) ? null : _clusterId.toString(); } public AMQShortString getClusterId() { - _hasBeenUpdated = true; - decodeIfNecessary(); return _clusterId; } public void setClusterId(String clusterId) { - _hasBeenUpdated = true; setClusterId((clusterId == null) ? null : new AMQShortString(clusterId)); } public void setClusterId(AMQShortString clusterId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= CLUSTER_ID_MASK; _clusterId = clusterId; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java index 59646577e1..f9580d82b1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; /** * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. */ public interface BodyFactory { - AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException; + AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 94030f383e..15bc20c52d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataOutputStream; +import java.io.IOException; public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { @@ -49,7 +50,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD return frameSize; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { for (int i = 0; i < _blocks.length; i++) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 9d39f8aa86..aedb35f92a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -28,27 +31,22 @@ public class ContentBody implements AMQBody { public static final byte TYPE = 3; - public ByteBuffer payload; + public byte[] _payload; public ContentBody() { } - public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { - if (size > 0) - { - payload = buffer.slice(); - payload.limit((int) size); - buffer.skip((int) size); - } - + _payload = new byte[(int)size]; + buffer.read(_payload); } - public ContentBody(ByteBuffer payload) + public ContentBody(byte[] payload) { - this.payload = payload; + _payload = payload; } public byte getFrameType() @@ -58,23 +56,12 @@ public class ContentBody implements AMQBody public int getSize() { - return (payload == null ? 0 : payload.limit()); + return _payload == null ? 0 : _payload.length; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { - if (payload != null) - { - if(payload.isDirect() || payload.isReadOnly()) - { - ByteBuffer copy = payload.duplicate(); - buffer.put(copy.rewind()); - } - else - { - buffer.put(payload.array(),payload.arrayOffset(),payload.limit()); - } - } + buffer.write(_payload); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -83,32 +70,18 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { if (size > 0) { - payload = buffer.slice(); - payload.limit((int) size); - buffer.skip((int) size); + _payload = new byte[(int)size]; + buffer.read(_payload); } } public void reduceBufferToFit() { - if (payload != null && (payload.remaining() < payload.capacity() / 2)) - { - int size = payload.limit(); - ByteBuffer newPayload = ByteBuffer.allocate(size); - - newPayload.put(payload); - newPayload.flip(); - - //reduce reference count on payload - payload.release(); - - payload = newPayload; - } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index c42995d148..a0b030ab6b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,7 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException { return new ContentBody(in, bodySize); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 7526d4c756..18d0f26152 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -42,12 +45,12 @@ public class ContentHeaderBody implements AMQBody { } - public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + public ContentHeaderBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { - classId = buffer.getUnsignedShort(); - weight = buffer.getUnsignedShort(); - bodySize = buffer.getLong(); - int propertyFlags = buffer.getUnsignedShort(); + classId = buffer.readUnsignedShort(); + weight = buffer.readUnsignedShort(); + bodySize = buffer.readLong(); + int propertyFlags = buffer.readUnsignedShort(); ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); @@ -72,13 +75,13 @@ public class ContentHeaderBody implements AMQBody return TYPE; } - protected void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + protected void populateFromBuffer(DataInputStream buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - classId = buffer.getUnsignedShort(); - weight = buffer.getUnsignedShort(); - bodySize = buffer.getLong(); - int propertyFlags = buffer.getUnsignedShort(); + classId = buffer.readUnsignedShort(); + weight = buffer.readUnsignedShort(); + bodySize = buffer.readLong(); + int propertyFlags = buffer.readUnsignedShort(); ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); } @@ -90,8 +93,8 @@ public class ContentHeaderBody implements AMQBody * @return * @throws AMQFrameDecodingException */ - public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + public static ContentHeaderBody createFromBuffer(DataInputStream buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { ContentHeaderBody body = new ContentHeaderBody(buffer, size); @@ -103,11 +106,11 @@ public class ContentHeaderBody implements AMQBody return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { EncodingUtils.writeUnsignedShort(buffer, classId); EncodingUtils.writeUnsignedShort(buffer, weight); - buffer.putLong(bodySize); + buffer.writeLong(bodySize); EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); properties.writePropertyListPayload(buffer); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index 8d5e2f9fb4..a474e337b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,7 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index 7ef538cfdc..237929f9a3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + /** * There will be an implementation of this interface for each content type. All content types have associated @@ -32,7 +35,7 @@ public interface ContentHeaderProperties * Writes the property list to the buffer, in a suitably encoded form. * @param buffer The buffer to write to */ - void writePropertyListPayload(ByteBuffer buffer); + void writePropertyListPayload(DataOutputStream buffer) throws IOException; /** * Populates the properties from buffer. @@ -40,8 +43,8 @@ public interface ContentHeaderProperties * @param propertyFlags he property flags. * @throws AMQFrameDecodingException when the buffer does not contain valid data */ - void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException; + void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) + throws AMQFrameDecodingException, IOException; /** * @return the size of the encoded property list in bytes. @@ -56,5 +59,4 @@ public interface ContentHeaderProperties */ int getPropertyFlags(); - void updated(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 46189b63d7..43ee8cd1f1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; @@ -38,8 +39,8 @@ public class ContentHeaderPropertiesFactory } public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, - ByteBuffer buffer, int size) - throws AMQFrameDecodingException + DataInputStream buffer, int size) + throws AMQFrameDecodingException, IOException { ContentHeaderProperties properties; // AMQP version change: "Hardwired" version to major=8, minor=0 diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java deleted file mode 100644 index f6795ff200..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public abstract class DeferredDataBlock extends AMQDataBlock -{ - private AMQDataBlock _underlyingDataBlock; - - - public long getSize() - { - if(_underlyingDataBlock == null) - { - _underlyingDataBlock = createAMQDataBlock(); - } - return _underlyingDataBlock.getSize(); - } - - public void writePayload(ByteBuffer buffer) - { - if(_underlyingDataBlock == null) - { - _underlyingDataBlock = createAMQDataBlock(); - } - _underlyingDataBlock.writePayload(buffer); - } - - abstract protected AMQDataBlock createAMQDataBlock(); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 6425f8c591..2d7e27405c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.nio.charset.Charset; public class EncodingUtils @@ -218,7 +219,7 @@ public class EncodingUtils return 0; } - public static void writeShortStringBytes(ByteBuffer buffer, String s) + public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException { if (s != null) { @@ -231,18 +232,18 @@ public class EncodingUtils // TODO: check length fits in an unsigned byte writeUnsignedByte(buffer, (short)encodedString.length); - buffer.put(encodedString); + buffer.write(encodedString); } else { // really writing out unsigned byte - buffer.put((byte) 0); + buffer.write((byte) 0); } } - public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s) + public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException { if (s != null) { @@ -252,11 +253,11 @@ public class EncodingUtils else { // really writing out unsigned byte - buffer.put((byte) 0); + buffer.write((byte) 0); } } - public static void writeLongStringBytes(ByteBuffer buffer, String s) + public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException { assert (s == null) || (s.length() <= 0xFFFE); if (s != null) @@ -270,7 +271,7 @@ public class EncodingUtils encodedString[i] = (byte) cha[i]; } - buffer.put(encodedString); + buffer.write(encodedString); } else { @@ -278,7 +279,7 @@ public class EncodingUtils } } - public static void writeLongStringBytes(ByteBuffer buffer, char[] s) + public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException { assert (s == null) || (s.length <= 0xFFFE); if (s != null) @@ -291,7 +292,7 @@ public class EncodingUtils encodedString[i] = (byte) s[i]; } - buffer.put(encodedString); + buffer.write(encodedString); } else { @@ -299,13 +300,13 @@ public class EncodingUtils } } - public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes) + public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException { assert (bytes == null) || (bytes.length <= 0xFFFE); if (bytes != null) { writeUnsignedInteger(buffer, bytes.length); - buffer.put(bytes); + buffer.write(bytes); } else { @@ -313,24 +314,24 @@ public class EncodingUtils } } - public static void writeUnsignedByte(ByteBuffer buffer, short b) + public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException { byte bv = (byte) b; - buffer.put(bv); + buffer.write(bv); } - public static void writeUnsignedShort(ByteBuffer buffer, int s) + public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (s < Short.MAX_VALUE) { - buffer.putShort((short) s); + buffer.writeShort(s); } else { short sv = (short) s; - buffer.put((byte) (0xFF & (sv >> 8))); - buffer.put((byte) (0xFF & sv)); + buffer.write((byte) (0xFF & (sv >> 8))); + buffer.write((byte) (0xFF & sv)); } } @@ -339,12 +340,12 @@ public class EncodingUtils return 4; } - public static void writeUnsignedInteger(ByteBuffer buffer, long l) + public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (l < Integer.MAX_VALUE) { - buffer.putInt((int) l); + buffer.writeInt((int) l); } else { @@ -352,14 +353,14 @@ public class EncodingUtils // FIXME: This *may* go faster if we build this into a local 4-byte array and then // put the array in a single call. - buffer.put((byte) (0xFF & (iv >> 24))); - buffer.put((byte) (0xFF & (iv >> 16))); - buffer.put((byte) (0xFF & (iv >> 8))); - buffer.put((byte) (0xFF & iv)); + buffer.write((byte) (0xFF & (iv >> 24))); + buffer.write((byte) (0xFF & (iv >> 16))); + buffer.write((byte) (0xFF & (iv >> 8))); + buffer.write((byte) (0xFF & iv)); } } - public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) + public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException { if (table != null) { @@ -371,12 +372,12 @@ public class EncodingUtils } } - public static void writeContentBytes(ByteBuffer buffer, Content content) + public static void writeContentBytes(DataOutputStream buffer, Content content) { // TODO: New Content class required for AMQP 0-9. } - public static void writeBooleans(ByteBuffer buffer, boolean[] values) + public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException { byte packedValue = 0; for (int i = 0; i < values.length; i++) @@ -387,16 +388,16 @@ public class EncodingUtils } } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value) + public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException { - buffer.put(value ? (byte) 1 : (byte) 0); + buffer.write(value ? (byte) 1 : (byte) 0); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -405,10 +406,10 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 1)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -422,10 +423,10 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 2)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -444,11 +445,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 3)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -472,11 +473,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 4)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4, boolean value5) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -505,11 +506,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 5)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4, boolean value5, boolean value6) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5, boolean value6) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -543,11 +544,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 6)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4, boolean value5, boolean value6, boolean value7) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5, boolean value6, boolean value7) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -586,7 +587,7 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 7)); } - buffer.put(packedValue); + buffer.write(packedValue); } /** @@ -595,12 +596,12 @@ public class EncodingUtils * @param buffer * @param data */ - public static void writeLongstr(ByteBuffer buffer, byte[] data) + public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException { if (data != null) { writeUnsignedInteger(buffer, data.length); - buffer.put(data); + buffer.write(data); } else { @@ -608,14 +609,14 @@ public class EncodingUtils } } - public static void writeTimestamp(ByteBuffer buffer, long timestamp) + public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException { writeLong(buffer, timestamp); } - public static boolean[] readBooleans(ByteBuffer buffer) + public static boolean[] readBooleans(DataInputStream buffer) throws IOException { - final byte packedValue = buffer.get(); + final byte packedValue = buffer.readByte(); if (packedValue == 0) { return ALL_FALSE_ARRAY; @@ -640,9 +641,9 @@ public class EncodingUtils return result; } - public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return null; @@ -653,21 +654,21 @@ public class EncodingUtils } } - public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException + public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException { // TODO: New Content class required for AMQP 0-9. return null; } - public static AMQShortString readAMQShortString(ByteBuffer buffer) + public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException { return AMQShortString.readFromBuffer(buffer); } - public static String readShortString(ByteBuffer buffer) + public static String readShortString(DataInputStream buffer) throws IOException { - short length = buffer.getUnsigned(); + short length = (short) (((short)buffer.readByte()) & 0xFF); if (length == 0) { return null; @@ -680,7 +681,7 @@ public class EncodingUtils // this approach here is valid since we know that all the chars are // ASCII (0-127) byte[] stringBytes = new byte[length]; - buffer.get(stringBytes, 0, length); + buffer.read(stringBytes, 0, length); char[] stringChars = new char[length]; for (int i = 0; i < stringChars.length; i++) { @@ -691,9 +692,9 @@ public class EncodingUtils } } - public static String readLongString(ByteBuffer buffer) + public static String readLongString(DataInputStream buffer) throws IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return ""; @@ -706,7 +707,7 @@ public class EncodingUtils // this approach here is valid since we know that all the chars are // ASCII (0-127) byte[] stringBytes = new byte[(int) length]; - buffer.get(stringBytes, 0, (int) length); + buffer.read(stringBytes, 0, (int) length); char[] stringChars = new char[(int) length]; for (int i = 0; i < stringChars.length; i++) { @@ -717,9 +718,9 @@ public class EncodingUtils } } - public static byte[] readLongstr(ByteBuffer buffer) + public static byte[] readLongstr(DataInputStream buffer) throws IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return null; @@ -727,17 +728,17 @@ public class EncodingUtils else { byte[] result = new byte[(int) length]; - buffer.get(result); + buffer.read(result); return result; } } - public static long readTimestamp(ByteBuffer buffer) + public static long readTimestamp(DataInputStream buffer) throws IOException { // Discard msb from AMQ timestamp // buffer.getUnsignedInt(); - return buffer.getLong(); + return buffer.readLong(); } static byte[] hexToByteArray(String id) @@ -817,14 +818,14 @@ public class EncodingUtils // AMQP_BOOLEAN_PROPERTY_PREFIX - public static void writeBoolean(ByteBuffer buffer, Boolean aBoolean) + public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException { - buffer.put((byte) (aBoolean ? 1 : 0)); + buffer.write(aBoolean ? 1 : 0); } - public static boolean readBoolean(ByteBuffer buffer) + public static boolean readBoolean(DataInputStream buffer) throws IOException { - byte packedValue = buffer.get(); + byte packedValue = buffer.readByte(); return (packedValue == 1); } @@ -835,14 +836,14 @@ public class EncodingUtils } // AMQP_BYTE_PROPERTY_PREFIX - public static void writeByte(ByteBuffer buffer, Byte aByte) + public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException { - buffer.put(aByte); + buffer.writeByte(aByte); } - public static byte readByte(ByteBuffer buffer) + public static byte readByte(DataInputStream buffer) throws IOException { - return buffer.get(); + return buffer.readByte(); } public static int encodedByteLength() @@ -851,14 +852,14 @@ public class EncodingUtils } // AMQP_SHORT_PROPERTY_PREFIX - public static void writeShort(ByteBuffer buffer, Short aShort) + public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException { - buffer.putShort(aShort); + buffer.writeShort(aShort); } - public static short readShort(ByteBuffer buffer) + public static short readShort(DataInputStream buffer) throws IOException { - return buffer.getShort(); + return buffer.readShort(); } public static int encodedShortLength() @@ -867,14 +868,14 @@ public class EncodingUtils } // INTEGER_PROPERTY_PREFIX - public static void writeInteger(ByteBuffer buffer, Integer aInteger) + public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException { - buffer.putInt(aInteger); + buffer.writeInt(aInteger); } - public static int readInteger(ByteBuffer buffer) + public static int readInteger(DataInputStream buffer) throws IOException { - return buffer.getInt(); + return buffer.readInt(); } public static int encodedIntegerLength() @@ -883,14 +884,14 @@ public class EncodingUtils } // AMQP_LONG_PROPERTY_PREFIX - public static void writeLong(ByteBuffer buffer, Long aLong) + public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException { - buffer.putLong(aLong); + buffer.writeLong(aLong); } - public static long readLong(ByteBuffer buffer) + public static long readLong(DataInputStream buffer) throws IOException { - return buffer.getLong(); + return buffer.readLong(); } public static int encodedLongLength() @@ -899,14 +900,14 @@ public class EncodingUtils } // Float_PROPERTY_PREFIX - public static void writeFloat(ByteBuffer buffer, Float aFloat) + public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException { - buffer.putFloat(aFloat); + buffer.writeFloat(aFloat); } - public static float readFloat(ByteBuffer buffer) + public static float readFloat(DataInputStream buffer) throws IOException { - return buffer.getFloat(); + return buffer.readFloat(); } public static int encodedFloatLength() @@ -915,14 +916,14 @@ public class EncodingUtils } // Double_PROPERTY_PREFIX - public static void writeDouble(ByteBuffer buffer, Double aDouble) + public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException { - buffer.putDouble(aDouble); + buffer.writeDouble(aDouble); } - public static double readDouble(ByteBuffer buffer) + public static double readDouble(DataInputStream buffer) throws IOException { - return buffer.getDouble(); + return buffer.readDouble(); } public static int encodedDoubleLength() @@ -930,9 +931,9 @@ public class EncodingUtils return 8; } - public static byte[] readBytes(ByteBuffer buffer) + public static byte[] readBytes(DataInputStream buffer) throws IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return null; @@ -940,19 +941,19 @@ public class EncodingUtils else { byte[] dataBytes = new byte[(int)length]; - buffer.get(dataBytes, 0, (int)length); + buffer.read(dataBytes, 0, (int) length); return dataBytes; } } - public static void writeBytes(ByteBuffer buffer, byte[] data) + public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException { if (data != null) { // TODO: check length fits in an unsigned byte writeUnsignedInteger(buffer, (long)data.length); - buffer.put(data); + buffer.write(data); } else { @@ -968,35 +969,35 @@ public class EncodingUtils return encodedByteLength(); } - public static char readChar(ByteBuffer buffer) + public static char readChar(DataInputStream buffer) throws IOException { // This is valid as we know that the Character is ASCII 0..127 - return (char) buffer.get(); + return (char) buffer.read(); } - public static void writeChar(ByteBuffer buffer, char character) + public static void writeChar(DataOutputStream buffer, char character) throws IOException { // This is valid as we know that the Character is ASCII 0..127 writeByte(buffer, (byte) character); } - public static long readLongAsShortString(ByteBuffer buffer) + public static long readLongAsShortString(DataInputStream buffer) throws IOException { - short length = buffer.getUnsigned(); + short length = (short) buffer.readUnsignedByte(); short pos = 0; if (length == 0) { return 0L; } - byte digit = buffer.get(); + byte digit = buffer.readByte(); boolean isNegative; long result = 0; if (digit == (byte) '-') { isNegative = true; pos++; - digit = buffer.get(); + digit = buffer.readByte(); } else { @@ -1009,7 +1010,7 @@ public class EncodingUtils while (pos < length) { pos++; - digit = buffer.get(); + digit = buffer.readByte(); result = (result << 3) + (result << 1); result += digit - (byte) '0'; } @@ -1017,15 +1018,15 @@ public class EncodingUtils return result; } - public static long readUnsignedInteger(ByteBuffer buffer) + public static long readUnsignedInteger(DataInputStream buffer) throws IOException { - long l = 0xFF & buffer.get(); + long l = 0xFF & buffer.readByte(); l <<= 8; - l = l | (0xFF & buffer.get()); + l = l | (0xFF & buffer.readByte()); l <<= 8; - l = l | (0xFF & buffer.get()); + l = l | (0xFF & buffer.readByte()); l <<= 8; - l = l | (0xFF & buffer.get()); + l = l | (0xFF & buffer.readByte()); return l; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 22205d49f8..721c821bab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -20,12 +20,16 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQPInvalidClassException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.math.BigDecimal; import java.util.Collections; import java.util.Enumeration; @@ -43,8 +47,8 @@ public class FieldTable private static final String STRICT_AMQP = "STRICT_AMQP"; private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false")); - private ByteBuffer _encodedForm; - private LinkedHashMap<AMQShortString, AMQTypedValue> _properties; + private byte[] _encodedForm; + private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null; private long _encodedSize; private static final int INITIAL_HASHMAP_CAPACITY = 16; private static final int INITIAL_ENCODED_FORM_SIZE = 256; @@ -52,9 +56,6 @@ public class FieldTable public FieldTable() { super(); - // _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE); - // _encodedForm.setAutoExpand(true); - // _encodedForm.limit(0); } /** @@ -63,16 +64,12 @@ public class FieldTable * @param buffer the buffer from which to read data. The length byte must be read already * @param length the length of the field table. Must be > 0. */ - public FieldTable(ByteBuffer buffer, long length) + public FieldTable(DataInputStream buffer, long length) throws IOException { this(); - ByteBuffer encodedForm = buffer.slice(); - encodedForm.limit((int) length); - _encodedForm = ByteBuffer.allocate((int)length); - _encodedForm.put(encodedForm); - _encodedForm.flip(); + _encodedForm = new byte[(int) length]; + buffer.read(_encodedForm); _encodedSize = length; - buffer.skip((int) length); } public AMQTypedValue getProperty(AMQShortString string) @@ -108,13 +105,19 @@ public class FieldTable { try { - setFromBuffer(_encodedForm, _encodedSize); + setFromBuffer(); } catch (AMQFrameDecodingException e) { _logger.error("Error decoding FieldTable in deferred decoding mode ", e); throw new IllegalArgumentException(e); } + catch (IOException e) + { + _logger.error("Unexpected IO exception decoding field table"); + throw new IllegalArgumentException(e); + + } } private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) @@ -766,7 +769,7 @@ public class FieldTable // ************************* Byte Buffer Processing - public void writeToBuffer(ByteBuffer buffer) + public void writeToBuffer(DataOutputStream buffer) throws IOException { final boolean trace = _logger.isDebugEnabled(); @@ -786,17 +789,21 @@ public class FieldTable public byte[] getDataAsBytes() { - final int encodedSize = (int) getEncodedSize(); - final ByteBuffer buffer = ByteBuffer.allocate(encodedSize); // FIXME XXX: Is cast a problem? - - putDataInBuffer(buffer); - - final byte[] result = new byte[encodedSize]; - buffer.flip(); - buffer.get(result); - buffer.release(); + if(_encodedForm == null) + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try + { + putDataInBuffer(new DataOutputStream(baos)); + return baos.toByteArray(); + } + catch (IOException e) + { + throw new IllegalArgumentException("IO Exception should never be thrown here"); + } - return result; + } + return _encodedForm.clone(); } public long getEncodedSize() @@ -926,15 +933,8 @@ public class FieldTable public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator() { - if(_encodedForm != null) - { - return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize); - } - else - { - initMapIfNecessary(); - return _properties.entrySet().iterator(); - } + initMapIfNecessary(); + return _properties.entrySet().iterator(); } public Object get(String key) @@ -1002,26 +1002,12 @@ public class FieldTable return _properties.keySet(); } - private void putDataInBuffer(ByteBuffer buffer) + private void putDataInBuffer(DataOutputStream buffer) throws IOException { if (_encodedForm != null) { - if(buffer.isDirect() || buffer.isReadOnly()) - { - ByteBuffer encodedForm = _encodedForm.duplicate(); - - if (encodedForm.position() != 0) - { - encodedForm.flip(); - } - - buffer.put(encodedForm); - } - else - { - buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize); - } + buffer.write(_encodedForm); } else if (_properties != null) { @@ -1035,41 +1021,27 @@ public class FieldTable final Map.Entry<AMQShortString, AMQTypedValue> me = it.next(); try { - if (_logger.isDebugEnabled()) - { - _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" - + me.getValue().getValue()); - _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); - } - // Write the actual parameter name EncodingUtils.writeShortStringBytes(buffer, me.getKey()); me.getValue().writeToBuffer(buffer); } catch (Exception e) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Exception thrown:" + e); - _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" - + me.getValue().getValue()); - _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); - } - throw new RuntimeException(e); } } } } - private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException + private void setFromBuffer() throws AMQFrameDecodingException, IOException { + final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm); + DataInputStream buffer = new DataInputStream(in); final boolean trace = _logger.isDebugEnabled(); - if (length > 0) + if (_encodedSize > 0) { - final int expectedRemaining = buffer.remaining() - (int) length; _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY); @@ -1077,121 +1049,16 @@ public class FieldTable { final AMQShortString key = EncodingUtils.readAMQShortString(buffer); - - _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key); - AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); - - if (trace) - { - _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() - + "', key '" + key + "', value '" + value.getValue() + "'"); - } - _properties.put(key, value); } - while (buffer.remaining() > expectedRemaining); - - } - - _encodedSize = length; - - if (trace) - { - _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); - } - } - - private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue> - { - private final AMQTypedValue _value; - private final AMQShortString _key; - - public FieldTableEntry(final AMQShortString key, final AMQTypedValue value) - { - _key = key; - _value = value; - } - - public AMQShortString getKey() - { - return _key; - } - - public AMQTypedValue getValue() - { - return _value; - } - - public AMQTypedValue setValue(final AMQTypedValue value) - { - throw new UnsupportedOperationException(); - } - - public boolean equals(Object o) - { - if(o instanceof FieldTableEntry) - { - FieldTableEntry other = (FieldTableEntry) o; - return (_key == null ? other._key == null : _key.equals(other._key)) - && (_value == null ? other._value == null : _value.equals(other._value)); - } - else - { - return false; - } - } - - public int hashCode() - { - return (getKey()==null ? 0 : getKey().hashCode()) - ^ (getValue()==null ? 0 : getValue().hashCode()); - } - - } - - - private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>> - { + while (in.available() > 0); - private final ByteBuffer _buffer; - private int _expectedRemaining; - - public FieldTableIterator(ByteBuffer buffer, int length) - { - _buffer = buffer; - _expectedRemaining = buffer.remaining() - length; - } - - public boolean hasNext() - { - return (_buffer.remaining() > _expectedRemaining); } - public Map.Entry<AMQShortString, AMQTypedValue> next() - { - if(hasNext()) - { - final AMQShortString key = EncodingUtils.readAMQShortString(_buffer); - AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer); - return new FieldTableEntry(key, value); - } - else - { - return null; - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } } - - - public int hashCode() { initMapIfNecessary(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java index e9d75137ef..438a46f28b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; public class FieldTableFactory { @@ -29,7 +30,7 @@ public class FieldTableFactory return new FieldTable(); } - public static FieldTable newFieldTable(ByteBuffer byteBuffer, long length) throws AMQFrameDecodingException + public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException { return new FieldTable(byteBuffer, length); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 18ab05ffa1..a6ce721a50 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -34,12 +37,12 @@ public class HeartbeatBody implements AMQBody } - public HeartbeatBody(ByteBuffer buffer, long size) + public HeartbeatBody(DataInputStream buffer, long size) throws IOException { if(size > 0) { //allow other implementations to have a payload, but ignore it: - buffer.skip((int) size); + buffer.skip(size); } } @@ -53,7 +56,7 @@ public class HeartbeatBody implements AMQBody return 0;//heartbeats we generate have no payload } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) { } @@ -63,12 +66,12 @@ public class HeartbeatBody implements AMQBody session.heartbeatBodyReceived(channelId, this); } - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { if(size > 0) { //allow other implementations to have a payload, but ignore it: - buffer.skip((int) size); + buffer.skip(size); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index c7ada708dc..dfc49c6167 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index fb3dd89717..8c018316f0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -22,6 +22,10 @@ package org.apache.qpid.framing; import org.apache.qpid.AMQException; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -62,35 +66,30 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) + public ProtocolInitiation(DataInputStream in) throws IOException { _protocolHeader = new byte[4]; - in.get(_protocolHeader); + in.read(_protocolHeader); - _protocolClass = in.get(); - _protocolInstance = in.get(); - _protocolMajor = in.get(); - _protocolMinor = in.get(); + _protocolClass = in.readByte(); + _protocolInstance = in.readByte(); + _protocolMajor = in.readByte(); + _protocolMinor = in.readByte(); } - public void writePayload(org.apache.mina.common.ByteBuffer buffer) - { - writePayload(buffer.buf()); - } - public long getSize() { return 4 + 1 + 1 + 1 + 1; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { - buffer.put(_protocolHeader); - buffer.put(_protocolClass); - buffer.put(_protocolInstance); - buffer.put(_protocolMajor); - buffer.put(_protocolMinor); + buffer.write(_protocolHeader); + buffer.write(_protocolClass); + buffer.write(_protocolInstance); + buffer.write(_protocolMajor); + buffer.write(_protocolMinor); } public boolean equals(Object o) @@ -144,9 +143,9 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(ByteBuffer in) + public boolean decodable(DataInputStream in) throws IOException { - return (in.remaining() >= 8); + return (in.available() >= 8); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java index bd763599b0..d2925d13a8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -21,7 +21,8 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataOutputStream; +import java.io.IOException; public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { @@ -68,7 +69,7 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl return frameSize; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { if (_firstFrame != null) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 76c154581d..ed9136f7c9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -144,7 +145,7 @@ public class VersionSpecificRegistry } - public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException + public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException { AMQMethodBodyInstanceFactory bodyFactory; try diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java index 0695349f76..470b7b05e3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java @@ -21,12 +21,10 @@ package org.apache.qpid.framing.abstraction; -import org.apache.mina.common.ByteBuffer; - public interface ContentChunk { int getSize(); - ByteBuffer getData(); + byte[] getData(); void reduceToFit(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java index 7544d9b7e7..d1e53d6907 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -23,8 +23,6 @@ package org.apache.qpid.framing.abstraction; import org.apache.qpid.framing.AMQBody; -import java.nio.ByteBuffer; - public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter { AMQBody convertToBody(ContentChunk contentBody); @@ -32,5 +30,5 @@ public interface ProtocolVersionMethodConverter extends MessagePublishInfoConver void configure(); - AMQBody convertToBody(ByteBuffer buf); + AMQBody convertToBody(byte[] input); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index 1c4a29b106..90a730d6f7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -21,16 +21,13 @@ package org.apache.qpid.framing.amqp_0_9; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.*; -import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl; + public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { @@ -72,9 +69,9 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot } - public AMQBody convertToBody(java.nio.ByteBuffer buf) + public AMQBody convertToBody(byte[] data) { - return new ContentBody(ByteBuffer.wrap(buf)); + return new ContentBody(data); } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) @@ -116,9 +113,9 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot return _contentBodyChunk.getSize(); } - public ByteBuffer getData() + public byte[] getData() { - return _contentBodyChunk.payload; + return _contentBodyChunk._payload; } public void reduceToFit() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java index 6e330574bc..3b0cc3cebc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java @@ -21,8 +21,6 @@ package org.apache.qpid.framing.amqp_0_91; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; @@ -70,9 +68,9 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro } - public AMQBody convertToBody(java.nio.ByteBuffer buf) + public AMQBody convertToBody(byte[] data) { - return new ContentBody(ByteBuffer.wrap(buf)); + return new ContentBody(data); } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) @@ -114,9 +112,9 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro return _contentBodyChunk.getSize(); } - public ByteBuffer getData() + public byte[] getData() { - return _contentBodyChunk.payload; + return _contentBodyChunk._payload; } public void reduceToFit() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java index c87820b9b2..e6d0482f0d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -26,11 +26,8 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; import org.apache.qpid.framing.*; -import org.apache.mina.common.ByteBuffer; - public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { private int _basicPublishClassId; @@ -60,9 +57,9 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot return contentBodyChunk.getSize(); } - public ByteBuffer getData() + public byte[] getData() { - return contentBodyChunk.payload; + return contentBodyChunk._payload; } public void reduceToFit() @@ -81,9 +78,9 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQBody convertToBody(java.nio.ByteBuffer buf) + public AMQBody convertToBody(byte[] data) { - return new ContentBody(ByteBuffer.wrap(buf)); + return new ContentBody(data); } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java deleted file mode 100644 index 2c7652abeb..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.qpid.transport; - -import org.apache.mina.common.IoConnector; - -public interface SocketConnectorFactory -{ - IoConnector newConnector(); -}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 7f0f04f9c4..e1d1596ec5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -37,13 +37,6 @@ import org.apache.qpid.transport.util.Logger; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport { - static - { - org.apache.mina.common.ByteBuffer.setAllocator - (new org.apache.mina.common.SimpleByteBufferAllocator()); - org.apache.mina.common.ByteBuffer.setUseDirectBuffers - (Boolean.getBoolean("amqj.enableDirectBuffers")); - } private static final Logger LOGGER = Logger.get(IoNetworkTransport.class); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java index 62e25e7d79..272eb75800 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -21,6 +21,9 @@ package org.apache.qpid.codec; */ +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -46,9 +49,16 @@ public class AMQDecoderTest extends TestCase } - public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + private ByteBuffer getHeartbeatBodyBuffer() throws IOException { - ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + HeartbeatBody.FRAME.writePayload(new DataOutputStream(baos)); + return ByteBuffer.wrap(baos.toByteArray()); + } + + public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException + { + ByteBuffer msg = getHeartbeatBodyBuffer(); ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); if (frames.get(0) instanceof AMQFrame) { @@ -60,9 +70,9 @@ public class AMQDecoderTest extends TestCase } } - public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msg = getHeartbeatBodyBuffer(); ByteBuffer msgA = msg.slice(); int msgbPos = msg.remaining() / 2; int msgaLimit = msg.remaining() - msgbPos; @@ -83,10 +93,10 @@ public class AMQDecoderTest extends TestCase } } - public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = getHeartbeatBodyBuffer(); + ByteBuffer msgB = getHeartbeatBodyBuffer(); ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining()); msg.put(msgA); msg.put(msgB); @@ -106,11 +116,11 @@ public class AMQDecoderTest extends TestCase } } - public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = getHeartbeatBodyBuffer(); + ByteBuffer msgB = getHeartbeatBodyBuffer(); + ByteBuffer msgC = getHeartbeatBodyBuffer(); ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2); sliceA.put(msgA); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index 4fd1f60d69..5e7783f492 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import junit.framework.TestCase; +import java.io.*; + public class BasicContentHeaderPropertiesTest extends TestCase { @@ -76,15 +76,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase assertEquals(99, _testProperties.getPropertyFlags()); } - public void testWritePropertyListPayload() + public void testWritePropertyListPayload() throws IOException { - ByteBuffer buf = ByteBuffer.allocate(300); - _testProperties.writePropertyListPayload(buf); + _testProperties.writePropertyListPayload(new DataOutputStream(new ByteArrayOutputStream(300))); } public void testPopulatePropertiesFromBuffer() throws Exception { - ByteBuffer buf = ByteBuffer.allocate(300); + DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300])); _testProperties.populatePropertiesFromBuffer(buf, 99, 99); } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index d4691ba097..bb4c9c3884 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -23,14 +23,14 @@ package org.apache.qpid.framing; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQPInvalidClassException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.*; + public class PropertyFieldTableTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class); @@ -441,7 +441,7 @@ public class PropertyFieldTableTest extends TestCase } /** Check that a nested field table parameter correctly encodes and decodes to a byte buffer. */ - public void testNestedFieldTable() + public void testNestedFieldTable() throws IOException { byte[] testBytes = new byte[] { 0, 1, 2, 3, 4, 5 }; @@ -465,14 +465,16 @@ public class PropertyFieldTableTest extends TestCase outerTable.setFieldTable("innerTable", innerTable); // Write the outer table into the buffer. - final ByteBuffer buffer = ByteBuffer.allocate((int) outerTable.getEncodedSize() + 4); - outerTable.writeToBuffer(buffer); - buffer.flip(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + outerTable.writeToBuffer(new DataOutputStream(baos)); + + byte[] data = baos.toByteArray(); // Extract the table back from the buffer again. try { - FieldTable extractedOuterTable = EncodingUtils.readFieldTable(buffer); + FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data))); FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable"); @@ -567,7 +569,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals("Hello", table.getObject("object-string")); } - public void testwriteBuffer() + public void testwriteBuffer() throws IOException { byte[] bytes = { 99, 98, 97, 96, 95 }; @@ -585,15 +587,17 @@ public class PropertyFieldTableTest extends TestCase table.setString("string", "hello"); table.setString("null-string", null); - final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem? - table.writeToBuffer(buffer); + ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4); + table.writeToBuffer(new DataOutputStream(baos)); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); - buffer.flip(); - long length = buffer.getUnsignedInt(); + long length = dis.readInt() & 0xFFFFFFFFL; - FieldTable table2 = new FieldTable(buffer, length); + FieldTable table2 = new FieldTable(dis, length); Assert.assertEquals((Boolean) true, table2.getBoolean("bool")); Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte")); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index ac3380e0c0..bde8bc68ad 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -24,17 +24,16 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.util.*; import junit.framework.TestCase; import junit.framework.TestResult; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.apache.mina.util.AvailablePortFinder; + public class QpidTestCase extends TestCase { @@ -140,9 +139,85 @@ public class QpidTestCase extends TestCase return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } + + public static final int MIN_PORT_NUMBER = 1; + public static final int MAX_PORT_NUMBER = 49151; + + + /** + * Gets the next available port starting at a port. + * + * @param fromPort the port to scan for availability + * @throws NoSuchElementException if there are no ports available + */ + protected int getNextAvailable(int fromPort) + { + if ((fromPort < MIN_PORT_NUMBER) || (fromPort > MAX_PORT_NUMBER)) + { + throw new IllegalArgumentException("Invalid start port: " + fromPort); + } + + for (int i = fromPort; i <= MAX_PORT_NUMBER; i++) + { + if (available(i)) { + return i; + } + } + + throw new NoSuchElementException("Could not find an available port above " + fromPort); + } + + /** + * Checks to see if a specific port is available. + * + * @param port the port to check for availability + */ + private boolean available(int port) + { + if ((port < MIN_PORT_NUMBER) || (port > MAX_PORT_NUMBER)) + { + throw new IllegalArgumentException("Invalid start port: " + port); + } + + ServerSocket ss = null; + DatagramSocket ds = null; + try + { + ss = new ServerSocket(port); + ss.setReuseAddress(true); + ds = new DatagramSocket(port); + ds.setReuseAddress(true); + return true; + } + catch (IOException e) + { + } + finally + { + if (ds != null) + { + ds.close(); + } + + if (ss != null) + { + try + { + ss.close(); + } + catch (IOException e) + { + /* should not be thrown */ + } + } + } + + return false; + } + public int findFreePort() { - return AvailablePortFinder.getNextAvailable(10000); + return getNextAvailable(10000); } /** diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 375a326654..3cd7dea2b6 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.transport; -import org.apache.mina.util.AvailablePortFinder; - import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.io.IoAcceptor; @@ -58,7 +56,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener { super.setUp(); - port = AvailablePortFinder.getNextAvailable(12000); + port = findFreePort(); } protected void tearDown() throws Exception diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java index 0de1308281..215c6d9931 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -22,16 +22,8 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; import java.nio.ByteBuffer; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLSender; import org.apache.qpid.transport.util.Logger; /** @@ -46,13 +38,6 @@ import org.apache.qpid.transport.util.Logger; public final class IoTransport<E> { - static - { - org.apache.mina.common.ByteBuffer.setAllocator - (new org.apache.mina.common.SimpleByteBufferAllocator()); - org.apache.mina.common.ByteBuffer.setUseDirectBuffers - (Boolean.getBoolean("amqj.enableDirectBuffers")); - } private static final Logger log = Logger.get(IoTransport.class); diff --git a/qpid/java/common/templates/method/version/MethodBodyClass.vm b/qpid/java/common/templates/method/version/MethodBodyClass.vm index a739110d70..ce8a453eeb 100644 --- a/qpid/java/common/templates/method/version/MethodBodyClass.vm +++ b/qpid/java/common/templates/method/version/MethodBodyClass.vm @@ -46,9 +46,11 @@ package org.apache.qpid.framing.amqp_$version.getMajor()_$version.getMinor(); +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.HashMap; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.*; import org.apache.qpid.AMQException; @@ -56,7 +58,7 @@ public class ${javaClassName} extends AMQMethodBody_$version.getMajor()_$version { private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory() { - public AMQMethodBody newInstance(ByteBuffer in, long size) throws AMQFrameDecodingException + public AMQMethodBody newInstance(DataInputStream in, long size) throws AMQFrameDecodingException, IOException { return new ${javaClassName}(in); } @@ -84,7 +86,7 @@ public class ${javaClassName} extends AMQMethodBody_$version.getMajor()_$version // Constructor - public ${javaClassName}(ByteBuffer buffer) throws AMQFrameDecodingException + public ${javaClassName}(DataInputStream buffer) throws AMQFrameDecodingException, IOException { #foreach( $field in $method.ConsolidatedFields ) _$field.Name = read$field.getEncodingType()( buffer ); @@ -169,7 +171,7 @@ public class ${javaClassName} extends AMQMethodBody_$version.getMajor()_$version return size; } - public void writeMethodPayload(ByteBuffer buffer) + public void writeMethodPayload(DataOutputStream buffer) throws IOException { #foreach( $field in $method.ConsolidatedFields ) write$field.getEncodingType()( buffer, _$field.Name ); diff --git a/qpid/java/common/templates/model/MethodRegistryClass.vm b/qpid/java/common/templates/model/MethodRegistryClass.vm index 759e5e4a42..8258175ce7 100644 --- a/qpid/java/common/templates/model/MethodRegistryClass.vm +++ b/qpid/java/common/templates/model/MethodRegistryClass.vm @@ -30,7 +30,8 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -53,8 +54,8 @@ public abstract class MethodRegistry #end - public abstract AMQMethodBody convertToBody(ByteBuffer in, long size) - throws AMQFrameDecodingException; + public abstract AMQMethodBody convertToBody(DataInputStream in, long size) + throws AMQFrameDecodingException, IOException; public abstract int getMaxClassId(); @@ -101,4 +102,4 @@ public abstract class MethodRegistry public abstract ProtocolVersionMethodConverter getProtocolVersionMethodConverter(); -}
\ No newline at end of file +} diff --git a/qpid/java/common/templates/model/version/MethodRegistryClass.vm b/qpid/java/common/templates/model/version/MethodRegistryClass.vm index 277605e34b..79553f7748 100644 --- a/qpid/java/common/templates/model/version/MethodRegistryClass.vm +++ b/qpid/java/common/templates/model/version/MethodRegistryClass.vm @@ -35,32 +35,33 @@ import org.apache.qpid.protocol.AMQConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodRegistry_$version.getMajor()_$version.getMinor() extends MethodRegistry { - + private static final Logger _log = LoggerFactory.getLogger(MethodRegistry.class); - private ProtocolVersionMethodConverter _protocolVersionConverter = new MethodConverter_$version.getMajor()_$version.getMinor()(); - -#set( $specificModel = $model.asSingleVersionModel() ) - - -#set( $maxClassId = $specificModel.getMaximumClassId()+1 ) - private final AMQMethodBodyInstanceFactory[][] _factories = new AMQMethodBodyInstanceFactory[$maxClassId][]; - - public MethodRegistry_$version.getMajor()_$version.getMinor()() - { - this(new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor())); + private ProtocolVersionMethodConverter _protocolVersionConverter = new MethodConverter_$version.getMajor()_$version.getMinor()(); + +#set( $specificModel = $model.asSingleVersionModel() ) + + +#set( $maxClassId = $specificModel.getMaximumClassId()+1 ) + private final AMQMethodBodyInstanceFactory[][] _factories = new AMQMethodBodyInstanceFactory[$maxClassId][]; + + public MethodRegistry_$version.getMajor()_$version.getMinor()() + { + this(new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor())); } - - public MethodRegistry_$version.getMajor()_$version.getMinor()(ProtocolVersion pv) - { - super(pv); + + public MethodRegistry_$version.getMajor()_$version.getMinor()(ProtocolVersion pv) + { + super(pv); #foreach( $amqpClass in $specificModel.getClassList() ) #set( $amqpClassNameFirstChar = $amqpClass.getName().substring(0,1) ) #set( $amqpClassNameFirstCharU = $amqpClassNameFirstChar.toUpperCase() ) @@ -68,30 +69,30 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth - // Register method body instance factories for the $amqpClassNameUpperCamel class. + // Register method body instance factories for the $amqpClassNameUpperCamel class. -#set( $maxMethodId = $amqpClass.getMaximumMethodId()+1 ) +#set( $maxMethodId = $amqpClass.getMaximumMethodId()+1 ) _factories[$amqpClass.getClassId()] = new AMQMethodBodyInstanceFactory[$maxMethodId]; - + #foreach( $amqpMethod in $amqpClass.getMethodList() ) #set( $amqpMethodNameFirstChar = $amqpMethod.getName().substring(0,1) ) #set( $amqpMethodNameFirstCharU = $amqpMethodNameFirstChar.toUpperCase() ) #set( $amqpMethodNameUpperCamel = "$amqpMethodNameFirstCharU$amqpMethod.getName().substring(1)" ) _factories[$amqpClass.getClassId()][$amqpMethod.getMethodId()] = ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}BodyImpl.getFactory(); -#end - +#end + #end - - - } + + + } - public AMQMethodBody convertToBody(ByteBuffer in, long size) - throws AMQFrameDecodingException + public AMQMethodBody convertToBody(DataInputStream in, long size) + throws AMQFrameDecodingException, IOException { - int classId = in.getUnsignedShort(); - int methodId = in.getUnsignedShort(); - + int classId = in.readUnsignedShort(); + int methodId = in.readUnsignedShort(); + AMQMethodBodyInstanceFactory bodyFactory; try { @@ -137,15 +138,15 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth public int getMaxClassId() - { - return $specificModel.getMaximumClassId(); - } + { + return $specificModel.getMaximumClassId(); + } public int getMaxMethodId(int classId) - { - return _factories[classId].length - 1; - } - + { + return _factories[classId].length - 1; + } + #foreach( $amqpClass in $specificModel.getClassList() ) @@ -153,12 +154,12 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth #set( $amqpClassNameFirstCharU = $amqpClassNameFirstChar.toUpperCase() ) #set( $amqpClassNameUpperCamel = "$amqpClassNameFirstCharU$amqpClass.getName().substring(1)" ) - + #foreach( $amqpMethod in $amqpClass.getMethodList() ) #set( $amqpMethodNameFirstChar = $amqpMethod.getName().substring(0,1) ) #set( $amqpMethodNameFirstCharU = $amqpMethodNameFirstChar.toUpperCase() ) #set( $amqpMethodNameUpperCamel = "$amqpMethodNameFirstCharU$amqpMethod.getName().substring(1)" ) - public ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body create${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body( + public ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body create${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body( #foreach( $field in $amqpMethod.FieldList ) #if( $velocityCount == $amqpMethod.getFieldList().size() ) final $field.NativeType $field.Name @@ -166,9 +167,9 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth final $field.NativeType $field.Name, #end #end - ) + ) { - return new ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}BodyImpl( + return new ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}BodyImpl( #foreach( $field in $amqpMethod.FieldList ) #if( $velocityCount == $amqpMethod.getFieldList().size() ) $field.Name @@ -176,18 +177,18 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth $field.Name, #end #end - ); + ); } -#end - +#end + #end - - + + public ProtocolVersionMethodConverter getProtocolVersionMethodConverter() { return _protocolVersionConverter; - } + } } diff --git a/qpid/java/lib/mina-core-1.1.7.jar b/qpid/java/lib/mina-core-1.1.7.jar Binary files differdeleted file mode 100644 index a5fc451cc7..0000000000 --- a/qpid/java/lib/mina-core-1.1.7.jar +++ /dev/null diff --git a/qpid/java/lib/mina-filter-ssl-1.1.7.jar b/qpid/java/lib/mina-filter-ssl-1.1.7.jar Binary files differdeleted file mode 100644 index 40bdc919e1..0000000000 --- a/qpid/java/lib/mina-filter-ssl-1.1.7.jar +++ /dev/null diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java index 3ad6c021bd..d94ca5592b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.SimpleByteBufferAllocator; - import javax.jms.JMSException; import javax.jms.Session; import javax.jms.ObjectMessage; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java index 19657ef396..2864d8e994 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java @@ -68,61 +68,6 @@ public class ManagementActorLoggingTest extends AbstractTestLogging /** * Description: - * When a JMX Management connection is made then this will be logged out. - * - * Input: - * - * 1. Running Broker - * 2. Connect Management client via JMX - * Output: - * - * <date> MNG-1007 : Open <user> - * - * Validation Steps: - * 1. The MNG ID is correct - * 2. The user is correct - * - * On connection close a MNG-1008 is expected - * - * * <date> MNG-1008 : Close - * - * Validation Steps: - * 1. The MNG ID is correct - * - * @throws java.io.IOException - if there is a problem reseting the log monitor - */ - public void testJMXManagementConsoleConnection() throws IOException - { - List<String> results = waitAndFindMatches("MNG-1007"); - - assertEquals("Unexpected Management Connection count", 1, results.size()); - - String log = getLogMessage(results, 0); - - validateMessageID("MNG-1007", log); - - assertTrue("User not in log message:" + log, log.endsWith(USER)); - // Extract the id from the log string - // MESSAGE [mng:1(rmi://169.24.29.116)] MNG-1007 : Open : User admin - int connectionID = Integer.parseInt(fromActor(getLog(results.get(0))).charAt(4) + ""); - - results = findMatches("MNG-1008"); - - assertEquals("Unexpected Management Connection close count", 0, results.size()); - - _jmxUtils.close(); - _closed = true; - - results = waitAndFindMatches("MNG-1008"); - - assertEquals("Unexpected Management Connection count", 1, results.size()); - - assertEquals("Close does not have same id as open,", connectionID, - Integer.parseInt(fromActor(getLog(results.get(0))).charAt(4) + "")); - } - - /** - * Description: * When a connected client has its connection closed via the Management Console this will be logged as a CON-1002 message. * Input: * diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java index 4c2758241e..fd33266414 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java @@ -20,13 +20,10 @@ */ package org.apache.qpid.server.failover; -import org.apache.mina.common.WriteTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.FailoverBaseCase; -import org.apache.qpid.AMQConnectionClosedException; import javax.jms.Destination; import javax.jms.Message; @@ -34,6 +31,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -195,7 +193,7 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem // Send IO Exception - causing failover _connection.getProtocolHandler(). - exception(new WriteTimeoutException("WriteTimeoutException to cause failover.")); + exception(new IOException("IOException to cause failover.")); // Verify Failover occured through ConnectionListener assertTrue("Failover did not occur", diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java index 59ce64eb4f..0731d56204 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.test.unit.basic; import junit.framework.Assert; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; @@ -45,6 +44,7 @@ import javax.jms.MessageNotWriteableException; import javax.jms.MessageProducer; import javax.jms.Session; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index abf8da799c..3af215d1d5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.test.unit.basic; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -41,6 +39,8 @@ import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; @@ -105,7 +105,7 @@ public class FieldTableMessageTest extends QpidBrokerTestCase implements Message { int count = _count; _waitForCompletion = new CountDownLatch(_count); - send(count); + send(count); _waitForCompletion.await(20, TimeUnit.SECONDS); check(); _logger.info("Completed without failure"); @@ -125,12 +125,15 @@ public class FieldTableMessageTest extends QpidBrokerTestCase implements Message } - void check() throws JMSException, AMQFrameDecodingException + void check() throws JMSException, AMQFrameDecodingException, IOException { for (Object m : received) { - ByteBuffer buffer = ((JMSBytesMessage) m).getData(); - FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining()); + final BytesMessage bytesMessage = (BytesMessage) m; + final long bodyLength = bytesMessage.getBodyLength(); + byte[] data = new byte[(int) bodyLength]; + bytesMessage.readBytes(data); + FieldTable actual = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(data)), bodyLength); for (String key : _expected.keys()) { assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key)); diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 29f585b300..2b58a0684d 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -20,6 +20,10 @@ // QPID-3391: the C++ broker does not currently validate the exchange creation arguments org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithNonsenseArgs +//This test requires SSL, but SSL is only enabled for the C++ broker in the cpp.ssl test profile +//which runs *all* the tests with SSL, so this one can be excluded safely enough +org.apache.qpid.test.unit.client.AMQSSLConnectionTest#* + org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* org.apache.qpid.client.ResetMessageListenerTest#* diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes new file mode 100644 index 0000000000..31d2a8affc --- /dev/null +++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes @@ -0,0 +1,22 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +// QPID-3477: Java broker does not handle rejection code specified in test +qpid.tests.messaging.endpoints.SessionTests.testReject + |