summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs')
-rw-r--r--qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs405
1 files changed, 405 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
new file mode 100644
index 0000000000..f33afc452e
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -0,0 +1,405 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Client
+{
+ public class BasicMessageProducer : Closeable, IMessagePublisher
+ {
+ protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+ /// <summary>
+ /// If true, messages will not get a timestamp.
+ /// </summary>
+ private bool _disableTimestamps;
+
+ /// <summary>
+ /// Priority of messages created by this producer.
+ /// </summary>
+ private int _messagePriority;
+
+ /// <summary>
+ /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ /// </summary>
+ private long _timeToLive;
+
+ /// <summary>
+ /// Delivery mode used for this producer.
+ /// </summary>
+ private DeliveryMode _deliveryMode;
+
+ private bool _immediate;
+ private bool _mandatory;
+
+ string _exchangeName;
+ string _routingKey;
+
+ /// <summary>
+ /// Default encoding used for messages produced by this producer.
+ /// </summary>
+ private string _encoding;
+
+ /// <summary>
+ /// Default encoding used for message produced by this producer.
+ /// </summary>
+ private string _mimeType;
+
+ /// <summary>
+ /// True if this producer was created from a transacted session
+ /// </summary>
+ private bool _transacted;
+
+ private ushort _channelId;
+
+ /// <summary>
+ /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+ /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+ /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+ /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+ /// </summary>
+ private long _producerId;
+
+ /// <summary>
+ /// The session used to create this producer
+ /// </summary>
+ private AmqChannel _channel;
+
+ public BasicMessageProducer(string exchangeName, string routingKey,
+ bool transacted,
+ ushort channelId,
+ AmqChannel channel,
+ long producerId,
+ DeliveryMode deliveryMode,
+ long timeToLive,
+ bool immediate,
+ bool mandatory,
+ int priority)
+ {
+ _exchangeName = exchangeName;
+ _routingKey = routingKey;
+ _transacted = transacted;
+ _channelId = channelId;
+ _channel = channel;
+ _producerId = producerId;
+ _deliveryMode = deliveryMode;
+ _timeToLive = timeToLive;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _messagePriority = priority;
+
+ _channel.RegisterProducer(producerId, this);
+ }
+
+
+ #region IMessagePublisher Members
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _deliveryMode;
+ }
+ set
+ {
+ CheckNotClosed();
+ _deliveryMode = value;
+ }
+ }
+
+ public string ExchangeName
+ {
+ get { return _exchangeName; }
+ }
+
+ public string RoutingKey
+ {
+ get { return _routingKey; }
+ }
+
+ public bool DisableMessageID
+ {
+ get
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ set
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get
+ {
+ CheckNotClosed();
+ return _disableTimestamps;
+ }
+ set
+ {
+ CheckNotClosed();
+ _disableTimestamps = value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ CheckNotClosed();
+ return _messagePriority;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 || value > 9 )
+ {
+ throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
+ }
+ _messagePriority = value;
+ }
+ }
+
+ public override void Close()
+ {
+ _logger.Debug("Closing producer " + this);
+ Interlocked.Exchange(ref _closed, CLOSED);
+ _channel.DeregisterProducer(_producerId);
+ }
+
+ public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ deliveryMode,
+ priority,
+ (uint)timeToLive,
+ _mandatory,
+ _immediate
+ );
+ }
+
+ public void Send(IMessage msg)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ _deliveryMode,
+ _messagePriority,
+ (uint)_timeToLive,
+ _mandatory,
+ _immediate
+ );
+ }
+
+ // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+ // to facilitate publishing messages to potentially non-existent recipients.
+ public void Send(IMessage msg, bool mandatory)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ _deliveryMode,
+ _messagePriority,
+ (uint)_timeToLive,
+ mandatory,
+ _immediate
+ );
+ }
+
+ public long TimeToLive
+ {
+ get
+ {
+ CheckNotClosed();
+ return _timeToLive;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 )
+ {
+ throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
+ }
+ _timeToLive = value;
+ }
+ }
+
+ #endregion
+
+ public string MimeType
+ {
+ get
+ {
+ CheckNotClosed();
+ return _mimeType;
+ }
+ set
+ {
+ CheckNotClosed();
+ _mimeType = value;
+ }
+ }
+
+ public string Encoding
+ {
+ get
+ {
+ CheckNotClosed();
+ return _encoding;
+ }
+ set
+ {
+ CheckNotClosed();
+ _encoding = value;
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ #region Message Publishing
+
+ private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+ {
+ // todo: handle session access ticket
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+ _channel.ChannelId, 0, exchangeName,
+ routingKey, mandatory, immediate
+ );
+
+ // fix message properties
+ if ( !_disableTimestamps )
+ {
+ message.Timestamp = DateTime.UtcNow.Ticks;
+ if (timeToLive != 0)
+ {
+ message.Expiration = message.Timestamp + timeToLive;
+ }
+ } else
+ {
+ message.Expiration = 0;
+ }
+ message.DeliveryMode = deliveryMode;
+ message.Priority = (byte)priority;
+
+ ByteBuffer payload = message.Data;
+ int payloadLength = payload.Limit;
+
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for ( int i = 0; i < contentBodies.Length; i++ )
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+ _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
+ message.ContentHeaderProperties, (uint)payloadLength
+ );
+ if ( _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ lock ( _channel.Connection.FailoverMutex )
+ {
+ _channel.Connection.ProtocolWriter.Write(compositeFrame);
+ }
+ }
+
+
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(ByteBuffer payload)
+ {
+ if ( payload == null )
+ {
+ return null;
+ } else if ( payload.Remaining == 0 )
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+ int frameCount = CalculateContentBodyFrames(payload);
+ ContentBody[] bodies = new ContentBody[frameCount];
+ for ( int i = 0; i < frameCount; i++ )
+ {
+ int length = (payload.Remaining >= framePayloadMax)
+ ? framePayloadMax : payload.Remaining;
+ bodies[i] = new ContentBody(payload, (uint)length);
+ }
+ return bodies;
+ }
+
+ private int CalculateContentBodyFrames(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account
+ // for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ( (payload == null) || (payload.Remaining == 0) )
+ {
+ frameCount = 0;
+ } else
+ {
+ int dataLength = payload.Remaining;
+ int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+ #endregion // Message Publishing
+ }
+}