summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs')
-rw-r--r--dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs128
1 files changed, 75 insertions, 53 deletions
diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
index c84e9de1b9..4c4adb8063 100644
--- a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
+++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -24,67 +24,71 @@ using System.Text;
using log4net;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
- public class SendOnlyDestination : AMQDestination
+ public abstract class AbstractQmsMessage : AMQMessage, IMessage
{
- private static readonly ILog _log = LogManager.GetLogger(typeof(string));
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));
- public SendOnlyDestination(string exchangeName, string routingKey)
- : base(exchangeName, null, null, false, false, routingKey)
- {
- _log.Debug(
- string.Format("Creating SendOnlyDestination with exchangeName={0} and routingKey={1}",
- exchangeName, routingKey));
- }
+// protected long _messageNbr;
- public override string EncodedName
- {
- get { return ExchangeName + ":" + QueueName; }
- }
+ protected bool _redelivered;
- public override string RoutingKey
- {
- get { return QueueName; }
- }
+ protected ByteBuffer _data;
- public override bool IsNameRequired
- {
- get { throw new NotImplementedException(); }
- }
- }
+ //protected AbstractQmsMessage() : base(new BasicContentHeaderProperties())
+ //{
+ //}
- public abstract class AbstractQmsMessage : AMQMessage, IMessage
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));
+ //protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader)
+ // : this(contentHeader)
+ //{
+ // _messageNbr = messageNbr;
+ //}
- protected ulong _messageNbr;
+ //protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader)
+ // : base(contentHeader)
+ //{
+ //}
- protected bool _redelivered;
- protected AbstractQmsMessage() : base(new BasicContentHeaderProperties())
- {
+#region new_java_ctrs
+
+ protected AbstractQmsMessage(ByteBuffer data)
+ : base(new BasicContentHeaderProperties())
+ {
+ _data = data;
+ if (_data != null)
+ {
+ _data.Acquire();
+ }
}
- protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader)
- : this(contentHeader)
- {
- _messageNbr = messageNbr;
+ protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ : this(contentHeader, deliveryTag)
+ {
+ _data = data;
+ if (_data != null)
+ {
+ _data.Acquire();
+ }
}
- protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader)
- : base(contentHeader)
- {
+ protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
+ {
}
+#endregion
+
public string MessageId
{
get
{
if (ContentHeaderProperties.MessageId == null)
{
- ContentHeaderProperties.MessageId = "ID:" + _messageNbr;
+ ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
}
return ContentHeaderProperties.MessageId;
}
@@ -92,6 +96,8 @@ namespace Qpid.Client.Message
{
ContentHeaderProperties.MessageId = value;
}
+
+
}
public long Timestamp
@@ -321,8 +327,11 @@ namespace Qpid.Client.Message
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_channel != null)
{
- _channel.SendAcknowledgement(_messageNbr);
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
}
+
}
public IHeaders Headers
@@ -344,10 +353,23 @@ namespace Qpid.Client.Message
/// the message.
/// </summary>
/// <value>a byte array of message data</value>
- public abstract byte[] Data
+ public ByteBuffer Data
{
- get;
- set;
+ get
+ {
+ // make sure we rewind the data just in case any method has moved the
+ // position beyond the start
+ if (_data != null)
+ {
+ _data.Rewind();
+ }
+ return _data;
+ }
+
+ set
+ {
+ _data = value;
+ }
}
public abstract string MimeType
@@ -367,7 +389,7 @@ namespace Qpid.Client.Message
buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
- buf.Append("\nAMQ message number: ").Append(_messageNbr);
+ buf.Append("\nAMQ message number: ").Append(DeliveryTag);
buf.Append("\nProperties:");
if (ContentHeaderProperties.Headers == null)
{
@@ -430,17 +452,17 @@ namespace Qpid.Client.Message
/// Get the AMQ message number assigned to this message
/// </summary>
/// <returns>the message number</returns>
- public ulong MessageNbr
- {
- get
- {
- return _messageNbr;
- }
- set
- {
- _messageNbr = value;
- }
- }
+ //public ulong MessageNbr
+ //{
+ // get
+ // {
+ // return _messageNbr;
+ // }
+ // set
+ // {
+ // _messageNbr = value;
+ // }
+ //}
public BasicContentHeaderProperties ContentHeaderProperties
{