diff options
author | Steven Shaw <steshaw@apache.org> | 2006-12-13 15:39:24 +0000 |
---|---|---|
committer | Steven Shaw <steshaw@apache.org> | 2006-12-13 15:39:24 +0000 |
commit | 2b390e65df9e2f76566942dff4ddeffd8a839590 (patch) | |
tree | 9c753202d163c0e4358e2a6895c1dffbc2ab98eb | |
parent | cc3d9dfed5f5e3f97aa96270bec743cb60f409f6 (diff) | |
download | qpid-python-2b390e65df9e2f76566942dff4ddeffd8a839590.tar.gz |
QPID-173. Re-port JmsByteBytes (QpidBytesMessage) from Java. Notably missing was the _data.rewind() in the Text getter. Also removed/tidied up some commented out code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486688 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 105 insertions, 436 deletions
diff --git a/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs b/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs index 852c7f3aa8..b11d6b6b14 100644 --- a/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs +++ b/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs @@ -116,8 +116,6 @@ namespace Qpid.Buffer protected override void capacity0(int requestedCapacity) { - Console.WriteLine("XXX capacity0 called with requestedCapacity=" + requestedCapacity); // FIXME: remove this. - int newCapacity = MINIMUM_CAPACITY; while( newCapacity < requestedCapacity ) { diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 6e70eb54e0..067db452b6 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -805,6 +805,7 @@ namespace Qpid.Client // // Very nasty temporary hack for GRM. Will be altered ASAP. + // FIXME: Remove this hack. // if (message is QpidBytesMessage) { diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs index ad1bc55d87..13a6fa8113 100644 --- a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs +++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs @@ -27,27 +27,8 @@ namespace Qpid.Client.Message { public abstract class AbstractQmsMessageFactory : IMessageFactory { - //public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies) - //{ - // AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies); - // msg.Redelivered = redelivered; - // return msg; - //} - public abstract AbstractQmsMessage CreateMessage(); - ///// <summary> - ///// - ///// </summary> - ///// <param name="messageNbr"></param> - ///// <param name="contentHeader"></param> - ///// <param name="bodies"></param> - ///// <returns></returns> - ///// <exception cref="AMQException"></exception> - //protected abstract AbstractQmsMessage CreateMessageWithBody(long messageNbr, - // ContentHeaderBody contentHeader, - // IList bodies); - private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory)); protected abstract AbstractQmsMessage CreateMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader); diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs index d6ffcd665c..035a009006 100644 --- a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs +++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -37,7 +37,7 @@ namespace Qpid.Client.Message protected bool _redelivered; protected ByteBuffer _data; - private bool _readableMessage = false; + protected bool _readableMessage = false; //protected AbstractQmsMessage() : base(new BasicContentHeaderProperties()) //{ diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs index 251e7ed57f..d5574e892f 100644 --- a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs +++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs @@ -27,23 +27,19 @@ using Qpid.Buffer; namespace Qpid.Client.Message { + class MessageEOFException : QpidException + { + public MessageEOFException(string message) : base(message) + { + } + } + public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage { private const string MIME_TYPE = "application/octet-stream"; private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024; - /// <summary> - /// The backingstore for the data - /// </summary> - private MemoryStream _dataStream; // FIXME: Probably don't need this any more. - - private int _bodyLength; - - private BinaryReader _reader; - - private BinaryWriter _writer; - public QpidBytesMessage() : this(null) { } @@ -60,17 +56,7 @@ namespace Qpid.Client.Message if (data == null) { _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); - //_data.AutoExpand = true; - _dataStream = new MemoryStream(); - _writer = new BinaryWriter(_dataStream); - } - else - { - byte[] bytes = new byte[data.remaining()]; - data.get(bytes); - _dataStream = new MemoryStream(bytes); - _bodyLength = bytes.Length; - _reader = new BinaryReader(_dataStream); + _data.setAutoExpand(true); } } @@ -78,13 +64,7 @@ namespace Qpid.Client.Message // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data) { - ContentHeaderProperties.ContentType = MIME_TYPE; - byte[] bytes = new byte[data.remaining()]; - data.get(bytes); - _dataStream = new MemoryStream(bytes); - _bodyLength = bytes.Length; - _reader = new BinaryReader(_dataStream); - + ContentHeaderProperties.ContentType = MIME_TYPE; } public override void ClearBodyImpl() @@ -92,19 +72,6 @@ namespace Qpid.Client.Message _data.clear(); } -// public override void ClearBody() -// { -// if (_reader != null) -// { -// _reader.Close(); -// _reader = null; -// } -// _dataStream = new MemoryStream(); -// _bodyLength = 0; -// -// _writer = new BinaryWriter(_dataStream); -// } - public override string ToBodyString() { CheckReadable(); @@ -117,45 +84,31 @@ namespace Qpid.Client.Message throw new QpidException(e.ToString()); } } - - private string GetText() - { - if (_dataStream != null) + + private String GetText() + { + // this will use the default platform encoding + if (_data == null) { - // we cannot just read the underlying buffer since it may be larger than the amount of - // "filled" data. Length is not the same as Capacity. - byte[] data = new byte[_dataStream.Length]; - _dataStream.Read(data, 0, (int)_dataStream.Length); - return Encoding.UTF8.GetString(data); + return null; } - else + int pos = _data.position(); + _data.rewind(); + // one byte left is for the end of frame marker + if (_data.remaining() == 0) { + // this is really redundant since pos must be zero + _data.position(pos); return null; } + else + { + byte[] data = new byte[_data.remaining()]; + _data.get(data); + return Encoding.UTF8.GetString(data); + } } - //public override byte[] Data - //{ - // get - // { - // if (_dataStream == null) - // { - // return null; - // } - // else - // { - // byte[] data = new byte[_dataStream.Length]; - // _dataStream.Position = 0; - // _dataStream.Read(data, 0, (int) _dataStream.Length); - // return data; - // } - // } - // set - // { - // throw new NotSupportedException("Cannot set data payload except during construction"); - // } - //} - public override string MimeType { get @@ -169,27 +122,13 @@ namespace Qpid.Client.Message get { CheckReadable(); - return _data.limit(); // XXX -// return _bodyLength; + return _data.limit(); } } - /// <summary> - /// - /// </summary> - /// <exception cref="MessageNotReadableException">if the message is in write mode</exception> -// private void CheckReadable() -// { -// -// if (_reader == null) -// { -// throw new MessageNotReadableException("You need to call reset() to make the message readable"); -// } -// } - private void CheckWritable() { - if (_reader != null) + if (_readableMessage) { throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); } @@ -198,126 +137,76 @@ namespace Qpid.Client.Message public bool ReadBoolean() { CheckReadable(); - try - { - return _reader.ReadBoolean(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(1); + return _data.get() != 0; } public byte ReadByte() { CheckReadable(); - try - { - return _reader.ReadByte(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(1); + return _data.get(); } public short ReadSignedByte() { CheckReadable(); - try - { - return _reader.ReadSByte(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(1); + return _data.get(); } public short ReadShort() { CheckReadable(); - try - { - return _reader.ReadInt16(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(2); + return _data.getShort(); } public char ReadChar() { CheckReadable(); - try - { - return _reader.ReadChar(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(2); + return _data.getChar(); } public int ReadInt() { CheckReadable(); - try - { - return _reader.ReadInt32(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(4); + return _data.getInt(); } public long ReadLong() { CheckReadable(); - try - { - return _reader.ReadInt64(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(8); + return _data.getLong(); } public float ReadFloat() { CheckReadable(); - try - { - return _reader.ReadSingle(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(4); + return _data.getFloat(); } public double ReadDouble() { CheckReadable(); - try - { - return _reader.ReadDouble(); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + CheckAvailable(8); + return _data.getDouble(); } public string ReadUTF() { CheckReadable(); + // we check only for one byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + CheckAvailable(1); try { - byte[] data = _reader.ReadBytes((int)_dataStream.Length); + byte[] data = new byte[_data.remaining()]; + _data.get(data); return Encoding.UTF8.GetString(data); } catch (IOException e) @@ -333,269 +222,132 @@ namespace Qpid.Client.Message throw new ArgumentNullException("bytes"); } CheckReadable(); - try - { - return _reader.Read(bytes, 0, bytes.Length); + int count = (_data.remaining() >= bytes.Length ? bytes.Length : _data.remaining()); + if (count == 0) + { + return -1; } - catch (IOException e) + else { - throw new QpidException(e.ToString(), e); + _data.get(bytes, 0, count); + return count; } } - public int ReadBytes(byte[] bytes, int count) + public int ReadBytes(byte[] bytes, int maxLength) { - CheckReadable(); if (bytes == null) { throw new ArgumentNullException("bytes"); } - if (count < 0) + if (maxLength > bytes.Length) { - throw new ArgumentOutOfRangeException("count must be >= 0"); + throw new ArgumentOutOfRangeException("maxLength must be >= 0"); } - if (count > bytes.Length) - { - count = bytes.Length; - } - - try + CheckReadable(); + int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining()); + if (count == 0) { - return _reader.Read(bytes, 0, count); + return -1; } - catch (IOException e) + else { - throw new QpidException(e.ToString(), e); + _data.get(bytes, 0, count); + return count; } } public void WriteBoolean(bool b) { CheckWritable(); - try - { - _writer.Write(b); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.put(b ? (byte)1 : (byte)0); } public void WriteByte(byte b) { CheckWritable(); - try - { - _writer.Write(b); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.put(b); } public void WriteShort(short i) { CheckWritable(); - try - { - _writer.Write(i); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.putShort(i); } public void WriteChar(char c) { CheckWritable(); - try - { - _writer.Write(c); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.putChar(c); } public void WriteSignedByte(short value) { CheckWritable(); - try - { - _writer.Write(value); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.put((byte)value); } public void WriteDouble(double value) { CheckWritable(); - try - { - _writer.Write(value); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.putDouble(value); } public void WriteFloat(float value) { CheckWritable(); - try - { - _writer.Write(value); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.putFloat(value); } public void WriteInt(int value) { CheckWritable(); - try - { - _writer.Write(value); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.putInt(value); } public void WriteLong(long value) { CheckWritable(); - try - { - _writer.Write(value); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } - } - - public void Write(int i) - { - CheckWritable(); - try - { - _writer.Write(i); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } - } - - public void Write(long l) - { - CheckWritable(); - try - { - _writer.Write(l); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } - } - - public void Write(float v) - { - CheckWritable(); - try - { - _writer.Write(v); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } - } - - public void Write(double v) - { - CheckWritable(); - try - { - _writer.Write(v); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } - } + _data.putLong(value); + } public void WriteUTF(string value) { CheckWritable(); - try - { - byte[] encodedData = Encoding.UTF8.GetBytes(value); - _writer.Write(encodedData); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + byte[] encodedData = Encoding.UTF8.GetBytes(value); + _data.put(encodedData); } public void WriteBytes(byte[] bytes) { CheckWritable(); - try - { - _writer.Write(bytes); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.put(bytes); } public void WriteBytes(byte[] bytes, int offset, int length) { CheckWritable(); - try - { - _writer.Write(bytes, offset, length); - } - catch (IOException e) - { - throw new QpidException(e.ToString(), e); - } + _data.put(bytes, offset, length); } public void Reset() { base.Reset(); _data.flip(); - -// CheckWritable(); -// try -// { -// _writer.Close(); -// _writer = null; -// _reader = new BinaryReader(_dataStream); -// _bodyLength = (int) _dataStream.Length; -// } -// catch (IOException e) -// { -// throw new QpidException(e.ToString(), e); -// } } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws MessageEOFException if there are less than len bytes available to read + */ + private void CheckAvailable(int len) + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } } } - diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs index 650186a90b..d56f2c0857 100644 --- a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs +++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs @@ -30,35 +30,7 @@ namespace Qpid.Client.Message { private const string MIME_TYPE = "text/plain"; - private string _decodedValue; - - //public QpidTextMessage() : this(null, null) - //{ - //} - - //public QpidTextMessage(byte[] data, String encoding) : base() - //{ - // // the superclass has instantied a content header at this point - // ContentHeaderProperties.ContentType= MIME_TYPE; - // _data = data; - // ContentHeaderProperties.Encoding = encoding; - //} - - //public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader) - // : base(messageNbr, contentHeader) - //{ - // contentHeader.ContentType = MIME_TYPE; - // _data = data; - //} - - //public QpidTextMessage(byte[] data) : this(data, null) - //{ - //} - - //public QpidTextMessage(string text) - //{ - // Text = text; - //} + private string _decodedValue = null; internal QpidTextMessage() : this(null, null) { @@ -74,7 +46,7 @@ namespace Qpid.Client.Message :base(deliveryTag, contentHeader, data) { contentHeader.ContentType = MIME_TYPE; - _data = data; + _data = data; // FIXME: Unnecessary - done in base class ctor. } QpidTextMessage(ByteBuffer data) : this(data, null) @@ -123,6 +95,8 @@ namespace Qpid.Client.Message } else { + _data.rewind(); + // Read remaining bytes. byte[] bytes = new byte[_data.remaining()]; _data.get(bytes); diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs index 54ce8d023c..cc4f6dafe1 100644 --- a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs +++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs @@ -18,48 +18,13 @@ * under the License. * */ -using System; -using System.Collections; -using Qpid.Framing; using Qpid.Buffer; +using Qpid.Framing; namespace Qpid.Client.Message { public class QpidTextMessageFactory : AbstractQmsMessageFactory - { - - // protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr, ContentHeaderBody contentHeader, - // IList bodies) - // { - // byte[] data; - - // // we optimise the non-fragmented case to avoid copying - // if (bodies != null && bodies.Count == 1) - // { - // data = ((ContentBody)bodies[0]).Payload; - // } - // else - // { - // data = new byte[(int)contentHeader.BodySize]; - // int currentPosition = 0; - // foreach (ContentBody cb in bodies) - // { - // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); - // currentPosition += cb.Payload.Length; - // } - // } - - // return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties); - // } - - - // public override AbstractQmsMessage CreateMessage() - // { - // return new QpidTextMessage(); - // } - - - + { public override AbstractQmsMessage CreateMessage() { return new QpidTextMessage(); @@ -69,7 +34,5 @@ namespace Qpid.Client.Message { return new QpidTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.Properties, data); } - } } - |