summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-12-13 15:39:24 +0000
committerSteven Shaw <steshaw@apache.org>2006-12-13 15:39:24 +0000
commit2b390e65df9e2f76566942dff4ddeffd8a839590 (patch)
tree9c753202d163c0e4358e2a6895c1dffbc2ab98eb
parentcc3d9dfed5f5e3f97aa96270bec743cb60f409f6 (diff)
downloadqpid-python-2b390e65df9e2f76566942dff4ddeffd8a839590.tar.gz
QPID-173. Re-port JmsByteBytes (QpidBytesMessage) from Java. Notably missing was the _data.rewind() in the Text getter. Also removed/tidied up some commented out code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486688 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs2
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs1
-rw-r--r--dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs19
-rw-r--r--dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs2
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs442
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs34
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs41
7 files changed, 105 insertions, 436 deletions
diff --git a/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs b/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs
index 852c7f3aa8..b11d6b6b14 100644
--- a/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs
+++ b/dotnet/Qpid.Buffer/SimpleByteBufferAllocator.cs
@@ -116,8 +116,6 @@ namespace Qpid.Buffer
protected override void capacity0(int requestedCapacity)
{
- Console.WriteLine("XXX capacity0 called with requestedCapacity=" + requestedCapacity); // FIXME: remove this.
-
int newCapacity = MINIMUM_CAPACITY;
while( newCapacity < requestedCapacity )
{
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 6e70eb54e0..067db452b6 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -805,6 +805,7 @@ namespace Qpid.Client
//
// Very nasty temporary hack for GRM. Will be altered ASAP.
+ // FIXME: Remove this hack.
//
if (message is QpidBytesMessage)
{
diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
index ad1bc55d87..13a6fa8113 100644
--- a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
+++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
@@ -27,27 +27,8 @@ namespace Qpid.Client.Message
{
public abstract class AbstractQmsMessageFactory : IMessageFactory
{
- //public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies)
- //{
- // AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies);
- // msg.Redelivered = redelivered;
- // return msg;
- //}
-
public abstract AbstractQmsMessage CreateMessage();
- ///// <summary>
- /////
- ///// </summary>
- ///// <param name="messageNbr"></param>
- ///// <param name="contentHeader"></param>
- ///// <param name="bodies"></param>
- ///// <returns></returns>
- ///// <exception cref="AMQException"></exception>
- //protected abstract AbstractQmsMessage CreateMessageWithBody(long messageNbr,
- // ContentHeaderBody contentHeader,
- // IList bodies);
-
private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory));
protected abstract AbstractQmsMessage CreateMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader);
diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
index d6ffcd665c..035a009006 100644
--- a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
+++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -37,7 +37,7 @@ namespace Qpid.Client.Message
protected bool _redelivered;
protected ByteBuffer _data;
- private bool _readableMessage = false;
+ protected bool _readableMessage = false;
//protected AbstractQmsMessage() : base(new BasicContentHeaderProperties())
//{
diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
index 251e7ed57f..d5574e892f 100644
--- a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
+++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
@@ -27,23 +27,19 @@ using Qpid.Buffer;
namespace Qpid.Client.Message
{
+ class MessageEOFException : QpidException
+ {
+ public MessageEOFException(string message) : base(message)
+ {
+ }
+ }
+
public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
{
private const string MIME_TYPE = "application/octet-stream";
private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
- /// <summary>
- /// The backingstore for the data
- /// </summary>
- private MemoryStream _dataStream; // FIXME: Probably don't need this any more.
-
- private int _bodyLength;
-
- private BinaryReader _reader;
-
- private BinaryWriter _writer;
-
public QpidBytesMessage() : this(null)
{
}
@@ -60,17 +56,7 @@ namespace Qpid.Client.Message
if (data == null)
{
_data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
- //_data.AutoExpand = true;
- _dataStream = new MemoryStream();
- _writer = new BinaryWriter(_dataStream);
- }
- else
- {
- byte[] bytes = new byte[data.remaining()];
- data.get(bytes);
- _dataStream = new MemoryStream(bytes);
- _bodyLength = bytes.Length;
- _reader = new BinaryReader(_dataStream);
+ _data.setAutoExpand(true);
}
}
@@ -78,13 +64,7 @@ namespace Qpid.Client.Message
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
: base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data)
{
- ContentHeaderProperties.ContentType = MIME_TYPE;
- byte[] bytes = new byte[data.remaining()];
- data.get(bytes);
- _dataStream = new MemoryStream(bytes);
- _bodyLength = bytes.Length;
- _reader = new BinaryReader(_dataStream);
-
+ ContentHeaderProperties.ContentType = MIME_TYPE;
}
public override void ClearBodyImpl()
@@ -92,19 +72,6 @@ namespace Qpid.Client.Message
_data.clear();
}
-// public override void ClearBody()
-// {
-// if (_reader != null)
-// {
-// _reader.Close();
-// _reader = null;
-// }
-// _dataStream = new MemoryStream();
-// _bodyLength = 0;
-//
-// _writer = new BinaryWriter(_dataStream);
-// }
-
public override string ToBodyString()
{
CheckReadable();
@@ -117,45 +84,31 @@ namespace Qpid.Client.Message
throw new QpidException(e.ToString());
}
}
-
- private string GetText()
- {
- if (_dataStream != null)
+
+ private String GetText()
+ {
+ // this will use the default platform encoding
+ if (_data == null)
{
- // we cannot just read the underlying buffer since it may be larger than the amount of
- // "filled" data. Length is not the same as Capacity.
- byte[] data = new byte[_dataStream.Length];
- _dataStream.Read(data, 0, (int)_dataStream.Length);
- return Encoding.UTF8.GetString(data);
+ return null;
}
- else
+ int pos = _data.position();
+ _data.rewind();
+ // one byte left is for the end of frame marker
+ if (_data.remaining() == 0)
{
+ // this is really redundant since pos must be zero
+ _data.position(pos);
return null;
}
+ else
+ {
+ byte[] data = new byte[_data.remaining()];
+ _data.get(data);
+ return Encoding.UTF8.GetString(data);
+ }
}
- //public override byte[] Data
- //{
- // get
- // {
- // if (_dataStream == null)
- // {
- // return null;
- // }
- // else
- // {
- // byte[] data = new byte[_dataStream.Length];
- // _dataStream.Position = 0;
- // _dataStream.Read(data, 0, (int) _dataStream.Length);
- // return data;
- // }
- // }
- // set
- // {
- // throw new NotSupportedException("Cannot set data payload except during construction");
- // }
- //}
-
public override string MimeType
{
get
@@ -169,27 +122,13 @@ namespace Qpid.Client.Message
get
{
CheckReadable();
- return _data.limit(); // XXX
-// return _bodyLength;
+ return _data.limit();
}
}
- /// <summary>
- ///
- /// </summary>
- /// <exception cref="MessageNotReadableException">if the message is in write mode</exception>
-// private void CheckReadable()
-// {
-//
-// if (_reader == null)
-// {
-// throw new MessageNotReadableException("You need to call reset() to make the message readable");
-// }
-// }
-
private void CheckWritable()
{
- if (_reader != null)
+ if (_readableMessage)
{
throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
}
@@ -198,126 +137,76 @@ namespace Qpid.Client.Message
public bool ReadBoolean()
{
CheckReadable();
- try
- {
- return _reader.ReadBoolean();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(1);
+ return _data.get() != 0;
}
public byte ReadByte()
{
CheckReadable();
- try
- {
- return _reader.ReadByte();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(1);
+ return _data.get();
}
public short ReadSignedByte()
{
CheckReadable();
- try
- {
- return _reader.ReadSByte();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(1);
+ return _data.get();
}
public short ReadShort()
{
CheckReadable();
- try
- {
- return _reader.ReadInt16();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(2);
+ return _data.getShort();
}
public char ReadChar()
{
CheckReadable();
- try
- {
- return _reader.ReadChar();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(2);
+ return _data.getChar();
}
public int ReadInt()
{
CheckReadable();
- try
- {
- return _reader.ReadInt32();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(4);
+ return _data.getInt();
}
public long ReadLong()
{
CheckReadable();
- try
- {
- return _reader.ReadInt64();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(8);
+ return _data.getLong();
}
public float ReadFloat()
{
CheckReadable();
- try
- {
- return _reader.ReadSingle();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(4);
+ return _data.getFloat();
}
public double ReadDouble()
{
CheckReadable();
- try
- {
- return _reader.ReadDouble();
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ CheckAvailable(8);
+ return _data.getDouble();
}
public string ReadUTF()
{
CheckReadable();
+ // we check only for one byte since theoretically the string could be only a
+ // single byte when using UTF-8 encoding
+ CheckAvailable(1);
try
{
- byte[] data = _reader.ReadBytes((int)_dataStream.Length);
+ byte[] data = new byte[_data.remaining()];
+ _data.get(data);
return Encoding.UTF8.GetString(data);
}
catch (IOException e)
@@ -333,269 +222,132 @@ namespace Qpid.Client.Message
throw new ArgumentNullException("bytes");
}
CheckReadable();
- try
- {
- return _reader.Read(bytes, 0, bytes.Length);
+ int count = (_data.remaining() >= bytes.Length ? bytes.Length : _data.remaining());
+ if (count == 0)
+ {
+ return -1;
}
- catch (IOException e)
+ else
{
- throw new QpidException(e.ToString(), e);
+ _data.get(bytes, 0, count);
+ return count;
}
}
- public int ReadBytes(byte[] bytes, int count)
+ public int ReadBytes(byte[] bytes, int maxLength)
{
- CheckReadable();
if (bytes == null)
{
throw new ArgumentNullException("bytes");
}
- if (count < 0)
+ if (maxLength > bytes.Length)
{
- throw new ArgumentOutOfRangeException("count must be >= 0");
+ throw new ArgumentOutOfRangeException("maxLength must be >= 0");
}
- if (count > bytes.Length)
- {
- count = bytes.Length;
- }
-
- try
+ CheckReadable();
+ int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining());
+ if (count == 0)
{
- return _reader.Read(bytes, 0, count);
+ return -1;
}
- catch (IOException e)
+ else
{
- throw new QpidException(e.ToString(), e);
+ _data.get(bytes, 0, count);
+ return count;
}
}
public void WriteBoolean(bool b)
{
CheckWritable();
- try
- {
- _writer.Write(b);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.put(b ? (byte)1 : (byte)0);
}
public void WriteByte(byte b)
{
CheckWritable();
- try
- {
- _writer.Write(b);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.put(b);
}
public void WriteShort(short i)
{
CheckWritable();
- try
- {
- _writer.Write(i);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.putShort(i);
}
public void WriteChar(char c)
{
CheckWritable();
- try
- {
- _writer.Write(c);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.putChar(c);
}
public void WriteSignedByte(short value)
{
CheckWritable();
- try
- {
- _writer.Write(value);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.put((byte)value);
}
public void WriteDouble(double value)
{
CheckWritable();
- try
- {
- _writer.Write(value);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.putDouble(value);
}
public void WriteFloat(float value)
{
CheckWritable();
- try
- {
- _writer.Write(value);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.putFloat(value);
}
public void WriteInt(int value)
{
CheckWritable();
- try
- {
- _writer.Write(value);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.putInt(value);
}
public void WriteLong(long value)
{
CheckWritable();
- try
- {
- _writer.Write(value);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
- }
-
- public void Write(int i)
- {
- CheckWritable();
- try
- {
- _writer.Write(i);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
- }
-
- public void Write(long l)
- {
- CheckWritable();
- try
- {
- _writer.Write(l);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
- }
-
- public void Write(float v)
- {
- CheckWritable();
- try
- {
- _writer.Write(v);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
- }
-
- public void Write(double v)
- {
- CheckWritable();
- try
- {
- _writer.Write(v);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
- }
+ _data.putLong(value);
+ }
public void WriteUTF(string value)
{
CheckWritable();
- try
- {
- byte[] encodedData = Encoding.UTF8.GetBytes(value);
- _writer.Write(encodedData);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ byte[] encodedData = Encoding.UTF8.GetBytes(value);
+ _data.put(encodedData);
}
public void WriteBytes(byte[] bytes)
{
CheckWritable();
- try
- {
- _writer.Write(bytes);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.put(bytes);
}
public void WriteBytes(byte[] bytes, int offset, int length)
{
CheckWritable();
- try
- {
- _writer.Write(bytes, offset, length);
- }
- catch (IOException e)
- {
- throw new QpidException(e.ToString(), e);
- }
+ _data.put(bytes, offset, length);
}
public void Reset()
{
base.Reset();
_data.flip();
-
-// CheckWritable();
-// try
-// {
-// _writer.Close();
-// _writer = null;
-// _reader = new BinaryReader(_dataStream);
-// _bodyLength = (int) _dataStream.Length;
-// }
-// catch (IOException e)
-// {
-// throw new QpidException(e.ToString(), e);
-// }
}
+
+ /**
+ * Check that there is at least a certain number of bytes available to read
+ *
+ * @param len the number of bytes
+ * @throws MessageEOFException if there are less than len bytes available to read
+ */
+ private void CheckAvailable(int len)
+ {
+ if (_data.remaining() < len)
+ {
+ throw new MessageEOFException("Unable to read " + len + " bytes");
+ }
+ }
}
}
-
diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
index 650186a90b..d56f2c0857 100644
--- a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
+++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
@@ -30,35 +30,7 @@ namespace Qpid.Client.Message
{
private const string MIME_TYPE = "text/plain";
- private string _decodedValue;
-
- //public QpidTextMessage() : this(null, null)
- //{
- //}
-
- //public QpidTextMessage(byte[] data, String encoding) : base()
- //{
- // // the superclass has instantied a content header at this point
- // ContentHeaderProperties.ContentType= MIME_TYPE;
- // _data = data;
- // ContentHeaderProperties.Encoding = encoding;
- //}
-
- //public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader)
- // : base(messageNbr, contentHeader)
- //{
- // contentHeader.ContentType = MIME_TYPE;
- // _data = data;
- //}
-
- //public QpidTextMessage(byte[] data) : this(data, null)
- //{
- //}
-
- //public QpidTextMessage(string text)
- //{
- // Text = text;
- //}
+ private string _decodedValue = null;
internal QpidTextMessage() : this(null, null)
{
@@ -74,7 +46,7 @@ namespace Qpid.Client.Message
:base(deliveryTag, contentHeader, data)
{
contentHeader.ContentType = MIME_TYPE;
- _data = data;
+ _data = data; // FIXME: Unnecessary - done in base class ctor.
}
QpidTextMessage(ByteBuffer data) : this(data, null)
@@ -123,6 +95,8 @@ namespace Qpid.Client.Message
}
else
{
+ _data.rewind();
+
// Read remaining bytes.
byte[] bytes = new byte[_data.remaining()];
_data.get(bytes);
diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
index 54ce8d023c..cc4f6dafe1 100644
--- a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
+++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
@@ -18,48 +18,13 @@
* under the License.
*
*/
-using System;
-using System.Collections;
-using Qpid.Framing;
using Qpid.Buffer;
+using Qpid.Framing;
namespace Qpid.Client.Message
{
public class QpidTextMessageFactory : AbstractQmsMessageFactory
- {
-
- // protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
- // IList bodies)
- // {
- // byte[] data;
-
- // // we optimise the non-fragmented case to avoid copying
- // if (bodies != null && bodies.Count == 1)
- // {
- // data = ((ContentBody)bodies[0]).Payload;
- // }
- // else
- // {
- // data = new byte[(int)contentHeader.BodySize];
- // int currentPosition = 0;
- // foreach (ContentBody cb in bodies)
- // {
- // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
- // currentPosition += cb.Payload.Length;
- // }
- // }
-
- // return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties);
- // }
-
-
- // public override AbstractQmsMessage CreateMessage()
- // {
- // return new QpidTextMessage();
- // }
-
-
-
+ {
public override AbstractQmsMessage CreateMessage()
{
return new QpidTextMessage();
@@ -69,7 +34,5 @@ namespace Qpid.Client.Message
{
return new QpidTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.Properties, data);
}
-
}
}
-