summaryrefslogtreecommitdiff
path: root/qpid/dotnet
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-29 05:51:43 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-29 05:51:43 +0000
commit5924742a77953f3de8776d9b45e2396c605f1aa2 (patch)
tree5a06c3c010957b260291346fb954be72e77a5a55 /qpid/dotnet
parent260e96b8c02b021d237bec653bdc753488d96308 (diff)
downloadqpid-python-5924742a77953f3de8776d9b45e2396c605f1aa2.tar.gz
QPID-137. First stab at porting enough to get AutoAcknowledge mode working.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@480423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet')
-rw-r--r--qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs5
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs65
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AMQConnection.cs4
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AmqChannel.cs26
-rw-r--r--qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs170
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs3
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs73
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs128
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs2
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs2
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs60
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs56
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs77
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs63
14 files changed, 479 insertions, 255 deletions
diff --git a/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs b/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
index 03dbd58bff..c54272b33f 100644
--- a/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
+++ b/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
@@ -385,6 +385,11 @@ namespace Qpid.Buffer
{
return new HeapByteBuffer(bytes, length);
}
+
+ public static HeapByteBuffer wrap(byte[] bytes)
+ {
+ return new HeapByteBuffer(bytes, bytes.Length);
+ }
}
}
diff --git a/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
index eb08ca9446..79a04e79eb 100644
--- a/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
+++ b/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
@@ -35,12 +35,12 @@ namespace Qpid.Client.Tests.failover
const int NUM_ITERATIONS = 10;
const int NUM_COMMITED_MESSAGES = 10;
- const int NUM_ROLLEDBACK_MESSAGES = 5;
+ const int NUM_ROLLEDBACK_MESSAGES = 3;
const int SLEEP_MILLIS = 500;
AMQConnection _connection;
- public void onMessage(IMessage message)
+ public void OnMessage(IMessage message)
{
try
{
@@ -48,7 +48,40 @@ namespace Qpid.Client.Tests.failover
}
catch (QpidException e)
{
- error(e);
+ Error(e);
+ }
+ }
+
+ class NoWaitConsumer
+ {
+ FailoverTxTest _failoverTxTest;
+ IMessageConsumer _consumer;
+
+ internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel)
+ {
+ _failoverTxTest = failoverTxTest;
+ _consumer = channel;
+ }
+
+ internal void Run()
+ {
+ int messages = 0;
+ while (messages < NUM_COMMITED_MESSAGES)
+ {
+ IMessage msg = _consumer.ReceiveNoWait();
+ if (msg != null)
+ {
+ _log.Info("NoWait received message");
+ ++messages;
+ _failoverTxTest.OnMessage(msg);
+ }
+ else
+ {
+ Thread.Sleep(1);
+ }
+
+ }
+
}
}
@@ -60,7 +93,7 @@ namespace Qpid.Client.Tests.failover
_log.Info("connectionInfo = " + connectionInfo);
_log.Info("connection.asUrl = " + _connection.toURL());
- IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);
+ IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
string queueName = receivingChannel.GenerateUniqueName();
@@ -70,11 +103,23 @@ namespace Qpid.Client.Tests.failover
// No need to call Queue.Bind as automatically bound to default direct exchange.
receivingChannel.Bind(queueName, "amq.direct", queueName);
- receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
+
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create();
+ bool useThread = true;
+ if (useThread)
+ {
+ NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer);
+ new Thread(noWaitConsumer.Run).Start();
+ }
+ else
+ {
+ //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
+ consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
_connection.Start();
- publishInTx(queueName);
+ PublishInTx(queueName);
Thread.Sleep(2000); // Wait a while for last messages.
@@ -82,7 +127,7 @@ namespace Qpid.Client.Tests.failover
_log.Info("FailoverTxText complete");
}
- private void publishInTx(string routingKey)
+ private void PublishInTx(string routingKey)
{
_log.Info("sendInTx");
bool transacted = true;
@@ -113,13 +158,13 @@ namespace Qpid.Client.Tests.failover
}
}
- private void error(Exception e)
+ private void Error(Exception e)
{
_log.Fatal("Exception received. About to stop.", e);
- stop();
+ Stop();
}
- private void stop()
+ private void Stop()
{
_log.Info("Stopping...");
try
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
index a19b46c40a..5c0537429e 100644
--- a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -265,11 +265,11 @@ namespace Qpid.Client
int _prefetch;
AMQConnection _connection;
- public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgementMode, int prefetch)
+ public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
{
_connection = connection;
_transacted = transacted;
- _acknowledgeMode = acknowledgementMode;
+ _acknowledgeMode = acknowledgeMode;
_prefetch = prefetch;
}
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
index 48d87d8f90..0ab3fd3411 100644
--- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -124,7 +124,7 @@ namespace Qpid.Client
}
else
{
- consumer.NotifyMessage(message, _containingChannel.AcknowledgeMode, _containingChannel.ChannelId);
+ consumer.NotifyMessage(message, _containingChannel.ChannelId);
}
}
else
@@ -595,22 +595,6 @@ namespace Qpid.Client
}
}
- /// <summary>
- /// Send an acknowledgement for all messages up to a specified number on this session.
- /// <param name="messageNbr">the message number up to an including which all messages will be acknowledged.</param>
- /// </summary>
- public void SendAcknowledgement(ulong messageNbr)
- {
- /*if (_logger.IsDebugEnabled)
- {
- _logger.Debug("Channel Ack being sent for channel id " + _channelId + " and message number " + messageNbr);
- }*/
- /*Channel.Ack frame = new Channel.Ack();
- frame.channelId = _channelId;
- frame.messageNbr = messageNbr;
- _connection.getProtocolHandler().writeFrame(frame);*/
- }
-
internal void Start()
{
_dispatcher = new Dispatcher(this);
@@ -815,7 +799,7 @@ namespace Qpid.Client
currentTime = DateTime.UtcNow.Ticks;
message.Timestamp = currentTime;
}
- byte[] payload = message.Data;
+ byte[] payload = message.Data.ToByteArray();
BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties;
if (timeToLive > 0)
@@ -986,10 +970,10 @@ namespace Qpid.Client
* @param multiple if true will acknowledge all messages up to and including the one specified by the
* delivery tag
*/
- public void AcknowledgeMessage(long deliveryTag, bool multiple)
+ public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
- // XXX: cast to ulong evil?
- AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, (ulong)deliveryTag, multiple);
+ AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
+ _logger.Info("XXX sending ack: " + ackFrame);
if (_logger.IsDebugEnabled)
{
_logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 1c9a009174..ffd19e9500 100644
--- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -50,15 +50,18 @@ namespace Qpid.Client
set { _noLocal = value; }
}
- AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge;
-
public AcknowledgeMode AcknowledgeMode
{
- get { return _acknowledgeMode; }
+ get { return _channel.AcknowledgeMode; }
}
private MessageReceivedDelegate _messageListener;
+ private bool IsMessageListenerSet
+ {
+ get { return _messageListener != null; }
+ }
+
/// <summary>
/// The consumer tag allows us to close the consumer by sending a jmsCancel method to the
/// broker
@@ -173,12 +176,7 @@ namespace Qpid.Client
o = _synchronousQueue.DequeueBlocking();
}
- IMessage m = ReturnMessageOrThrow(o);
- if (m != null)
- {
- PostDeliver(m);
- }
- return m;
+ return ReturnMessageOrThrowAndPostDeliver(o);
}
finally
{
@@ -189,6 +187,16 @@ namespace Qpid.Client
}
}
+ private IMessage ReturnMessageOrThrowAndPostDeliver(object o)
+ {
+ IMessage m = ReturnMessageOrThrow(o);
+ if (m != null)
+ {
+ PostDeliver(m);
+ }
+ return m;
+ }
+
public IMessage Receive()
{
return Receive(0);
@@ -211,8 +219,14 @@ namespace Qpid.Client
try
{
- object o = _synchronousQueue.Dequeue();
- return ReturnMessageOrThrow(o);
+ if (_synchronousQueue.Count > 0)
+ {
+ return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue());
+ }
+ else
+ {
+ return null;
+ }
}
finally
{
@@ -285,14 +299,73 @@ namespace Qpid.Client
}
}
- /// <summary>
- /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case
- /// of a message listener or a synchronous receive() caller.
- /// </summary>
- /// <param name="messageFrame">the raw unprocessed mesage</param>
- /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param>
- /// <param name="channelId">channel on which this message was sent</param>
- internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId)
+// /// <summary>
+// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case
+// /// of a message listener or a synchronous receive() caller.
+// /// </summary>
+// /// <param name="messageFrame">the raw unprocessed mesage</param>
+// /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param>
+// /// <param name="channelId">channel on which this message was sent</param>
+// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId)
+// {
+// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
+// if (_logger.IsDebugEnabled)
+// {
+// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
+// }
+// try
+// {
+// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
+// messageFrame.DeliverBody.Redelivered,
+// messageFrame.ContentHeader,
+// messageFrame.Bodies);
+
+// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
+// {
+// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
+// }*/
+// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+// {
+// // we set the session so that when the user calls acknowledge() it can call the method on session
+// // to send out the appropriate frame
+// jmsMessage.Channel = _channel;
+// }
+
+// lock (_syncLock)
+// {
+// if (_messageListener != null)
+// {
+//#if __MonoCS__
+// _messageListener(jmsMessage);
+//#else
+// _messageListener.Invoke(jmsMessage);
+//#endif
+// }
+// else
+// {
+// _synchronousQueue.Enqueue(jmsMessage);
+// }
+// }
+// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+// {
+// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+// }
+// }
+// catch (Exception e)
+// {
+// _logger.Error("Caught exception (dump follows) - ignoring...", e);
+// }
+// }
+
+
+ /**
+ * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
+ * of a message listener or a synchronous receive() caller.
+ *
+ * @param messageFrame the raw unprocessed mesage
+ * @param channelId channel on which this message was sent
+ */
+ internal void NotifyMessage(UnprocessedMessage messageFrame, int channelId)
{
if (_logger.IsDebugEnabled)
{
@@ -300,48 +373,38 @@ namespace Qpid.Client
}
try
{
- AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag,
+ AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
messageFrame.DeliverBody.Redelivered,
messageFrame.ContentHeader,
messageFrame.Bodies);
- /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
- {
- _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
- }*/
- if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
- {
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- jmsMessage.Channel = _channel;
- }
+ _logger.Debug("Message is of type: " + jmsMessage.GetType().Name);
- lock (_syncLock)
+ PreDeliver(jmsMessage);
+
+ if (IsMessageListenerSet)
{
- if (_messageListener != null)
- {
+ // We do not need a lock around the test above, and the dispatch below as it is invalid
+ // for an application to alter an installed listener while the session is started.
#if __MonoCS__
_messageListener(jmsMessage);
#else
- _messageListener.Invoke(jmsMessage);
+ _messageListener.Invoke(jmsMessage);
#endif
- }
- else
- {
- _synchronousQueue.Enqueue(jmsMessage);
- }
+ PostDeliver(jmsMessage);
}
- if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+ else
{
- _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+ _synchronousQueue.Enqueue(jmsMessage);
}
}
catch (Exception e)
{
- _logger.Error("Caught exception (dump follows) - ignoring...", e);
+ _logger.Error("Caught exception (dump follows) - ignoring...", e); // FIXME
}
}
+
internal void NotifyError(Exception cause)
{
lock (_syncLock)
@@ -416,15 +479,32 @@ namespace Qpid.Client
{
if (_lastDeliveryTag > 0)
{
- _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+ _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast
_lastDeliveryTag = -1;
}
}
+ private void PreDeliver(AbstractQmsMessage msg)
+ {
+ switch (AcknowledgeMode)
+ {
+ case AcknowledgeMode.PreAcknowledge:
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false);
+ break;
+
+ case AcknowledgeMode.ClientAcknowledge:
+ // We set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame.
+ //msg.setAMQSession(_session);
+ msg.Channel = _channel;
+ break;
+ }
+ }
+
private void PostDeliver(IMessage m)
{
AbstractQmsMessage msg = (AbstractQmsMessage) m;
- switch (_acknowledgeMode)
+ switch (AcknowledgeMode)
{
/* TODO
case AcknowledgeMode.DupsOkAcknowledge:
@@ -444,7 +524,7 @@ namespace Qpid.Client
break;
*/
case AcknowledgeMode.AutoAcknowledge:
- _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
break;
case AcknowledgeMode.SessionTransacted:
_lastDeliveryTag = msg.DeliveryTag;
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
index 81499e31fe..a43eb028df 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
@@ -39,7 +39,8 @@ namespace Qpid.Client.Message
_deliveryTag = deliveryTag;
}
- public AMQMessage(IContentHeaderProperties properties) : this(properties, -1)
+ public AMQMessage(IContentHeaderProperties properties)
+ : this(properties, -1)
{
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
index e485bb6d34..dd9855d675 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
@@ -20,30 +20,73 @@
*/
using System.Collections;
using Qpid.Framing;
+using log4net;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
public abstract class AbstractQmsMessageFactory : IMessageFactory
{
- public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies)
+ //public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies)
+ //{
+ // AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies);
+ // msg.Redelivered = redelivered;
+ // return msg;
+ //}
+
+ public abstract AbstractQmsMessage CreateMessage();
+
+ ///// <summary>
+ /////
+ ///// </summary>
+ ///// <param name="messageNbr"></param>
+ ///// <param name="contentHeader"></param>
+ ///// <param name="bodies"></param>
+ ///// <returns></returns>
+ ///// <exception cref="AMQException"></exception>
+ //protected abstract AbstractQmsMessage CreateMessageWithBody(long messageNbr,
+ // ContentHeaderBody contentHeader,
+ // IList bodies);
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory));
+
+ protected abstract AbstractQmsMessage CreateMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader);
+
+ protected AbstractQmsMessage CreateMessageWithBody(long messageNbr,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ ByteBuffer data;
+
+ // we optimise the non-fragmented case to avoid copying
+ if (bodies != null && bodies.Count == 1)
+ {
+ _logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")");
+ data = HeapByteBuffer.wrap(((ContentBody)bodies[0]).Payload);
+ }
+ else
+ {
+ _logger.Debug("Fragmented message body (" + bodies.Count + " frames, bodySize=" + contentHeader.BodySize + ")");
+ data = ByteBuffer.Allocate((int)contentHeader.BodySize); // XXX: Is cast a problem?
+ foreach (ContentBody body in bodies) {
+ data.Put(body.Payload);
+ //body.Payload.Release();
+ }
+
+ data.Flip();
+ }
+ _logger.Debug("Creating message from buffer with position=" + data.Position + " and remaining=" + data.Remaining);
+
+ return CreateMessage(messageNbr, data, contentHeader);
+ }
+
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies)
{
AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies);
msg.Redelivered = redelivered;
return msg;
}
-
- public abstract AbstractQmsMessage CreateMessage();
-
- /// <summary>
- ///
- /// </summary>
- /// <param name="messageNbr"></param>
- /// <param name="contentHeader"></param>
- /// <param name="bodies"></param>
- /// <returns></returns>
- /// <exception cref="AMQException"></exception>
- protected abstract AbstractQmsMessage CreateMessageWithBody(ulong messageNbr,
- ContentHeaderBody contentHeader,
- IList bodies);
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
index c84e9de1b9..4c4adb8063 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -24,67 +24,71 @@ using System.Text;
using log4net;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
- public class SendOnlyDestination : AMQDestination
+ public abstract class AbstractQmsMessage : AMQMessage, IMessage
{
- private static readonly ILog _log = LogManager.GetLogger(typeof(string));
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));
- public SendOnlyDestination(string exchangeName, string routingKey)
- : base(exchangeName, null, null, false, false, routingKey)
- {
- _log.Debug(
- string.Format("Creating SendOnlyDestination with exchangeName={0} and routingKey={1}",
- exchangeName, routingKey));
- }
+// protected long _messageNbr;
- public override string EncodedName
- {
- get { return ExchangeName + ":" + QueueName; }
- }
+ protected bool _redelivered;
- public override string RoutingKey
- {
- get { return QueueName; }
- }
+ protected ByteBuffer _data;
- public override bool IsNameRequired
- {
- get { throw new NotImplementedException(); }
- }
- }
+ //protected AbstractQmsMessage() : base(new BasicContentHeaderProperties())
+ //{
+ //}
- public abstract class AbstractQmsMessage : AMQMessage, IMessage
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));
+ //protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader)
+ // : this(contentHeader)
+ //{
+ // _messageNbr = messageNbr;
+ //}
- protected ulong _messageNbr;
+ //protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader)
+ // : base(contentHeader)
+ //{
+ //}
- protected bool _redelivered;
- protected AbstractQmsMessage() : base(new BasicContentHeaderProperties())
- {
+#region new_java_ctrs
+
+ protected AbstractQmsMessage(ByteBuffer data)
+ : base(new BasicContentHeaderProperties())
+ {
+ _data = data;
+ if (_data != null)
+ {
+ _data.Acquire();
+ }
}
- protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader)
- : this(contentHeader)
- {
- _messageNbr = messageNbr;
+ protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ : this(contentHeader, deliveryTag)
+ {
+ _data = data;
+ if (_data != null)
+ {
+ _data.Acquire();
+ }
}
- protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader)
- : base(contentHeader)
- {
+ protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
+ {
}
+#endregion
+
public string MessageId
{
get
{
if (ContentHeaderProperties.MessageId == null)
{
- ContentHeaderProperties.MessageId = "ID:" + _messageNbr;
+ ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
}
return ContentHeaderProperties.MessageId;
}
@@ -92,6 +96,8 @@ namespace Qpid.Client.Message
{
ContentHeaderProperties.MessageId = value;
}
+
+
}
public long Timestamp
@@ -321,8 +327,11 @@ namespace Qpid.Client.Message
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_channel != null)
{
- _channel.SendAcknowledgement(_messageNbr);
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
}
+
}
public IHeaders Headers
@@ -344,10 +353,23 @@ namespace Qpid.Client.Message
/// the message.
/// </summary>
/// <value>a byte array of message data</value>
- public abstract byte[] Data
+ public ByteBuffer Data
{
- get;
- set;
+ get
+ {
+ // make sure we rewind the data just in case any method has moved the
+ // position beyond the start
+ if (_data != null)
+ {
+ _data.Rewind();
+ }
+ return _data;
+ }
+
+ set
+ {
+ _data = value;
+ }
}
public abstract string MimeType
@@ -367,7 +389,7 @@ namespace Qpid.Client.Message
buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
- buf.Append("\nAMQ message number: ").Append(_messageNbr);
+ buf.Append("\nAMQ message number: ").Append(DeliveryTag);
buf.Append("\nProperties:");
if (ContentHeaderProperties.Headers == null)
{
@@ -430,17 +452,17 @@ namespace Qpid.Client.Message
/// Get the AMQ message number assigned to this message
/// </summary>
/// <returns>the message number</returns>
- public ulong MessageNbr
- {
- get
- {
- return _messageNbr;
- }
- set
- {
- _messageNbr = value;
- }
- }
+ //public ulong MessageNbr
+ //{
+ // get
+ // {
+ // return _messageNbr;
+ // }
+ // set
+ // {
+ // _messageNbr = value;
+ // }
+ //}
public BasicContentHeaderProperties ContentHeaderProperties
{
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
index 2e71bfc948..4a109b128e 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
@@ -34,7 +34,7 @@ namespace Qpid.Client.Message
/// <param name="bodies"></param>
/// <returns></returns>
/// <exception cref="QpidMessagingException">if the message cannot be created</exception>
- AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+ AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered,
ContentHeaderBody contentHeader,
IList bodies);
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
index 3965d531bb..95257cef8a 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
@@ -58,7 +58,7 @@ namespace Qpid.Client.Message
/// <returns>the message.</returns>
/// <exception cref="AMQException"/>
/// <exception cref="QpidException"/>
- public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
ContentHeaderBody contentHeader,
IList bodies)
{
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
index b7911b44b9..9ff3d543d8 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
@@ -23,6 +23,7 @@ using System.IO;
using System.Text;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
@@ -50,7 +51,7 @@ namespace Qpid.Client.Message
/// </summary>
/// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in
/// write-only mode</param>
- QpidBytesMessage(byte[] data) : base()
+ QpidBytesMessage(ByteBuffer data) : base(data)
{
// superclass constructor has instantiated a content header at this point
ContentHeaderProperties.ContentType = MIME_TYPE;
@@ -61,22 +62,23 @@ namespace Qpid.Client.Message
}
else
{
- _dataStream = new MemoryStream(data);
- _bodyLength = data.Length;
+ _dataStream = new MemoryStream(data.ToByteArray());
+ _bodyLength = data.ToByteArray().Length;
_reader = new BinaryReader(_dataStream);
}
}
- public QpidBytesMessage(ulong messageNbr, byte[] data, ContentHeaderBody contentHeader)
+ internal QpidBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- : base(messageNbr, (BasicContentHeaderProperties) contentHeader.Properties)
- {
+ : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data)
+ {
ContentHeaderProperties.ContentType = MIME_TYPE;
- _dataStream = new MemoryStream(data);
- _bodyLength = data.Length;
+ _dataStream = new MemoryStream(data.ToByteArray());
+ _bodyLength = data.ToByteArray().Length;
_reader = new BinaryReader(_dataStream);
}
+
public override void ClearBody()
{
if (_reader != null)
@@ -119,27 +121,27 @@ namespace Qpid.Client.Message
}
}
- public override byte[] Data
- {
- get
- {
- if (_dataStream == null)
- {
- return null;
- }
- else
- {
- byte[] data = new byte[_dataStream.Length];
- _dataStream.Position = 0;
- _dataStream.Read(data, 0, (int) _dataStream.Length);
- return data;
- }
- }
- set
- {
- throw new NotSupportedException("Cannot set data payload except during construction");
- }
- }
+ //public override byte[] Data
+ //{
+ // get
+ // {
+ // if (_dataStream == null)
+ // {
+ // return null;
+ // }
+ // else
+ // {
+ // byte[] data = new byte[_dataStream.Length];
+ // _dataStream.Position = 0;
+ // _dataStream.Read(data, 0, (int) _dataStream.Length);
+ // return data;
+ // }
+ // }
+ // set
+ // {
+ // throw new NotSupportedException("Cannot set data payload except during construction");
+ // }
+ //}
public override string MimeType
{
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
index 3f2a6c531f..de4c6675c7 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
@@ -21,40 +21,52 @@
using System;
using System.Collections;
using Qpid.Framing;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
public class QpidBytesMessageFactory : AbstractQmsMessageFactory
{
- protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr,
- ContentHeaderBody contentHeader,
- IList bodies)
+ //protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr,
+ // ContentHeaderBody contentHeader,
+ // IList bodies)
+ //{
+ // byte[] data;
+
+ // // we optimise the non-fragmented case to avoid copying
+ // if (bodies != null && bodies.Count == 1)
+ // {
+ // data = ((ContentBody)bodies[0]).Payload;
+ // }
+ // else
+ // {
+ // data = new byte[(long)contentHeader.BodySize];
+ // int currentPosition = 0;
+ // foreach (ContentBody cb in bodies)
+ // {
+ // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
+ // currentPosition += cb.Payload.Length;
+ // }
+ // }
+
+ // return new QpidBytesMessage(messageNbr, data, contentHeader);
+ //}
+
+ //public override AbstractQmsMessage CreateMessage()
+ //{
+ // return new QpidBytesMessage();
+ //}
+
+ protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader)
{
- byte[] data;
-
- // we optimise the non-fragmented case to avoid copying
- if (bodies != null && bodies.Count == 1)
- {
- data = ((ContentBody)bodies[0]).Payload;
- }
- else
- {
- data = new byte[(long)contentHeader.BodySize];
- int currentPosition = 0;
- foreach (ContentBody cb in bodies)
- {
- Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
- currentPosition += cb.Payload.Length;
- }
- }
-
- return new QpidBytesMessage(messageNbr, data, contentHeader);
+ return new QpidBytesMessage(deliveryTag, contentHeader, data);
}
public override AbstractQmsMessage CreateMessage()
{
return new QpidBytesMessage();
}
+
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
index 4c16038d4b..ae5e2b7e66 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
@@ -22,6 +22,7 @@ using System;
using System.Text;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
@@ -29,34 +30,58 @@ namespace Qpid.Client.Message
{
private const string MIME_TYPE = "text/plain";
- private byte[] _data;
-
private string _decodedValue;
- public QpidTextMessage() : this(null, null)
- {
+ //public QpidTextMessage() : this(null, null)
+ //{
+ //}
+
+ //public QpidTextMessage(byte[] data, String encoding) : base()
+ //{
+ // // the superclass has instantied a content header at this point
+ // ContentHeaderProperties.ContentType= MIME_TYPE;
+ // _data = data;
+ // ContentHeaderProperties.Encoding = encoding;
+ //}
+
+ //public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader)
+ // : base(messageNbr, contentHeader)
+ //{
+ // contentHeader.ContentType = MIME_TYPE;
+ // _data = data;
+ //}
+
+ //public QpidTextMessage(byte[] data) : this(data, null)
+ //{
+ //}
+
+ //public QpidTextMessage(string text)
+ //{
+ // Text = text;
+ //}
+
+ internal QpidTextMessage() : this(null, null)
+ {
}
- public QpidTextMessage(byte[] data, String encoding) : base()
+ QpidTextMessage(ByteBuffer data, String encoding) : base(data)
{
- // the superclass has instantied a content header at this point
- ContentHeaderProperties.ContentType= MIME_TYPE;
- _data = data;
+ ContentHeaderProperties.ContentType = MIME_TYPE;
ContentHeaderProperties.Encoding = encoding;
}
- public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader)
- : base(messageNbr, contentHeader)
- {
+ internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ :base(deliveryTag, contentHeader, data)
+ {
contentHeader.ContentType = MIME_TYPE;
_data = data;
}
- public QpidTextMessage(byte[] data) : this(data, null)
- {
+ QpidTextMessage(ByteBuffer data) : this(data, null)
+ {
}
- public QpidTextMessage(string text)
+ QpidTextMessage(String text) : base((ByteBuffer)null)
{
Text = text;
}
@@ -72,18 +97,6 @@ namespace Qpid.Client.Message
return Text;
}
- public override byte[] Data
- {
- get
- {
- return _data;
- }
- set
- {
- _data = value;
- }
- }
-
public override string MimeType
{
get
@@ -109,27 +122,29 @@ namespace Qpid.Client.Message
if (ContentHeaderProperties.Encoding != null)
{
// throw ArgumentException if the encoding is not supported
- _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data);
+ _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data.ToByteArray());
}
else
{
- _decodedValue = Encoding.Default.GetString(_data);
+ _decodedValue = Encoding.Default.GetString(_data.ToByteArray());
}
return _decodedValue;
}
}
set
- {
+ {
+ byte[] bytes;
if (ContentHeaderProperties.Encoding == null)
{
- _data = Encoding.Default.GetBytes(value);
+ bytes = Encoding.Default.GetBytes(value);
}
else
{
// throw ArgumentException if the encoding is not supported
- _data = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
+ bytes = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
}
+ _data = HeapByteBuffer.wrap(bytes, bytes.Length);
_decodedValue = value;
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
index 5457b2301e..54ce8d023c 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
@@ -21,40 +21,55 @@
using System;
using System.Collections;
using Qpid.Framing;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
public class QpidTextMessageFactory : AbstractQmsMessageFactory
{
- protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, ContentHeaderBody contentHeader,
- IList bodies)
- {
- byte[] data;
-
- // we optimise the non-fragmented case to avoid copying
- if (bodies != null && bodies.Count == 1)
- {
- data = ((ContentBody)bodies[0]).Payload;
- }
- else
- {
- data = new byte[(int)contentHeader.BodySize];
- int currentPosition = 0;
- foreach (ContentBody cb in bodies)
- {
- Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
- currentPosition += cb.Payload.Length;
- }
- }
-
- return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties);
- }
+
+ // protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
+ // IList bodies)
+ // {
+ // byte[] data;
+
+ // // we optimise the non-fragmented case to avoid copying
+ // if (bodies != null && bodies.Count == 1)
+ // {
+ // data = ((ContentBody)bodies[0]).Payload;
+ // }
+ // else
+ // {
+ // data = new byte[(int)contentHeader.BodySize];
+ // int currentPosition = 0;
+ // foreach (ContentBody cb in bodies)
+ // {
+ // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
+ // currentPosition += cb.Payload.Length;
+ // }
+ // }
+
+ // return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties);
+ // }
+ // public override AbstractQmsMessage CreateMessage()
+ // {
+ // return new QpidTextMessage();
+ // }
+
+
+
public override AbstractQmsMessage CreateMessage()
{
return new QpidTextMessage();
- }
+ }
+
+ protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader)
+ {
+ return new QpidTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.Properties, data);
+ }
+
}
}