From 5924742a77953f3de8776d9b45e2396c605f1aa2 Mon Sep 17 00:00:00 2001 From: Steven Shaw Date: Wed, 29 Nov 2006 05:51:43 +0000 Subject: QPID-137. First stab at porting enough to get AutoAcknowledge mode working. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@480423 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs | 5 + .../Qpid.Client.Tests/failover/FailoverTxTest.cs | 65 ++++++-- qpid/dotnet/Qpid.Client/Client/AMQConnection.cs | 4 +- qpid/dotnet/Qpid.Client/Client/AmqChannel.cs | 26 +--- .../Qpid.Client/Client/BasicMessageConsumer.cs | 170 +++++++++++++++------ .../Qpid.Client/Client/Message/AMQMessage.cs | 3 +- .../Client/Message/AMQMessageFactory.cs | 73 +++++++-- .../Client/Message/AbstractQmsMessage.cs | 128 +++++++++------- .../Qpid.Client/Client/Message/IMessageFactory.cs | 2 +- .../Client/Message/MessageFactoryRegistry.cs | 2 +- .../Qpid.Client/Client/Message/QpidBytesMessage.cs | 60 ++++---- .../Client/Message/QpidBytesMessageFactory.cs | 56 ++++--- .../Qpid.Client/Client/Message/QpidTextMessage.cs | 77 ++++++---- .../Client/Message/QpidTextMessageFactory.cs | 63 +++++--- 14 files changed, 479 insertions(+), 255 deletions(-) (limited to 'qpid/dotnet') diff --git a/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs b/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs index 03dbd58bff..c54272b33f 100644 --- a/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs +++ b/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs @@ -385,6 +385,11 @@ namespace Qpid.Buffer { return new HeapByteBuffer(bytes, length); } + + public static HeapByteBuffer wrap(byte[] bytes) + { + return new HeapByteBuffer(bytes, bytes.Length); + } } } diff --git a/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs index eb08ca9446..79a04e79eb 100644 --- a/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs +++ b/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -35,12 +35,12 @@ namespace Qpid.Client.Tests.failover const int NUM_ITERATIONS = 10; const int NUM_COMMITED_MESSAGES = 10; - const int NUM_ROLLEDBACK_MESSAGES = 5; + const int NUM_ROLLEDBACK_MESSAGES = 3; const int SLEEP_MILLIS = 500; AMQConnection _connection; - public void onMessage(IMessage message) + public void OnMessage(IMessage message) { try { @@ -48,7 +48,40 @@ namespace Qpid.Client.Tests.failover } catch (QpidException e) { - error(e); + Error(e); + } + } + + class NoWaitConsumer + { + FailoverTxTest _failoverTxTest; + IMessageConsumer _consumer; + + internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel) + { + _failoverTxTest = failoverTxTest; + _consumer = channel; + } + + internal void Run() + { + int messages = 0; + while (messages < NUM_COMMITED_MESSAGES) + { + IMessage msg = _consumer.ReceiveNoWait(); + if (msg != null) + { + _log.Info("NoWait received message"); + ++messages; + _failoverTxTest.OnMessage(msg); + } + else + { + Thread.Sleep(1); + } + + } + } } @@ -60,7 +93,7 @@ namespace Qpid.Client.Tests.failover _log.Info("connectionInfo = " + connectionInfo); _log.Info("connection.asUrl = " + _connection.toURL()); - IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge); + IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge); string queueName = receivingChannel.GenerateUniqueName(); @@ -70,11 +103,23 @@ namespace Qpid.Client.Tests.failover // No need to call Queue.Bind as automatically bound to default direct exchange. receivingChannel.Bind(queueName, "amq.direct", queueName); - receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); + + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create(); + bool useThread = true; + if (useThread) + { + NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer); + new Thread(noWaitConsumer.Run).Start(); + } + else + { + //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); + consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + } _connection.Start(); - publishInTx(queueName); + PublishInTx(queueName); Thread.Sleep(2000); // Wait a while for last messages. @@ -82,7 +127,7 @@ namespace Qpid.Client.Tests.failover _log.Info("FailoverTxText complete"); } - private void publishInTx(string routingKey) + private void PublishInTx(string routingKey) { _log.Info("sendInTx"); bool transacted = true; @@ -113,13 +158,13 @@ namespace Qpid.Client.Tests.failover } } - private void error(Exception e) + private void Error(Exception e) { _log.Fatal("Exception received. About to stop.", e); - stop(); + Stop(); } - private void stop() + private void Stop() { _log.Info("Stopping..."); try diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs index a19b46c40a..5c0537429e 100644 --- a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -265,11 +265,11 @@ namespace Qpid.Client int _prefetch; AMQConnection _connection; - public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgementMode, int prefetch) + public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) { _connection = connection; _transacted = transacted; - _acknowledgeMode = acknowledgementMode; + _acknowledgeMode = acknowledgeMode; _prefetch = prefetch; } diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs index 48d87d8f90..0ab3fd3411 100644 --- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -124,7 +124,7 @@ namespace Qpid.Client } else { - consumer.NotifyMessage(message, _containingChannel.AcknowledgeMode, _containingChannel.ChannelId); + consumer.NotifyMessage(message, _containingChannel.ChannelId); } } else @@ -595,22 +595,6 @@ namespace Qpid.Client } } - /// - /// Send an acknowledgement for all messages up to a specified number on this session. - /// the message number up to an including which all messages will be acknowledged. - /// - public void SendAcknowledgement(ulong messageNbr) - { - /*if (_logger.IsDebugEnabled) - { - _logger.Debug("Channel Ack being sent for channel id " + _channelId + " and message number " + messageNbr); - }*/ - /*Channel.Ack frame = new Channel.Ack(); - frame.channelId = _channelId; - frame.messageNbr = messageNbr; - _connection.getProtocolHandler().writeFrame(frame);*/ - } - internal void Start() { _dispatcher = new Dispatcher(this); @@ -815,7 +799,7 @@ namespace Qpid.Client currentTime = DateTime.UtcNow.Ticks; message.Timestamp = currentTime; } - byte[] payload = message.Data; + byte[] payload = message.Data.ToByteArray(); BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties; if (timeToLive > 0) @@ -986,10 +970,10 @@ namespace Qpid.Client * @param multiple if true will acknowledge all messages up to and including the one specified by the * delivery tag */ - public void AcknowledgeMessage(long deliveryTag, bool multiple) + public void AcknowledgeMessage(ulong deliveryTag, bool multiple) { - // XXX: cast to ulong evil? - AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, (ulong)deliveryTag, multiple); + AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); + _logger.Info("XXX sending ack: " + ackFrame); if (_logger.IsDebugEnabled) { _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 1c9a009174..ffd19e9500 100644 --- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -50,15 +50,18 @@ namespace Qpid.Client set { _noLocal = value; } } - AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge; - public AcknowledgeMode AcknowledgeMode { - get { return _acknowledgeMode; } + get { return _channel.AcknowledgeMode; } } private MessageReceivedDelegate _messageListener; + private bool IsMessageListenerSet + { + get { return _messageListener != null; } + } + /// /// The consumer tag allows us to close the consumer by sending a jmsCancel method to the /// broker @@ -173,12 +176,7 @@ namespace Qpid.Client o = _synchronousQueue.DequeueBlocking(); } - IMessage m = ReturnMessageOrThrow(o); - if (m != null) - { - PostDeliver(m); - } - return m; + return ReturnMessageOrThrowAndPostDeliver(o); } finally { @@ -189,6 +187,16 @@ namespace Qpid.Client } } + private IMessage ReturnMessageOrThrowAndPostDeliver(object o) + { + IMessage m = ReturnMessageOrThrow(o); + if (m != null) + { + PostDeliver(m); + } + return m; + } + public IMessage Receive() { return Receive(0); @@ -211,8 +219,14 @@ namespace Qpid.Client try { - object o = _synchronousQueue.Dequeue(); - return ReturnMessageOrThrow(o); + if (_synchronousQueue.Count > 0) + { + return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue()); + } + else + { + return null; + } } finally { @@ -285,14 +299,73 @@ namespace Qpid.Client } } - /// - /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case - /// of a message listener or a synchronous receive() caller. - /// - /// the raw unprocessed mesage - /// the acknowledge mode requested for this message - /// channel on which this message was sent - internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) +// /// +// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case +// /// of a message listener or a synchronous receive() caller. +// /// +// /// the raw unprocessed mesage +// /// the acknowledge mode requested for this message +// /// channel on which this message was sent +// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) +// { +// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); +// if (_logger.IsDebugEnabled) +// { +// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); +// } +// try +// { +// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, +// messageFrame.DeliverBody.Redelivered, +// messageFrame.ContentHeader, +// messageFrame.Bodies); + +// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) +// { +// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); +// }*/ +// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) +// { +// // we set the session so that when the user calls acknowledge() it can call the method on session +// // to send out the appropriate frame +// jmsMessage.Channel = _channel; +// } + +// lock (_syncLock) +// { +// if (_messageListener != null) +// { +//#if __MonoCS__ +// _messageListener(jmsMessage); +//#else +// _messageListener.Invoke(jmsMessage); +//#endif +// } +// else +// { +// _synchronousQueue.Enqueue(jmsMessage); +// } +// } +// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) +// { +// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); +// } +// } +// catch (Exception e) +// { +// _logger.Error("Caught exception (dump follows) - ignoring...", e); +// } +// } + + + /** + * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case + * of a message listener or a synchronous receive() caller. + * + * @param messageFrame the raw unprocessed mesage + * @param channelId channel on which this message was sent + */ + internal void NotifyMessage(UnprocessedMessage messageFrame, int channelId) { if (_logger.IsDebugEnabled) { @@ -300,48 +373,38 @@ namespace Qpid.Client } try { - AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag, + AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, messageFrame.DeliverBody.Redelivered, messageFrame.ContentHeader, messageFrame.Bodies); - /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) - { - _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); - }*/ - if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) - { - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - jmsMessage.Channel = _channel; - } + _logger.Debug("Message is of type: " + jmsMessage.GetType().Name); - lock (_syncLock) + PreDeliver(jmsMessage); + + if (IsMessageListenerSet) { - if (_messageListener != null) - { + // We do not need a lock around the test above, and the dispatch below as it is invalid + // for an application to alter an installed listener while the session is started. #if __MonoCS__ _messageListener(jmsMessage); #else - _messageListener.Invoke(jmsMessage); + _messageListener.Invoke(jmsMessage); #endif - } - else - { - _synchronousQueue.Enqueue(jmsMessage); - } + PostDeliver(jmsMessage); } - if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) + else { - _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); + _synchronousQueue.Enqueue(jmsMessage); } } catch (Exception e) { - _logger.Error("Caught exception (dump follows) - ignoring...", e); + _logger.Error("Caught exception (dump follows) - ignoring...", e); // FIXME } } + internal void NotifyError(Exception cause) { lock (_syncLock) @@ -416,15 +479,32 @@ namespace Qpid.Client { if (_lastDeliveryTag > 0) { - _channel.AcknowledgeMessage(_lastDeliveryTag, true); + _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast _lastDeliveryTag = -1; } } + private void PreDeliver(AbstractQmsMessage msg) + { + switch (AcknowledgeMode) + { + case AcknowledgeMode.PreAcknowledge: + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false); + break; + + case AcknowledgeMode.ClientAcknowledge: + // We set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame. + //msg.setAMQSession(_session); + msg.Channel = _channel; + break; + } + } + private void PostDeliver(IMessage m) { AbstractQmsMessage msg = (AbstractQmsMessage) m; - switch (_acknowledgeMode) + switch (AcknowledgeMode) { /* TODO case AcknowledgeMode.DupsOkAcknowledge: @@ -444,7 +524,7 @@ namespace Qpid.Client break; */ case AcknowledgeMode.AutoAcknowledge: - _channel.AcknowledgeMessage(msg.DeliveryTag, false); + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); break; case AcknowledgeMode.SessionTransacted: _lastDeliveryTag = msg.DeliveryTag; diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs index 81499e31fe..a43eb028df 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs @@ -39,7 +39,8 @@ namespace Qpid.Client.Message _deliveryTag = deliveryTag; } - public AMQMessage(IContentHeaderProperties properties) : this(properties, -1) + public AMQMessage(IContentHeaderProperties properties) + : this(properties, -1) { } diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs index e485bb6d34..dd9855d675 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs @@ -20,30 +20,73 @@ */ using System.Collections; using Qpid.Framing; +using log4net; +using Qpid.Buffer; namespace Qpid.Client.Message { public abstract class AbstractQmsMessageFactory : IMessageFactory { - public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies) + //public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies) + //{ + // AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies); + // msg.Redelivered = redelivered; + // return msg; + //} + + public abstract AbstractQmsMessage CreateMessage(); + + ///// + ///// + ///// + ///// + ///// + ///// + ///// + ///// + //protected abstract AbstractQmsMessage CreateMessageWithBody(long messageNbr, + // ContentHeaderBody contentHeader, + // IList bodies); + + private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory)); + + protected abstract AbstractQmsMessage CreateMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader); + + protected AbstractQmsMessage CreateMessageWithBody(long messageNbr, + ContentHeaderBody contentHeader, + IList bodies) + { + ByteBuffer data; + + // we optimise the non-fragmented case to avoid copying + if (bodies != null && bodies.Count == 1) + { + _logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")"); + data = HeapByteBuffer.wrap(((ContentBody)bodies[0]).Payload); + } + else + { + _logger.Debug("Fragmented message body (" + bodies.Count + " frames, bodySize=" + contentHeader.BodySize + ")"); + data = ByteBuffer.Allocate((int)contentHeader.BodySize); // XXX: Is cast a problem? + foreach (ContentBody body in bodies) { + data.Put(body.Payload); + //body.Payload.Release(); + } + + data.Flip(); + } + _logger.Debug("Creating message from buffer with position=" + data.Position + " and remaining=" + data.Remaining); + + return CreateMessage(messageNbr, data, contentHeader); + } + + public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies) { AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies); msg.Redelivered = redelivered; return msg; } - - public abstract AbstractQmsMessage CreateMessage(); - - /// - /// - /// - /// - /// - /// - /// - /// - protected abstract AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, - ContentHeaderBody contentHeader, - IList bodies); } } diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs index c84e9de1b9..4c4adb8063 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -24,67 +24,71 @@ using System.Text; using log4net; using Qpid.Framing; using Qpid.Messaging; +using Qpid.Buffer; namespace Qpid.Client.Message { - public class SendOnlyDestination : AMQDestination + public abstract class AbstractQmsMessage : AMQMessage, IMessage { - private static readonly ILog _log = LogManager.GetLogger(typeof(string)); + private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage)); - public SendOnlyDestination(string exchangeName, string routingKey) - : base(exchangeName, null, null, false, false, routingKey) - { - _log.Debug( - string.Format("Creating SendOnlyDestination with exchangeName={0} and routingKey={1}", - exchangeName, routingKey)); - } +// protected long _messageNbr; - public override string EncodedName - { - get { return ExchangeName + ":" + QueueName; } - } + protected bool _redelivered; - public override string RoutingKey - { - get { return QueueName; } - } + protected ByteBuffer _data; - public override bool IsNameRequired - { - get { throw new NotImplementedException(); } - } - } + //protected AbstractQmsMessage() : base(new BasicContentHeaderProperties()) + //{ + //} - public abstract class AbstractQmsMessage : AMQMessage, IMessage - { - private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage)); + //protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader) + // : this(contentHeader) + //{ + // _messageNbr = messageNbr; + //} - protected ulong _messageNbr; + //protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader) + // : base(contentHeader) + //{ + //} - protected bool _redelivered; - protected AbstractQmsMessage() : base(new BasicContentHeaderProperties()) - { +#region new_java_ctrs + + protected AbstractQmsMessage(ByteBuffer data) + : base(new BasicContentHeaderProperties()) + { + _data = data; + if (_data != null) + { + _data.Acquire(); + } } - protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader) - : this(contentHeader) - { - _messageNbr = messageNbr; + protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + : this(contentHeader, deliveryTag) + { + _data = data; + if (_data != null) + { + _data.Acquire(); + } } - protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader) - : base(contentHeader) - { + protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag) + { } +#endregion + public string MessageId { get { if (ContentHeaderProperties.MessageId == null) { - ContentHeaderProperties.MessageId = "ID:" + _messageNbr; + ContentHeaderProperties.MessageId = "ID:" + DeliveryTag; } return ContentHeaderProperties.MessageId; } @@ -92,6 +96,8 @@ namespace Qpid.Client.Message { ContentHeaderProperties.MessageId = value; } + + } public long Timestamp @@ -321,8 +327,11 @@ namespace Qpid.Client.Message // is not specified. In our case, we only set the session field where client acknowledge mode is specified. if (_channel != null) { - _channel.SendAcknowledgement(_messageNbr); + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // received on the session + _channel.AcknowledgeMessage((ulong)DeliveryTag, true); } + } public IHeaders Headers @@ -344,10 +353,23 @@ namespace Qpid.Client.Message /// the message. /// /// a byte array of message data - public abstract byte[] Data + public ByteBuffer Data { - get; - set; + get + { + // make sure we rewind the data just in case any method has moved the + // position beyond the start + if (_data != null) + { + _data.Rewind(); + } + return _data; + } + + set + { + _data = value; + } } public abstract string MimeType @@ -367,7 +389,7 @@ namespace Qpid.Client.Message buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode); buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName); buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey); - buf.Append("\nAMQ message number: ").Append(_messageNbr); + buf.Append("\nAMQ message number: ").Append(DeliveryTag); buf.Append("\nProperties:"); if (ContentHeaderProperties.Headers == null) { @@ -430,17 +452,17 @@ namespace Qpid.Client.Message /// Get the AMQ message number assigned to this message /// /// the message number - public ulong MessageNbr - { - get - { - return _messageNbr; - } - set - { - _messageNbr = value; - } - } + //public ulong MessageNbr + //{ + // get + // { + // return _messageNbr; + // } + // set + // { + // _messageNbr = value; + // } + //} public BasicContentHeaderProperties ContentHeaderProperties { diff --git a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs index 2e71bfc948..4a109b128e 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs @@ -34,7 +34,7 @@ namespace Qpid.Client.Message /// /// /// if the message cannot be created - AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, + AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered, ContentHeaderBody contentHeader, IList bodies); diff --git a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs index 3965d531bb..95257cef8a 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs @@ -58,7 +58,7 @@ namespace Qpid.Client.Message /// the message. /// /// - public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, + public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies) { diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs index b7911b44b9..9ff3d543d8 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs @@ -23,6 +23,7 @@ using System.IO; using System.Text; using Qpid.Framing; using Qpid.Messaging; +using Qpid.Buffer; namespace Qpid.Client.Message { @@ -50,7 +51,7 @@ namespace Qpid.Client.Message /// /// if data is not null, the message is immediately in read only mode. if data is null, it is in /// write-only mode - QpidBytesMessage(byte[] data) : base() + QpidBytesMessage(ByteBuffer data) : base(data) { // superclass constructor has instantiated a content header at this point ContentHeaderProperties.ContentType = MIME_TYPE; @@ -61,22 +62,23 @@ namespace Qpid.Client.Message } else { - _dataStream = new MemoryStream(data); - _bodyLength = data.Length; + _dataStream = new MemoryStream(data.ToByteArray()); + _bodyLength = data.ToByteArray().Length; _reader = new BinaryReader(_dataStream); } } - public QpidBytesMessage(ulong messageNbr, byte[] data, ContentHeaderBody contentHeader) + internal QpidBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - : base(messageNbr, (BasicContentHeaderProperties) contentHeader.Properties) - { + : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data) + { ContentHeaderProperties.ContentType = MIME_TYPE; - _dataStream = new MemoryStream(data); - _bodyLength = data.Length; + _dataStream = new MemoryStream(data.ToByteArray()); + _bodyLength = data.ToByteArray().Length; _reader = new BinaryReader(_dataStream); } + public override void ClearBody() { if (_reader != null) @@ -119,27 +121,27 @@ namespace Qpid.Client.Message } } - public override byte[] Data - { - get - { - if (_dataStream == null) - { - return null; - } - else - { - byte[] data = new byte[_dataStream.Length]; - _dataStream.Position = 0; - _dataStream.Read(data, 0, (int) _dataStream.Length); - return data; - } - } - set - { - throw new NotSupportedException("Cannot set data payload except during construction"); - } - } + //public override byte[] Data + //{ + // get + // { + // if (_dataStream == null) + // { + // return null; + // } + // else + // { + // byte[] data = new byte[_dataStream.Length]; + // _dataStream.Position = 0; + // _dataStream.Read(data, 0, (int) _dataStream.Length); + // return data; + // } + // } + // set + // { + // throw new NotSupportedException("Cannot set data payload except during construction"); + // } + //} public override string MimeType { diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs index 3f2a6c531f..de4c6675c7 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs @@ -21,40 +21,52 @@ using System; using System.Collections; using Qpid.Framing; +using Qpid.Buffer; namespace Qpid.Client.Message { public class QpidBytesMessageFactory : AbstractQmsMessageFactory { - protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, - ContentHeaderBody contentHeader, - IList bodies) + //protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr, + // ContentHeaderBody contentHeader, + // IList bodies) + //{ + // byte[] data; + + // // we optimise the non-fragmented case to avoid copying + // if (bodies != null && bodies.Count == 1) + // { + // data = ((ContentBody)bodies[0]).Payload; + // } + // else + // { + // data = new byte[(long)contentHeader.BodySize]; + // int currentPosition = 0; + // foreach (ContentBody cb in bodies) + // { + // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + // currentPosition += cb.Payload.Length; + // } + // } + + // return new QpidBytesMessage(messageNbr, data, contentHeader); + //} + + //public override AbstractQmsMessage CreateMessage() + //{ + // return new QpidBytesMessage(); + //} + + protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) { - byte[] data; - - // we optimise the non-fragmented case to avoid copying - if (bodies != null && bodies.Count == 1) - { - data = ((ContentBody)bodies[0]).Payload; - } - else - { - data = new byte[(long)contentHeader.BodySize]; - int currentPosition = 0; - foreach (ContentBody cb in bodies) - { - Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); - currentPosition += cb.Payload.Length; - } - } - - return new QpidBytesMessage(messageNbr, data, contentHeader); + return new QpidBytesMessage(deliveryTag, contentHeader, data); } public override AbstractQmsMessage CreateMessage() { return new QpidBytesMessage(); } + } } diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs index 4c16038d4b..ae5e2b7e66 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs @@ -22,6 +22,7 @@ using System; using System.Text; using Qpid.Framing; using Qpid.Messaging; +using Qpid.Buffer; namespace Qpid.Client.Message { @@ -29,34 +30,58 @@ namespace Qpid.Client.Message { private const string MIME_TYPE = "text/plain"; - private byte[] _data; - private string _decodedValue; - public QpidTextMessage() : this(null, null) - { + //public QpidTextMessage() : this(null, null) + //{ + //} + + //public QpidTextMessage(byte[] data, String encoding) : base() + //{ + // // the superclass has instantied a content header at this point + // ContentHeaderProperties.ContentType= MIME_TYPE; + // _data = data; + // ContentHeaderProperties.Encoding = encoding; + //} + + //public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader) + // : base(messageNbr, contentHeader) + //{ + // contentHeader.ContentType = MIME_TYPE; + // _data = data; + //} + + //public QpidTextMessage(byte[] data) : this(data, null) + //{ + //} + + //public QpidTextMessage(string text) + //{ + // Text = text; + //} + + internal QpidTextMessage() : this(null, null) + { } - public QpidTextMessage(byte[] data, String encoding) : base() + QpidTextMessage(ByteBuffer data, String encoding) : base(data) { - // the superclass has instantied a content header at this point - ContentHeaderProperties.ContentType= MIME_TYPE; - _data = data; + ContentHeaderProperties.ContentType = MIME_TYPE; ContentHeaderProperties.Encoding = encoding; } - public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader) - : base(messageNbr, contentHeader) - { + internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + :base(deliveryTag, contentHeader, data) + { contentHeader.ContentType = MIME_TYPE; _data = data; } - public QpidTextMessage(byte[] data) : this(data, null) - { + QpidTextMessage(ByteBuffer data) : this(data, null) + { } - public QpidTextMessage(string text) + QpidTextMessage(String text) : base((ByteBuffer)null) { Text = text; } @@ -72,18 +97,6 @@ namespace Qpid.Client.Message return Text; } - public override byte[] Data - { - get - { - return _data; - } - set - { - _data = value; - } - } - public override string MimeType { get @@ -109,27 +122,29 @@ namespace Qpid.Client.Message if (ContentHeaderProperties.Encoding != null) { // throw ArgumentException if the encoding is not supported - _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data); + _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data.ToByteArray()); } else { - _decodedValue = Encoding.Default.GetString(_data); + _decodedValue = Encoding.Default.GetString(_data.ToByteArray()); } return _decodedValue; } } set - { + { + byte[] bytes; if (ContentHeaderProperties.Encoding == null) { - _data = Encoding.Default.GetBytes(value); + bytes = Encoding.Default.GetBytes(value); } else { // throw ArgumentException if the encoding is not supported - _data = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value); + bytes = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value); } + _data = HeapByteBuffer.wrap(bytes, bytes.Length); _decodedValue = value; } } diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs index 5457b2301e..54ce8d023c 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs @@ -21,40 +21,55 @@ using System; using System.Collections; using Qpid.Framing; +using Qpid.Buffer; namespace Qpid.Client.Message { public class QpidTextMessageFactory : AbstractQmsMessageFactory { - protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, ContentHeaderBody contentHeader, - IList bodies) - { - byte[] data; - - // we optimise the non-fragmented case to avoid copying - if (bodies != null && bodies.Count == 1) - { - data = ((ContentBody)bodies[0]).Payload; - } - else - { - data = new byte[(int)contentHeader.BodySize]; - int currentPosition = 0; - foreach (ContentBody cb in bodies) - { - Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); - currentPosition += cb.Payload.Length; - } - } - - return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties); - } + + // protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr, ContentHeaderBody contentHeader, + // IList bodies) + // { + // byte[] data; + + // // we optimise the non-fragmented case to avoid copying + // if (bodies != null && bodies.Count == 1) + // { + // data = ((ContentBody)bodies[0]).Payload; + // } + // else + // { + // data = new byte[(int)contentHeader.BodySize]; + // int currentPosition = 0; + // foreach (ContentBody cb in bodies) + // { + // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + // currentPosition += cb.Payload.Length; + // } + // } + + // return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties); + // } + // public override AbstractQmsMessage CreateMessage() + // { + // return new QpidTextMessage(); + // } + + + public override AbstractQmsMessage CreateMessage() { return new QpidTextMessage(); - } + } + + protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) + { + return new QpidTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.Properties, data); + } + } } -- cgit v1.2.1