summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-06-16 14:02:16 +0000
committerAidan Skinner <aidan@apache.org>2008-06-16 14:02:16 +0000
commit202bfa576abd99ba11f0da97019a76de048479e9 (patch)
treebfcb4f16eb318ff64b9dc51da64d1c1515b93ab7
parent2c010d5a9fd4e327f6418e10d99a267e95f9d685 (diff)
downloadqpid-python-202bfa576abd99ba11f0da97019a76de048479e9.tar.gz
QPID-1104: Add an IMessage.Acknowledge(bool) so that only specific messages can be acknowledged, not all messages recieved on the Channel up to that point.
Qpid.Client/Client/Message/AbstractQmsMessage.cs, dotnet/Qpid.Messaging/IMessage.cs: add paramatarised ack so that only certain messages can be ack'd Qpid.Integration.Tests/Qpid.Integration.Tests.csproj: add new test case Qpid.Integration.Tests/testcases/ClientAckTests.cs: new tests Qpid.Client/Client/BasicMessageProducer.cs, Qpid.Messaging/IMessagePublisher.cs:Add Channel property that the producer belongs too Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs: add SendMessages method git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.x@668164 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageProducer.cs812
-rw-r--r--dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs1393
-rw-r--r--dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj7
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs9
-rwxr-xr-xdotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs179
-rw-r--r--dotnet/Qpid.Messaging/IMessage.cs195
-rw-r--r--dotnet/Qpid.Messaging/IMessagePublisher.cs189
7 files changed, 1496 insertions, 1288 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
index f33afc452e..84605571eb 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -1,405 +1,407 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Client.Message;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client
-{
- public class BasicMessageProducer : Closeable, IMessagePublisher
- {
- protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
-
- /// <summary>
- /// If true, messages will not get a timestamp.
- /// </summary>
- private bool _disableTimestamps;
-
- /// <summary>
- /// Priority of messages created by this producer.
- /// </summary>
- private int _messagePriority;
-
- /// <summary>
- /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- /// </summary>
- private long _timeToLive;
-
- /// <summary>
- /// Delivery mode used for this producer.
- /// </summary>
- private DeliveryMode _deliveryMode;
-
- private bool _immediate;
- private bool _mandatory;
-
- string _exchangeName;
- string _routingKey;
-
- /// <summary>
- /// Default encoding used for messages produced by this producer.
- /// </summary>
- private string _encoding;
-
- /// <summary>
- /// Default encoding used for message produced by this producer.
- /// </summary>
- private string _mimeType;
-
- /// <summary>
- /// True if this producer was created from a transacted session
- /// </summary>
- private bool _transacted;
-
- private ushort _channelId;
-
- /// <summary>
- /// This is an id generated by the session and is used to tie individual producers to the session. This means we
- /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
- /// to the session so that when an error is propagated to the session it can close the producer (meaning that
- /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
- /// </summary>
- private long _producerId;
-
- /// <summary>
- /// The session used to create this producer
- /// </summary>
- private AmqChannel _channel;
-
- public BasicMessageProducer(string exchangeName, string routingKey,
- bool transacted,
- ushort channelId,
- AmqChannel channel,
- long producerId,
- DeliveryMode deliveryMode,
- long timeToLive,
- bool immediate,
- bool mandatory,
- int priority)
- {
- _exchangeName = exchangeName;
- _routingKey = routingKey;
- _transacted = transacted;
- _channelId = channelId;
- _channel = channel;
- _producerId = producerId;
- _deliveryMode = deliveryMode;
- _timeToLive = timeToLive;
- _immediate = immediate;
- _mandatory = mandatory;
- _messagePriority = priority;
-
- _channel.RegisterProducer(producerId, this);
- }
-
-
- #region IMessagePublisher Members
-
- public DeliveryMode DeliveryMode
- {
- get
- {
- CheckNotClosed();
- return _deliveryMode;
- }
- set
- {
- CheckNotClosed();
- _deliveryMode = value;
- }
- }
-
- public string ExchangeName
- {
- get { return _exchangeName; }
- }
-
- public string RoutingKey
- {
- get { return _routingKey; }
- }
-
- public bool DisableMessageID
- {
- get
- {
- throw new Exception("The method or operation is not implemented.");
- }
- set
- {
- throw new Exception("The method or operation is not implemented.");
- }
- }
-
- public bool DisableMessageTimestamp
- {
- get
- {
- CheckNotClosed();
- return _disableTimestamps;
- }
- set
- {
- CheckNotClosed();
- _disableTimestamps = value;
- }
- }
-
- public int Priority
- {
- get
- {
- CheckNotClosed();
- return _messagePriority;
- }
- set
- {
- CheckNotClosed();
- if ( value < 0 || value > 9 )
- {
- throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
- }
- _messagePriority = value;
- }
- }
-
- public override void Close()
- {
- _logger.Debug("Closing producer " + this);
- Interlocked.Exchange(ref _closed, CLOSED);
- _channel.DeregisterProducer(_producerId);
- }
-
- public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
- {
- CheckNotClosed();
- SendImpl(
- _exchangeName,
- _routingKey,
- (AbstractQmsMessage)msg,
- deliveryMode,
- priority,
- (uint)timeToLive,
- _mandatory,
- _immediate
- );
- }
-
- public void Send(IMessage msg)
- {
- CheckNotClosed();
- SendImpl(
- _exchangeName,
- _routingKey,
- (AbstractQmsMessage)msg,
- _deliveryMode,
- _messagePriority,
- (uint)_timeToLive,
- _mandatory,
- _immediate
- );
- }
-
- // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
- // to facilitate publishing messages to potentially non-existent recipients.
- public void Send(IMessage msg, bool mandatory)
- {
- CheckNotClosed();
- SendImpl(
- _exchangeName,
- _routingKey,
- (AbstractQmsMessage)msg,
- _deliveryMode,
- _messagePriority,
- (uint)_timeToLive,
- mandatory,
- _immediate
- );
- }
-
- public long TimeToLive
- {
- get
- {
- CheckNotClosed();
- return _timeToLive;
- }
- set
- {
- CheckNotClosed();
- if ( value < 0 )
- {
- throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
- }
- _timeToLive = value;
- }
- }
-
- #endregion
-
- public string MimeType
- {
- get
- {
- CheckNotClosed();
- return _mimeType;
- }
- set
- {
- CheckNotClosed();
- _mimeType = value;
- }
- }
-
- public string Encoding
- {
- get
- {
- CheckNotClosed();
- return _encoding;
- }
- set
- {
- CheckNotClosed();
- _encoding = value;
- }
- }
-
- public void Dispose()
- {
- Close();
- }
-
- #region Message Publishing
-
- private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
- {
- // todo: handle session access ticket
- AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
- _channel.ChannelId, 0, exchangeName,
- routingKey, mandatory, immediate
- );
-
- // fix message properties
- if ( !_disableTimestamps )
- {
- message.Timestamp = DateTime.UtcNow.Ticks;
- if (timeToLive != 0)
- {
- message.Expiration = message.Timestamp + timeToLive;
- }
- } else
- {
- message.Expiration = 0;
- }
- message.DeliveryMode = deliveryMode;
- message.Priority = (byte)priority;
-
- ByteBuffer payload = message.Data;
- int payloadLength = payload.Limit;
-
- ContentBody[] contentBodies = CreateContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
- for ( int i = 0; i < contentBodies.Length; i++ )
- {
- frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
- }
- if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
- {
- _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
- _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
- message.ContentHeaderProperties, (uint)payloadLength
- );
- if ( _logger.IsDebugEnabled )
- {
- _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
- lock ( _channel.Connection.FailoverMutex )
- {
- _channel.Connection.ProtocolWriter.Write(compositeFrame);
- }
- }
-
-
- /// <summary>
- /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- /// maximum frame size.
- /// </summary>
- /// <param name="payload"></param>
- /// <returns>return the array of content bodies</returns>
- private ContentBody[] CreateContentBodies(ByteBuffer payload)
- {
- if ( payload == null )
- {
- return null;
- } else if ( payload.Remaining == 0 )
- {
- return new ContentBody[0];
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
- int frameCount = CalculateContentBodyFrames(payload);
- ContentBody[] bodies = new ContentBody[frameCount];
- for ( int i = 0; i < frameCount; i++ )
- {
- int length = (payload.Remaining >= framePayloadMax)
- ? framePayloadMax : payload.Remaining;
- bodies[i] = new ContentBody(payload, (uint)length);
- }
- return bodies;
- }
-
- private int CalculateContentBodyFrames(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account
- // for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ( (payload == null) || (payload.Remaining == 0) )
- {
- frameCount = 0;
- } else
- {
- int dataLength = payload.Remaining;
- int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
- }
- #endregion // Message Publishing
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Client
+{
+ public class BasicMessageProducer : Closeable, IMessagePublisher
+ {
+ protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+ /// <summary>
+ /// If true, messages will not get a timestamp.
+ /// </summary>
+ private bool _disableTimestamps;
+
+ /// <summary>
+ /// Priority of messages created by this producer.
+ /// </summary>
+ private int _messagePriority;
+
+ /// <summary>
+ /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ /// </summary>
+ private long _timeToLive;
+
+ /// <summary>
+ /// Delivery mode used for this producer.
+ /// </summary>
+ private DeliveryMode _deliveryMode;
+
+ private bool _immediate;
+ private bool _mandatory;
+
+ string _exchangeName;
+ string _routingKey;
+
+ /// <summary>
+ /// Default encoding used for messages produced by this producer.
+ /// </summary>
+ private string _encoding;
+
+ /// <summary>
+ /// Default encoding used for message produced by this producer.
+ /// </summary>
+ private string _mimeType;
+
+ /// <summary>
+ /// True if this producer was created from a transacted session
+ /// </summary>
+ private bool _transacted;
+
+ private ushort _channelId;
+
+ /// <summary>
+ /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+ /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+ /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+ /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+ /// </summary>
+ private long _producerId;
+
+ /// <summary>
+ /// The session used to create this producer
+ /// </summary>
+ private AmqChannel _channel;
+
+ public IChannel Channel { get {return _channel;}}
+
+ public BasicMessageProducer(string exchangeName, string routingKey,
+ bool transacted,
+ ushort channelId,
+ AmqChannel channel,
+ long producerId,
+ DeliveryMode deliveryMode,
+ long timeToLive,
+ bool immediate,
+ bool mandatory,
+ int priority)
+ {
+ _exchangeName = exchangeName;
+ _routingKey = routingKey;
+ _transacted = transacted;
+ _channelId = channelId;
+ _channel = channel;
+ _producerId = producerId;
+ _deliveryMode = deliveryMode;
+ _timeToLive = timeToLive;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _messagePriority = priority;
+
+ _channel.RegisterProducer(producerId, this);
+ }
+
+
+ #region IMessagePublisher Members
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _deliveryMode;
+ }
+ set
+ {
+ CheckNotClosed();
+ _deliveryMode = value;
+ }
+ }
+
+ public string ExchangeName
+ {
+ get { return _exchangeName; }
+ }
+
+ public string RoutingKey
+ {
+ get { return _routingKey; }
+ }
+
+ public bool DisableMessageID
+ {
+ get
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ set
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get
+ {
+ CheckNotClosed();
+ return _disableTimestamps;
+ }
+ set
+ {
+ CheckNotClosed();
+ _disableTimestamps = value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ CheckNotClosed();
+ return _messagePriority;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 || value > 9 )
+ {
+ throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
+ }
+ _messagePriority = value;
+ }
+ }
+
+ public override void Close()
+ {
+ _logger.Debug("Closing producer " + this);
+ Interlocked.Exchange(ref _closed, CLOSED);
+ _channel.DeregisterProducer(_producerId);
+ }
+
+ public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ deliveryMode,
+ priority,
+ (uint)timeToLive,
+ _mandatory,
+ _immediate
+ );
+ }
+
+ public void Send(IMessage msg)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ _deliveryMode,
+ _messagePriority,
+ (uint)_timeToLive,
+ _mandatory,
+ _immediate
+ );
+ }
+
+ // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+ // to facilitate publishing messages to potentially non-existent recipients.
+ public void Send(IMessage msg, bool mandatory)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ _deliveryMode,
+ _messagePriority,
+ (uint)_timeToLive,
+ mandatory,
+ _immediate
+ );
+ }
+
+ public long TimeToLive
+ {
+ get
+ {
+ CheckNotClosed();
+ return _timeToLive;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 )
+ {
+ throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
+ }
+ _timeToLive = value;
+ }
+ }
+
+ #endregion
+
+ public string MimeType
+ {
+ get
+ {
+ CheckNotClosed();
+ return _mimeType;
+ }
+ set
+ {
+ CheckNotClosed();
+ _mimeType = value;
+ }
+ }
+
+ public string Encoding
+ {
+ get
+ {
+ CheckNotClosed();
+ return _encoding;
+ }
+ set
+ {
+ CheckNotClosed();
+ _encoding = value;
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ #region Message Publishing
+
+ private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+ {
+ // todo: handle session access ticket
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+ _channel.ChannelId, 0, exchangeName,
+ routingKey, mandatory, immediate
+ );
+
+ // fix message properties
+ if ( !_disableTimestamps )
+ {
+ message.Timestamp = DateTime.UtcNow.Ticks;
+ if (timeToLive != 0)
+ {
+ message.Expiration = message.Timestamp + timeToLive;
+ }
+ } else
+ {
+ message.Expiration = 0;
+ }
+ message.DeliveryMode = deliveryMode;
+ message.Priority = (byte)priority;
+
+ ByteBuffer payload = message.Data;
+ int payloadLength = payload.Limit;
+
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for ( int i = 0; i < contentBodies.Length; i++ )
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+ _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
+ message.ContentHeaderProperties, (uint)payloadLength
+ );
+ if ( _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ lock ( _channel.Connection.FailoverMutex )
+ {
+ _channel.Connection.ProtocolWriter.Write(compositeFrame);
+ }
+ }
+
+
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(ByteBuffer payload)
+ {
+ if ( payload == null )
+ {
+ return null;
+ } else if ( payload.Remaining == 0 )
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+ int frameCount = CalculateContentBodyFrames(payload);
+ ContentBody[] bodies = new ContentBody[frameCount];
+ for ( int i = 0; i < frameCount; i++ )
+ {
+ int length = (payload.Remaining >= framePayloadMax)
+ ? framePayloadMax : payload.Remaining;
+ bodies[i] = new ContentBody(payload, (uint)length);
+ }
+ return bodies;
+ }
+
+ private int CalculateContentBodyFrames(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account
+ // for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ( (payload == null) || (payload.Remaining == 0) )
+ {
+ frameCount = 0;
+ } else
+ {
+ int dataLength = payload.Remaining;
+ int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+ #endregion // Message Publishing
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
index 34b47137e5..4b38b1a293 100644
--- a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
+++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -1,694 +1,699 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using log4net;
-using Apache.Qpid.Framing;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Buffer;
-
-namespace Apache.Qpid.Client.Message
-{
- public abstract class AbstractQmsMessage : AMQMessage, IMessage
- {
- private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
-
- protected bool _redelivered;
-
- protected ByteBuffer _data;
- protected bool _readableMessage = false;
- private QpidHeaders _headers;
-
- protected AbstractQmsMessage(ByteBuffer data)
- : base(new BasicContentHeaderProperties())
- {
- Init(data);
- }
-
- protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
- : this(contentHeader, deliveryTag)
- {
- Init(data);
- }
-
- protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
- {
- Init(null);
- }
-
- private void Init(ByteBuffer data)
- {
- _data = data;
- if ( _data != null )
- {
- _data.Acquire();
- }
- _readableMessage = (data != null);
- if ( ContentHeaderProperties.Headers == null )
- ContentHeaderProperties.Headers = new FieldTable();
- _headers = new QpidHeaders(ContentHeaderProperties.Headers);
- }
-
- //
- // Properties
- //
-
- /// <summary>
- /// The application message identifier
- /// </summary>
- public string MessageId
- {
- get
- {
- if (ContentHeaderProperties.MessageId == null)
- {
- ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
- }
- return ContentHeaderProperties.MessageId;
- }
- set { ContentHeaderProperties.MessageId = value; }
- }
-
- /// <summary>
- /// The message timestamp
- /// </summary>
- public long Timestamp
- {
- get
- {
- // TODO: look at ulong/long choice
- return (long) ContentHeaderProperties.Timestamp;
- }
- set
- {
- ContentHeaderProperties.Timestamp = (ulong) value;
- }
- }
-
- /// <summary>
- /// The <see cref="CorrelationId"/> as a byte array.
- /// </summary>
- public byte[] CorrelationIdAsBytes
- {
- get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
- set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
- }
-
- /// <summary>
- /// The application correlation identifier
- /// </summary>
- public string CorrelationId
- {
- get { return ContentHeaderProperties.CorrelationId; }
- set { ContentHeaderProperties.CorrelationId = value; }
- }
-
- struct Dest
- {
- public string ExchangeName;
- public string RoutingKey;
-
- public Dest(string exchangeName, string routingKey)
- {
- ExchangeName = exchangeName;
- RoutingKey = routingKey;
- }
- }
-
- /// <summary>
- /// Exchange name of the reply-to address
- /// </summary>
- public string ReplyToExchangeName
- {
- get
- {
- return ReadReplyToHeader().ExchangeName;
- }
- set
- {
- BindingURL dest = ReadReplyToHeader();
- dest.ExchangeName = value;
- WriteReplyToHeader(dest);
- }
- }
-
- /// <summary>
- /// Routing key of the reply-to address
- /// </summary>
- public string ReplyToRoutingKey
- {
- get
- {
- return ReadReplyToHeader().RoutingKey;
- }
- set
- {
- BindingURL dest = ReadReplyToHeader();
- dest.RoutingKey = value;
- WriteReplyToHeader(dest);
- }
- }
-
- /// <summary>
- /// Non-persistent (1) or persistent (2)
- /// </summary>
- public DeliveryMode DeliveryMode
- {
- get
- {
- byte b = ContentHeaderProperties.DeliveryMode;
- switch (b)
- {
- case 1:
- return DeliveryMode.NonPersistent;
- case 2:
- return DeliveryMode.Persistent;
- default:
- throw new QpidException("Illegal value for delivery mode in content header properties");
- }
- }
- set
- {
- ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
- }
- }
-
- /// <summary>
- /// True, if this is a redelivered message
- /// </summary>
- public bool Redelivered
- {
- get { return _redelivered; }
- set { _redelivered = value; }
- }
-
- /// <summary>
- /// The message type name
- /// </summary>
- public string Type
- {
- get { return ContentHeaderProperties.Type; }
- set { ContentHeaderProperties.Type = value; }
- }
-
- /// <summary>
- /// Message expiration specification
- /// </summary>
- public long Expiration
- {
- get { return ContentHeaderProperties.Expiration; }
- set { ContentHeaderProperties.Expiration = value; }
- }
-
- /// <summary>
- /// The message priority, 0 to 9
- /// </summary>
- public byte Priority
- {
- get { return ContentHeaderProperties.Priority; }
- set { ContentHeaderProperties.Priority = (byte) value; }
- }
-
- /// <summary>
- /// The MIME Content Type
- /// </summary>
- public string ContentType
- {
- get { return ContentHeaderProperties.ContentType; }
- set { ContentHeaderProperties.ContentType = value; }
- }
-
- /// <summary>
- /// The MIME Content Encoding
- /// </summary>
- public string ContentEncoding
- {
- get { return ContentHeaderProperties.Encoding; }
- set { ContentHeaderProperties.Encoding = value; }
- }
-
- /// <summary>
- /// Headers of this message
- /// </summary>
- public IHeaders Headers
- {
- get { return _headers; }
- }
-
- /// <summary>
- /// The creating user id
- /// </summary>
- public string UserId
- {
- get { return ContentHeaderProperties.UserId; }
- set { ContentHeaderProperties.UserId = value; }
- }
-
- /// <summary>
- /// The creating application id
- /// </summary>
- public string AppId
- {
- get { return ContentHeaderProperties.AppId; }
- set { ContentHeaderProperties.AppId = value; }
- }
-
- /// <summary>
- /// Intra-cluster routing identifier
- /// </summary>
- public string ClusterId
- {
- get { return ContentHeaderProperties.ClusterId; }
- set { ContentHeaderProperties.ClusterId = value; }
- }
-
- /// <summary>
- /// Return the raw byte array that is used to populate the frame when sending
- /// the message.
- /// </summary>
- /// <value>a byte array of message data</value>
- public ByteBuffer Data
- {
- get
- {
- if (_data != null)
- {
- if (!_readableMessage)
- {
- _data.Flip();
- }
- else
- {
- // Make sure we rewind the data just in case any method has moved the
- // position beyond the start.
- _data.Rewind();
- }
- }
- return _data;
- }
-
- set
- {
- _data = value;
- }
- }
-
- public void Acknowledge()
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_channel != null)
- {
- // we set multiple to true here since acknowledgement implies acknowledge of all count messages
- // received on the session
- _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
- }
-
- }
-
- public abstract void ClearBodyImpl();
-
- public void ClearBody()
- {
- ClearBodyImpl();
- _readableMessage = false;
- }
-
- /// <summary>
- /// Get a String representation of the body of the message. Used in the
- /// toString() method which outputs this before message properties.
- /// </summary>
- /// <exception cref="QpidException"></exception>
- public abstract string ToBodyString();
-
- public override string ToString()
- {
- try
- {
- StringBuilder buf = new StringBuilder("Body:\n");
- buf.Append(ToBodyString());
- buf.Append("\nQmsTimestamp: ").Append(Timestamp);
- buf.Append("\nQmsExpiration: ").Append(Expiration);
- buf.Append("\nQmsPriority: ").Append(Priority);
- buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
- buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
- buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
- buf.Append("\nAMQ message number: ").Append(DeliveryTag);
- buf.Append("\nProperties:");
- if (ContentHeaderProperties.Headers == null)
- {
- buf.Append("<NONE>");
- }
- else
- {
- buf.Append(Headers.ToString());
- }
- return buf.ToString();
- }
- catch (Exception e)
- {
- return e.ToString();
- }
- }
-
- public FieldTable PopulateHeadersFromMessageProperties()
- {
- if (ContentHeaderProperties.Headers == null)
- {
- return null;
- }
- else
- {
- //
- // We need to convert every property into a String representation
- // Note that type information is preserved in the property name
- //
- FieldTable table = new FieldTable();
- foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
- {
- string propertyName = (string) entry.Key;
- if (propertyName == null)
- {
- continue;
- }
- else
- {
- table[propertyName] = entry.Value.ToString();
- }
- }
- return table;
- }
- }
-
- public BasicContentHeaderProperties ContentHeaderProperties
- {
- get
- {
- return (BasicContentHeaderProperties) _contentHeaderProperties;
- }
- }
-
- protected virtual void Reset()
- {
- _readableMessage = true;
- }
-
- public bool IsReadable
- {
- get { return _readableMessage; }
- }
-
- public bool isWritable
- {
- get { return !_readableMessage; }
- }
-
- protected void CheckReadable()
- {
- if ( !_readableMessage )
- {
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
- }
- }
-
- /// <summary>
- /// Decodes the replyto field if one is set.
- ///
- /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
- /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
- /// empty the replyto field is expected to being with ':'.
- ///
- /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
- /// </summary>
- ///
- /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
- private BindingURL ReadReplyToHeader()
- {
- string replyToEncoding = ContentHeaderProperties.ReplyTo;
- //log.Debug("replyToEncoding = " + replyToEncoding);
-
- BindingURL bindingUrl = new BindingURL(replyToEncoding);
- //log.Debug("bindingUrl = " + bindingUrl.ToString());
-
- return bindingUrl;
-
- //log.Info("replyToEncoding = " + replyToEncoding);
-
-// if ( replyToEncoding == null )
-// {
-// return new Dest();
-// } else
-// {
-// // Split the replyto field on a ':'
-// string[] split = replyToEncoding.Split(':');
-
-// // Ensure that the replyto field argument only consisted of two parts.
-// if ( split.Length != 2 )
-// {
-// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
-// }
-
-// // Extract the exchange name and routing key from the split replyto field.
-// string exchangeName = split[0];
-
-// string[] split2 = split[1].Split('/');
-// string routingKey = split2[3];
-
-// return new Dest(exchangeName, routingKey);
-// }
- }
-
- private void WriteReplyToHeader(BindingURL dest)
- {
- string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
- ContentHeaderProperties.ReplyTo = encodedDestination;
- }
- }
-
- public class BindingURL
- {
- public readonly static string OPTION_EXCLUSIVE = "exclusive";
- public readonly static string OPTION_AUTODELETE = "autodelete";
- public readonly static string OPTION_DURABLE = "durable";
- public readonly static string OPTION_CLIENTID = "clientid";
- public readonly static string OPTION_SUBSCRIPTION = "subscription";
- public readonly static string OPTION_ROUTING_KEY = "routingkey";
-
- /// <summary> Holds the undecoded URL </summary>
- string url;
-
- /// <summary> Holds the decoded options. </summary>
- IDictionary options = new Hashtable();
-
- /// <summary> Holds the decoded exchange class. </summary>
- string exchangeClass;
-
- /// <summary> Holds the decoded exchange name. </summary>
- string exchangeName;
-
- /// <summary> Holds the destination name. </summary>
- string destination;
-
- /// <summary> Holds the decoded queue name. </summary>
- string queueName;
-
- /// <summary>
- /// The binding URL has the format:
- /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- /// </summary>
- public BindingURL(string url)
- {
- this.url = url;
- Parse();
- }
-
- public string Url { get { return url; } }
-
- public string ExchangeClass
- {
- get { return exchangeClass; }
- set { exchangeClass = value; }
- }
-
- public string ExchangeName
- {
- get { return exchangeName; }
- set { exchangeName = value; }
- }
-
- public string QueueName
- {
- get { return queueName; }
- set { queueName = value; }
- }
-
- public string DestinationName
- {
- get { return destination; }
- set { destination = value; }
- }
-
- public string RoutingKey {
- get { return (string)options[OPTION_ROUTING_KEY]; }
- set { options[OPTION_ROUTING_KEY] = value; }
- }
-
- public bool ContainsOption(string key) { return options.Contains(key); }
-
- public string ToString()
- {
- return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName +
- ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
- }
-
- private void Parse()
- {
- Uri binding = new Uri(url);
-
- // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
- string exchangeClass = binding.Scheme;
-
- if (exchangeClass == null)
- {
- url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
- Parse();
-
- return;
- }
- else
- {
- this.exchangeClass = exchangeClass;
- }
-
- // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
- string exchangeName = binding.Host;
-
- if (exchangeName == null)
- {
- if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
- {
- this.exchangeName = "";
- }
- }
- else
- {
- this.exchangeName = exchangeName;
- }
-
- // Extract the destination and queue name.
- if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
- {
- throw new UriFormatException("Destination or Queue required");
- }
- else
- {
- int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
- if (slashOffset == -1)
- {
- throw new UriFormatException("Destination required");
- }
- else
- {
- String path = binding.AbsolutePath;
-
- this.destination = path.Substring(1, slashOffset - 1);
- this.queueName = path.Substring(slashOffset + 1);
- }
- }
-
- ParseOptions(options, binding.Query);
-
- // If the routing key is not set as an option, set it to the destination name.
- if (!ContainsOption(OPTION_ROUTING_KEY))
- {
- options[OPTION_ROUTING_KEY] = destination;
- }
- }
-
- /// <summary>
- /// options looks like this
- /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
- /// </summary>
- public static void ParseOptions(IDictionary optionMap, string options)
- {
- // Check that there really are some options to parse.
- if ((options == null) || (options.IndexOf('=') == -1))
- {
- return;
- }
-
- int optionIndex = options.IndexOf('=');
- string option = options.Substring(0, optionIndex);
- int length = options.Length;
- int nestedQuotes = 0;
-
- // Holds the index of the final "'".
- int valueIndex = optionIndex;
-
- // Loop over all the options.Dest
- while ((nestedQuotes > 0) || (valueIndex < length))
- {
- valueIndex++;
-
- if (valueIndex >= length)
- {
- break;
- }
-
- if (options[valueIndex] == '\'')
- {
- if ((valueIndex + 1) < options.Length)
- {
- if ((options[valueIndex + 1] == '&') ||
- (options[valueIndex + 1] == ',') ||
- (options[valueIndex + 1] == ';') ||
- (options[valueIndex + 1] == '\''))
- {
- nestedQuotes--;
-
- if (nestedQuotes == 0)
- {
- // We've found the value of an option
- break;
- }
- }
- else
- {
- nestedQuotes++;
- }
- }
- else
- {
- // We are at the end of the string
- // Check to see if we are corectly closing quotes
- if (options[valueIndex] == '\'')
- {
- nestedQuotes--;
- }
-
- break;
- }
- }
- }
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using log4net;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Buffer;
+
+namespace Apache.Qpid.Client.Message
+{
+ public abstract class AbstractQmsMessage : AMQMessage, IMessage
+ {
+ private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
+
+ protected bool _redelivered;
+
+ protected ByteBuffer _data;
+ protected bool _readableMessage = false;
+ private QpidHeaders _headers;
+
+ protected AbstractQmsMessage(ByteBuffer data)
+ : base(new BasicContentHeaderProperties())
+ {
+ Init(data);
+ }
+
+ protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ : this(contentHeader, deliveryTag)
+ {
+ Init(data);
+ }
+
+ protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
+ {
+ Init(null);
+ }
+
+ private void Init(ByteBuffer data)
+ {
+ _data = data;
+ if ( _data != null )
+ {
+ _data.Acquire();
+ }
+ _readableMessage = (data != null);
+ if ( ContentHeaderProperties.Headers == null )
+ ContentHeaderProperties.Headers = new FieldTable();
+ _headers = new QpidHeaders(ContentHeaderProperties.Headers);
+ }
+
+ //
+ // Properties
+ //
+
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
+ public string MessageId
+ {
+ get
+ {
+ if (ContentHeaderProperties.MessageId == null)
+ {
+ ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
+ }
+ return ContentHeaderProperties.MessageId;
+ }
+ set { ContentHeaderProperties.MessageId = value; }
+ }
+
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
+ public long Timestamp
+ {
+ get
+ {
+ // TODO: look at ulong/long choice
+ return (long) ContentHeaderProperties.Timestamp;
+ }
+ set
+ {
+ ContentHeaderProperties.Timestamp = (ulong) value;
+ }
+ }
+
+ /// <summary>
+ /// The <see cref="CorrelationId"/> as a byte array.
+ /// </summary>
+ public byte[] CorrelationIdAsBytes
+ {
+ get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
+ set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
+ }
+
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
+ public string CorrelationId
+ {
+ get { return ContentHeaderProperties.CorrelationId; }
+ set { ContentHeaderProperties.CorrelationId = value; }
+ }
+
+ struct Dest
+ {
+ public string ExchangeName;
+ public string RoutingKey;
+
+ public Dest(string exchangeName, string routingKey)
+ {
+ ExchangeName = exchangeName;
+ RoutingKey = routingKey;
+ }
+ }
+
+ /// <summary>
+ /// Exchange name of the reply-to address
+ /// </summary>
+ public string ReplyToExchangeName
+ {
+ get
+ {
+ return ReadReplyToHeader().ExchangeName;
+ }
+ set
+ {
+ BindingURL dest = ReadReplyToHeader();
+ dest.ExchangeName = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+
+ /// <summary>
+ /// Routing key of the reply-to address
+ /// </summary>
+ public string ReplyToRoutingKey
+ {
+ get
+ {
+ return ReadReplyToHeader().RoutingKey;
+ }
+ set
+ {
+ BindingURL dest = ReadReplyToHeader();
+ dest.RoutingKey = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ byte b = ContentHeaderProperties.DeliveryMode;
+ switch (b)
+ {
+ case 1:
+ return DeliveryMode.NonPersistent;
+ case 2:
+ return DeliveryMode.Persistent;
+ default:
+ throw new QpidException("Illegal value for delivery mode in content header properties");
+ }
+ }
+ set
+ {
+ ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
+ }
+ }
+
+ /// <summary>
+ /// True, if this is a redelivered message
+ /// </summary>
+ public bool Redelivered
+ {
+ get { return _redelivered; }
+ set { _redelivered = value; }
+ }
+
+ /// <summary>
+ /// The message type name
+ /// </summary>
+ public string Type
+ {
+ get { return ContentHeaderProperties.Type; }
+ set { ContentHeaderProperties.Type = value; }
+ }
+
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
+ public long Expiration
+ {
+ get { return ContentHeaderProperties.Expiration; }
+ set { ContentHeaderProperties.Expiration = value; }
+ }
+
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ public byte Priority
+ {
+ get { return ContentHeaderProperties.Priority; }
+ set { ContentHeaderProperties.Priority = (byte) value; }
+ }
+
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
+ public string ContentType
+ {
+ get { return ContentHeaderProperties.ContentType; }
+ set { ContentHeaderProperties.ContentType = value; }
+ }
+
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
+ public string ContentEncoding
+ {
+ get { return ContentHeaderProperties.Encoding; }
+ set { ContentHeaderProperties.Encoding = value; }
+ }
+
+ /// <summary>
+ /// Headers of this message
+ /// </summary>
+ public IHeaders Headers
+ {
+ get { return _headers; }
+ }
+
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ public string UserId
+ {
+ get { return ContentHeaderProperties.UserId; }
+ set { ContentHeaderProperties.UserId = value; }
+ }
+
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ public string AppId
+ {
+ get { return ContentHeaderProperties.AppId; }
+ set { ContentHeaderProperties.AppId = value; }
+ }
+
+ /// <summary>
+ /// Intra-cluster routing identifier
+ /// </summary>
+ public string ClusterId
+ {
+ get { return ContentHeaderProperties.ClusterId; }
+ set { ContentHeaderProperties.ClusterId = value; }
+ }
+
+ /// <summary>
+ /// Return the raw byte array that is used to populate the frame when sending
+ /// the message.
+ /// </summary>
+ /// <value>a byte array of message data</value>
+ public ByteBuffer Data
+ {
+ get
+ {
+ if (_data != null)
+ {
+ if (!_readableMessage)
+ {
+ _data.Flip();
+ }
+ else
+ {
+ // Make sure we rewind the data just in case any method has moved the
+ // position beyond the start.
+ _data.Rewind();
+ }
+ }
+ return _data;
+ }
+
+ set
+ {
+ _data = value;
+ }
+ }
+
+ public void Acknowledge()
+ {
+ // we set multiple to true here since acknowledgement implies acknowledge of all messages
+ // received on the session. That's a bit JMSy though.
+ Acknowledge(true);
+ }
+
+ public void Acknowledge(bool ackprevious)
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_channel != null)
+ {
+ _channel.AcknowledgeMessage((ulong)DeliveryTag, ackprevious);
+ }
+
+ }
+
+ public abstract void ClearBodyImpl();
+
+ public void ClearBody()
+ {
+ ClearBodyImpl();
+ _readableMessage = false;
+ }
+
+ /// <summary>
+ /// Get a String representation of the body of the message. Used in the
+ /// toString() method which outputs this before message properties.
+ /// </summary>
+ /// <exception cref="QpidException"></exception>
+ public abstract string ToBodyString();
+
+ public override string ToString()
+ {
+ try
+ {
+ StringBuilder buf = new StringBuilder("Body:\n");
+ buf.Append(ToBodyString());
+ buf.Append("\nQmsTimestamp: ").Append(Timestamp);
+ buf.Append("\nQmsExpiration: ").Append(Expiration);
+ buf.Append("\nQmsPriority: ").Append(Priority);
+ buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
+ buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
+ buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
+ buf.Append("\nAMQ message number: ").Append(DeliveryTag);
+ buf.Append("\nProperties:");
+ if (ContentHeaderProperties.Headers == null)
+ {
+ buf.Append("<NONE>");
+ }
+ else
+ {
+ buf.Append(Headers.ToString());
+ }
+ return buf.ToString();
+ }
+ catch (Exception e)
+ {
+ return e.ToString();
+ }
+ }
+
+ public FieldTable PopulateHeadersFromMessageProperties()
+ {
+ if (ContentHeaderProperties.Headers == null)
+ {
+ return null;
+ }
+ else
+ {
+ //
+ // We need to convert every property into a String representation
+ // Note that type information is preserved in the property name
+ //
+ FieldTable table = new FieldTable();
+ foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
+ {
+ string propertyName = (string) entry.Key;
+ if (propertyName == null)
+ {
+ continue;
+ }
+ else
+ {
+ table[propertyName] = entry.Value.ToString();
+ }
+ }
+ return table;
+ }
+ }
+
+ public BasicContentHeaderProperties ContentHeaderProperties
+ {
+ get
+ {
+ return (BasicContentHeaderProperties) _contentHeaderProperties;
+ }
+ }
+
+ protected virtual void Reset()
+ {
+ _readableMessage = true;
+ }
+
+ public bool IsReadable
+ {
+ get { return _readableMessage; }
+ }
+
+ public bool isWritable
+ {
+ get { return !_readableMessage; }
+ }
+
+ protected void CheckReadable()
+ {
+ if ( !_readableMessage )
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ /// <summary>
+ /// Decodes the replyto field if one is set.
+ ///
+ /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
+ /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
+ /// empty the replyto field is expected to being with ':'.
+ ///
+ /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
+ /// </summary>
+ ///
+ /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
+ private BindingURL ReadReplyToHeader()
+ {
+ string replyToEncoding = ContentHeaderProperties.ReplyTo;
+ //log.Debug("replyToEncoding = " + replyToEncoding);
+
+ BindingURL bindingUrl = new BindingURL(replyToEncoding);
+ //log.Debug("bindingUrl = " + bindingUrl.ToString());
+
+ return bindingUrl;
+
+ //log.Info("replyToEncoding = " + replyToEncoding);
+
+// if ( replyToEncoding == null )
+// {
+// return new Dest();
+// } else
+// {
+// // Split the replyto field on a ':'
+// string[] split = replyToEncoding.Split(':');
+
+// // Ensure that the replyto field argument only consisted of two parts.
+// if ( split.Length != 2 )
+// {
+// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
+// }
+
+// // Extract the exchange name and routing key from the split replyto field.
+// string exchangeName = split[0];
+
+// string[] split2 = split[1].Split('/');
+// string routingKey = split2[3];
+
+// return new Dest(exchangeName, routingKey);
+// }
+ }
+
+ private void WriteReplyToHeader(BindingURL dest)
+ {
+ string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
+ ContentHeaderProperties.ReplyTo = encodedDestination;
+ }
+ }
+
+ public class BindingURL
+ {
+ public readonly static string OPTION_EXCLUSIVE = "exclusive";
+ public readonly static string OPTION_AUTODELETE = "autodelete";
+ public readonly static string OPTION_DURABLE = "durable";
+ public readonly static string OPTION_CLIENTID = "clientid";
+ public readonly static string OPTION_SUBSCRIPTION = "subscription";
+ public readonly static string OPTION_ROUTING_KEY = "routingkey";
+
+ /// <summary> Holds the undecoded URL </summary>
+ string url;
+
+ /// <summary> Holds the decoded options. </summary>
+ IDictionary options = new Hashtable();
+
+ /// <summary> Holds the decoded exchange class. </summary>
+ string exchangeClass;
+
+ /// <summary> Holds the decoded exchange name. </summary>
+ string exchangeName;
+
+ /// <summary> Holds the destination name. </summary>
+ string destination;
+
+ /// <summary> Holds the decoded queue name. </summary>
+ string queueName;
+
+ /// <summary>
+ /// The binding URL has the format:
+ /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ /// </summary>
+ public BindingURL(string url)
+ {
+ this.url = url;
+ Parse();
+ }
+
+ public string Url { get { return url; } }
+
+ public string ExchangeClass
+ {
+ get { return exchangeClass; }
+ set { exchangeClass = value; }
+ }
+
+ public string ExchangeName
+ {
+ get { return exchangeName; }
+ set { exchangeName = value; }
+ }
+
+ public string QueueName
+ {
+ get { return queueName; }
+ set { queueName = value; }
+ }
+
+ public string DestinationName
+ {
+ get { return destination; }
+ set { destination = value; }
+ }
+
+ public string RoutingKey {
+ get { return (string)options[OPTION_ROUTING_KEY]; }
+ set { options[OPTION_ROUTING_KEY] = value; }
+ }
+
+ public bool ContainsOption(string key) { return options.Contains(key); }
+
+ public string ToString()
+ {
+ return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName +
+ ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
+ }
+
+ private void Parse()
+ {
+ Uri binding = new Uri(url);
+
+ // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
+ string exchangeClass = binding.Scheme;
+
+ if (exchangeClass == null)
+ {
+ url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
+ Parse();
+
+ return;
+ }
+ else
+ {
+ this.exchangeClass = exchangeClass;
+ }
+
+ // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
+ string exchangeName = binding.Host;
+
+ if (exchangeName == null)
+ {
+ if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ this.exchangeName = "";
+ }
+ }
+ else
+ {
+ this.exchangeName = exchangeName;
+ }
+
+ // Extract the destination and queue name.
+ if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
+ {
+ throw new UriFormatException("Destination or Queue required");
+ }
+ else
+ {
+ int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
+ if (slashOffset == -1)
+ {
+ throw new UriFormatException("Destination required");
+ }
+ else
+ {
+ String path = binding.AbsolutePath;
+
+ this.destination = path.Substring(1, slashOffset - 1);
+ this.queueName = path.Substring(slashOffset + 1);
+ }
+ }
+
+ ParseOptions(options, binding.Query);
+
+ // If the routing key is not set as an option, set it to the destination name.
+ if (!ContainsOption(OPTION_ROUTING_KEY))
+ {
+ options[OPTION_ROUTING_KEY] = destination;
+ }
+ }
+
+ /// <summary>
+ /// options looks like this
+ /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
+ /// </summary>
+ public static void ParseOptions(IDictionary optionMap, string options)
+ {
+ // Check that there really are some options to parse.
+ if ((options == null) || (options.IndexOf('=') == -1))
+ {
+ return;
+ }
+
+ int optionIndex = options.IndexOf('=');
+ string option = options.Substring(0, optionIndex);
+ int length = options.Length;
+ int nestedQuotes = 0;
+
+ // Holds the index of the final "'".
+ int valueIndex = optionIndex;
+
+ // Loop over all the options.Dest
+ while ((nestedQuotes > 0) || (valueIndex < length))
+ {
+ valueIndex++;
+
+ if (valueIndex >= length)
+ {
+ break;
+ }
+
+ if (options[valueIndex] == '\'')
+ {
+ if ((valueIndex + 1) < options.Length)
+ {
+ if ((options[valueIndex + 1] == '&') ||
+ (options[valueIndex + 1] == ',') ||
+ (options[valueIndex + 1] == ';') ||
+ (options[valueIndex + 1] == '\''))
+ {
+ nestedQuotes--;
+
+ if (nestedQuotes == 0)
+ {
+ // We've found the value of an option
+ break;
+ }
+ }
+ else
+ {
+ nestedQuotes++;
+ }
+ }
+ else
+ {
+ // We are at the end of the string
+ // Check to see if we are corectly closing quotes
+ if (options[valueIndex] == '\'')
+ {
+ nestedQuotes--;
+ }
+
+ break;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj b/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj
index d271b05526..a4d2cf0e00 100644
--- a/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj
+++ b/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj
@@ -47,6 +47,7 @@
<Compile Include="testcases\DurableSubscriptionTest.cs" />
<Compile Include="testcases\HeadersExchangeTest.cs" />
<Compile Include="testcases\MandatoryMessageTest.cs" />
+ <Compile Include="testcases\ClientAckTests.cs" />
<Compile Include="testcases\ProducerMultiConsumerTest.cs" />
<Compile Include="testcases\SslConnectionTest.cs" />
<Compile Include="testcases\SustainedTest.cs" />
@@ -63,4 +64,10 @@
<ItemGroup>
<Compile Include="interop\TestCases\TestCase2BasicP2P.cs" />
</ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Qpid.Client\Qpid.Client.csproj">
+ <Project>{68987C05-3768-452C-A6FC-6BA1D372852F}</Project>
+ <Name>Qpid.Client</Name>
+ </ProjectReference>
+ </ItemGroup>
</Project> \ No newline at end of file
diff --git a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
index 7859848233..c44c5e53d1 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
@@ -297,5 +297,14 @@ namespace Apache.Qpid.Integration.Tests.testcases
return buf.ToString();
}
+
+ protected void SendMessages(int count, IMessagePublisher pub)
+ {
+ for (int i = 0; i < count; i++)
+ {
+ pub.Send(pub.Channel.CreateTextMessage("Test message "+i));
+ }
+
+ }
}
}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs b/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs
new file mode 100755
index 0000000000..64b5e6c31a
--- /dev/null
+++ b/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Checks that byte messages can be produced and received properly.
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class ClientAckTests : BaseMessagingTestFixture
+ {
+ private static ILog log = LogManager.GetLogger(typeof(ClientAckTests));
+ private static string TEST_ROUTING_KEY = "MESSAGE_ACK_TEST_QUEUE";
+ private IMessage msgA;
+ private IMessage msgB;
+ private IMessage msgC;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+
+ // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ // Send 3 messages and get them back
+ SendMessages(3, testProducer[0]);
+ msgA = testConsumer[1].Receive();
+ msgB = testConsumer[1].Receive();
+ msgC = testConsumer[1].Receive();
+ }
+
+ [TearDown]
+ public override void Shutdown()
+ {
+ try
+ {
+ // Clean up after the test.
+ CloseEndPoint(0);
+ CloseEndPoint(1);
+ }
+ finally
+ {
+ base.Shutdown();
+ }
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and each one rolling acks up. </summary>
+ public void TestAckingABCAll()
+ {
+ msgA.Acknowledge();
+ msgB.Acknowledge();
+ msgC.Acknowledge();
+
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ ConsumeNMessagesOnly(0, "wibble", testConsumer[1]);
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and ack each one individually </summary>
+ public void TestAckingABCIndividual()
+ {
+ msgA.Acknowledge(false);
+ msgB.Acknowledge(false);
+ msgC.Acknowledge(false);
+
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ ConsumeNMessagesOnly(0, "wibble", testConsumer[1]);
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and the middle one only rolling acks up. </summary>
+ public void TestAckingBOnlyAll()
+ {
+ msgB.Acknowledge();
+
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ log.Debug("Checking we get the last message back");
+ ConsumeNMessagesOnly(1, "Test message 2", testConsumer[1]);
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and ack the middle one only individually. </summary>
+ public void TestAckingBOnlyIndividual()
+ {
+ msgB.Acknowledge(false);
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ ConsumeNMessages(1, "Test message 0", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "Test message 2", testConsumer[1]);
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and ack the last one, rolling acks up. </summary>
+ public void TestAckingCOnlyAll()
+ {
+ msgC.Acknowledge();
+
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ ConsumeNMessagesOnly(0, "wibble", testConsumer[1]);
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and ack the last oneindivdually. </summary>
+ public void TestAckingCOnlyIndividual()
+ {
+ msgC.Acknowledge(false);
+
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ ConsumeNMessages(1, "Test message 0", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "Test message 1", testConsumer[1]);
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and the first two indivdually. </summary>
+ public void TestAckingAtoBIndivdual()
+ {
+ msgA.Acknowledge(false);
+ msgB.Acknowledge(false);
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ ConsumeNMessagesOnly(1, "Test message 2", testConsumer[1]);
+ }
+
+ [Test]
+ /// <summary> Send 3 messages, get them back and ack the first and last one indivdually. </summary>
+ public void TestAckingAandCIndivdual()
+ {
+ msgA.Acknowledge(false);
+ msgC.Acknowledge(false);
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, true, testId.ToString());
+ //((AmqChannel)testChannel[2]).Suspend(false);
+ ConsumeNMessagesOnly(1, "Test message 1", testConsumer[1]);
+ }
+ }
+}
diff --git a/dotnet/Qpid.Messaging/IMessage.cs b/dotnet/Qpid.Messaging/IMessage.cs
index 20ae5ee130..1a40243fd6 100644
--- a/dotnet/Qpid.Messaging/IMessage.cs
+++ b/dotnet/Qpid.Messaging/IMessage.cs
@@ -1,97 +1,98 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-namespace Apache.Qpid.Messaging
-{
- public interface IMessage
- {
- /// <summary>
- /// The MIME Content Type
- /// </summary>
- string ContentType { get; set;}
- /// <summary>
- /// The MIME Content Encoding
- /// </summary>
- string ContentEncoding { get; set; }
- /// <summary>
- /// The application correlation identifier
- /// </summary>
- string CorrelationId { get; set; }
- /// <summary>
- /// The application correlation identifier, as an array of bytes
- /// </summary>
- byte[] CorrelationIdAsBytes { get; set; }
- /// <summary>
- /// Non-persistent (1) or persistent (2)
- /// </summary>
- DeliveryMode DeliveryMode { get; set; }
- /// <summary>
- /// Message expiration specification
- /// </summary>
- long Expiration { get; set; }
- /// <summary>
- /// The application message identifier
- /// </summary>
- string MessageId { get; set; }
- /// <summary>
- /// The message priority, 0 to 9
- /// </summary>
- byte Priority { get; set; }
- /// <summary>
- /// True if the message has been redelivered
- /// </summary>
- bool Redelivered { get; set; }
- /// <summary>
- /// Exchange name of the reply-to address
- /// </summary>
- string ReplyToExchangeName { get; set; }
- /// <summary>
- /// Routing key of the reply-to address
- /// </summary>
- string ReplyToRoutingKey { get; set; }
- /// <summary>
- /// The message timestamp
- /// </summary>
- long Timestamp { get; set; }
- /// <summary>
- /// The message type name
- /// </summary>
- string Type { get; set; }
- /// <summary>
- /// Message headers
- /// </summary>
- IHeaders Headers { get; }
- /// <summary>
- /// The creating user id
- /// </summary>
- string UserId { get; set; }
- /// <summary>
- /// The creating application id
- /// </summary>
- string AppId { get; set; }
- /// <summary>
- /// Intra-cluster routing identifier
- /// </summary>
- string ClusterId { get; set; }
-
- void Acknowledge();
- void ClearBody();
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Messaging
+{
+ public interface IMessage
+ {
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
+ string ContentType { get; set;}
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
+ string ContentEncoding { get; set; }
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
+ string CorrelationId { get; set; }
+ /// <summary>
+ /// The application correlation identifier, as an array of bytes
+ /// </summary>
+ byte[] CorrelationIdAsBytes { get; set; }
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
+ DeliveryMode DeliveryMode { get; set; }
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
+ long Expiration { get; set; }
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
+ string MessageId { get; set; }
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ byte Priority { get; set; }
+ /// <summary>
+ /// True if the message has been redelivered
+ /// </summary>
+ bool Redelivered { get; set; }
+ /// <summary>
+ /// Exchange name of the reply-to address
+ /// </summary>
+ string ReplyToExchangeName { get; set; }
+ /// <summary>
+ /// Routing key of the reply-to address
+ /// </summary>
+ string ReplyToRoutingKey { get; set; }
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
+ long Timestamp { get; set; }
+ /// <summary>
+ /// The message type name
+ /// </summary>
+ string Type { get; set; }
+ /// <summary>
+ /// Message headers
+ /// </summary>
+ IHeaders Headers { get; }
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ string UserId { get; set; }
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ string AppId { get; set; }
+ /// <summary>
+ /// Intra-cluster routing identifier
+ /// </summary>
+ string ClusterId { get; set; }
+
+ void Acknowledge();
+ void Acknowledge(bool ackprevious);
+ void ClearBody();
+ }
+}
diff --git a/dotnet/Qpid.Messaging/IMessagePublisher.cs b/dotnet/Qpid.Messaging/IMessagePublisher.cs
index d895a9749b..f223211729 100644
--- a/dotnet/Qpid.Messaging/IMessagePublisher.cs
+++ b/dotnet/Qpid.Messaging/IMessagePublisher.cs
@@ -1,92 +1,97 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-
-namespace Apache.Qpid.Messaging
-{
- /// <summary>
- /// Defines an object capable of publishing messages
- /// to an AMQP broker.
- /// </summary>
- /// <remarks>
- /// A publisher can be created using either
- /// <see cref="IChannel.CreatePublisher"/> or
- /// using the builder pattern (preferred) with
- /// <see cref="IChannel.CreatePublisherBuilder"/>
- /// </remarks>
- public interface IMessagePublisher : IDisposable, ICloseable
- {
- /// <summary>
- /// Default delivery mode to use with this publisher
- /// </summary>
- DeliveryMode DeliveryMode { get; set; }
- /// <summary>
- /// Name of exchange messages are published to
- /// </summary>
- string ExchangeName { get; }
- /// <summary>
- /// Routing key used when publishing messages
- /// </summary>
- string RoutingKey { get; }
- /// <summary>
- /// If true, a message ID will not be generated by the publisher
- /// when sending the message
- /// </summary>
- bool DisableMessageID { get; set; }
- /// <summary>
- /// If true, no timestamp will be added to the message
- /// when publishing it
- /// </summary>
- bool DisableMessageTimestamp { get; set; }
- /// <summary>
- /// Default priority used when publishing messages
- /// </summary>
- int Priority { get; set; }
- /// <summary>
- /// Default time to live used when publishing messages
- /// </summary>
- long TimeToLive { get; set; }
- /// <summary>
- /// Set the default MIME type for messages produced by this producer.
- /// This reduces the overhead of each message.
- /// </summary>
- string MimeType { get; set; }
- /// <summary>
- /// Set the default encoding for messages produced by this producer.
- /// This reduces the overhead of each message.
- /// </summary>
- string Encoding { get; set; }
-
- /// <summary>
- /// Publish a message, using any default values configured
- /// </summary>
- /// <param name="msg">Message to publish</param>
- void Send(IMessage msg);
- /// <summary>
- /// Publish a message with the specified options
- /// </summary>
- /// <param name="msg">Message to publish</param>
- /// <param name="deliveryMode">Delivery mode to use</param>
- /// <param name="priority">Priority of the message</param>
- /// <param name="timeToLive">Time to live of the message</param>
- void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Apache.Qpid.Messaging
+{
+ /// <summary>
+ /// Defines an object capable of publishing messages
+ /// to an AMQP broker.
+ /// </summary>
+ /// <remarks>
+ /// A publisher can be created using either
+ /// <see cref="IChannel.CreatePublisher"/> or
+ /// using the builder pattern (preferred) with
+ /// <see cref="IChannel.CreatePublisherBuilder"/>
+ /// </remarks>
+ public interface IMessagePublisher : IDisposable, ICloseable
+ {
+ /// <summary>
+ /// Default delivery mode to use with this publisher
+ /// </summary>
+ DeliveryMode DeliveryMode { get; set; }
+ /// <summary>
+ /// Name of exchange messages are published to
+ /// </summary>
+ string ExchangeName { get; }
+ /// <summary>
+ /// Routing key used when publishing messages
+ /// </summary>
+ string RoutingKey { get; }
+ /// <summary>
+ /// If true, a message ID will not be generated by the publisher
+ /// when sending the message
+ /// </summary>
+ bool DisableMessageID { get; set; }
+ /// <summary>
+ /// If true, no timestamp will be added to the message
+ /// when publishing it
+ /// </summary>
+ bool DisableMessageTimestamp { get; set; }
+ /// <summary>
+ /// Default priority used when publishing messages
+ /// </summary>
+ int Priority { get; set; }
+ /// <summary>
+ /// Default time to live used when publishing messages
+ /// </summary>
+ long TimeToLive { get; set; }
+ /// <summary>
+ /// Set the default MIME type for messages produced by this producer.
+ /// This reduces the overhead of each message.
+ /// </summary>
+ string MimeType { get; set; }
+ /// <summary>
+ /// Set the default encoding for messages produced by this producer.
+ /// This reduces the overhead of each message.
+ /// </summary>
+ string Encoding { get; set; }
+
+ /// <summary>
+ /// Get the channel this producer is on
+ /// </summary>
+ IChannel Channel { get; }
+
+ /// <summary>
+ /// Publish a message, using any default values configured
+ /// </summary>
+ /// <param name="msg">Message to publish</param>
+ void Send(IMessage msg);
+ /// <summary>
+ /// Publish a message with the specified options
+ /// </summary>
+ /// <param name="msg">Message to publish</param>
+ /// <param name="deliveryMode">Delivery mode to use</param>
+ /// <param name="priority">Priority of the message</param>
+ /// <param name="timeToLive">Time to live of the message</param>
+ void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive);
+ }
+}