diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs | 405 |
1 files changed, 405 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs new file mode 100644 index 0000000000..f33afc452e --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs @@ -0,0 +1,405 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Messaging; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client +{ + public class BasicMessageProducer : Closeable, IMessagePublisher + { + protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); + + /// <summary> + /// If true, messages will not get a timestamp. + /// </summary> + private bool _disableTimestamps; + + /// <summary> + /// Priority of messages created by this producer. + /// </summary> + private int _messagePriority; + + /// <summary> + /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + /// </summary> + private long _timeToLive; + + /// <summary> + /// Delivery mode used for this producer. + /// </summary> + private DeliveryMode _deliveryMode; + + private bool _immediate; + private bool _mandatory; + + string _exchangeName; + string _routingKey; + + /// <summary> + /// Default encoding used for messages produced by this producer. + /// </summary> + private string _encoding; + + /// <summary> + /// Default encoding used for message produced by this producer. + /// </summary> + private string _mimeType; + + /// <summary> + /// True if this producer was created from a transacted session + /// </summary> + private bool _transacted; + + private ushort _channelId; + + /// <summary> + /// This is an id generated by the session and is used to tie individual producers to the session. This means we + /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers + /// to the session so that when an error is propagated to the session it can close the producer (meaning that + /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). + /// </summary> + private long _producerId; + + /// <summary> + /// The session used to create this producer + /// </summary> + private AmqChannel _channel; + + public BasicMessageProducer(string exchangeName, string routingKey, + bool transacted, + ushort channelId, + AmqChannel channel, + long producerId, + DeliveryMode deliveryMode, + long timeToLive, + bool immediate, + bool mandatory, + int priority) + { + _exchangeName = exchangeName; + _routingKey = routingKey; + _transacted = transacted; + _channelId = channelId; + _channel = channel; + _producerId = producerId; + _deliveryMode = deliveryMode; + _timeToLive = timeToLive; + _immediate = immediate; + _mandatory = mandatory; + _messagePriority = priority; + + _channel.RegisterProducer(producerId, this); + } + + + #region IMessagePublisher Members + + public DeliveryMode DeliveryMode + { + get + { + CheckNotClosed(); + return _deliveryMode; + } + set + { + CheckNotClosed(); + _deliveryMode = value; + } + } + + public string ExchangeName + { + get { return _exchangeName; } + } + + public string RoutingKey + { + get { return _routingKey; } + } + + public bool DisableMessageID + { + get + { + throw new Exception("The method or operation is not implemented."); + } + set + { + throw new Exception("The method or operation is not implemented."); + } + } + + public bool DisableMessageTimestamp + { + get + { + CheckNotClosed(); + return _disableTimestamps; + } + set + { + CheckNotClosed(); + _disableTimestamps = value; + } + } + + public int Priority + { + get + { + CheckNotClosed(); + return _messagePriority; + } + set + { + CheckNotClosed(); + if ( value < 0 || value > 9 ) + { + throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = value; + } + } + + public override void Close() + { + _logger.Debug("Closing producer " + this); + Interlocked.Exchange(ref _closed, CLOSED); + _channel.DeregisterProducer(_producerId); + } + + public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) + { + CheckNotClosed(); + SendImpl( + _exchangeName, + _routingKey, + (AbstractQmsMessage)msg, + deliveryMode, + priority, + (uint)timeToLive, + _mandatory, + _immediate + ); + } + + public void Send(IMessage msg) + { + CheckNotClosed(); + SendImpl( + _exchangeName, + _routingKey, + (AbstractQmsMessage)msg, + _deliveryMode, + _messagePriority, + (uint)_timeToLive, + _mandatory, + _immediate + ); + } + + // This is a short-term hack (knowing that this code will be re-vamped sometime soon) + // to facilitate publishing messages to potentially non-existent recipients. + public void Send(IMessage msg, bool mandatory) + { + CheckNotClosed(); + SendImpl( + _exchangeName, + _routingKey, + (AbstractQmsMessage)msg, + _deliveryMode, + _messagePriority, + (uint)_timeToLive, + mandatory, + _immediate + ); + } + + public long TimeToLive + { + get + { + CheckNotClosed(); + return _timeToLive; + } + set + { + CheckNotClosed(); + if ( value < 0 ) + { + throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); + } + _timeToLive = value; + } + } + + #endregion + + public string MimeType + { + get + { + CheckNotClosed(); + return _mimeType; + } + set + { + CheckNotClosed(); + _mimeType = value; + } + } + + public string Encoding + { + get + { + CheckNotClosed(); + return _encoding; + } + set + { + CheckNotClosed(); + _encoding = value; + } + } + + public void Dispose() + { + Close(); + } + + #region Message Publishing + + private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) + { + // todo: handle session access ticket + AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame( + _channel.ChannelId, 0, exchangeName, + routingKey, mandatory, immediate + ); + + // fix message properties + if ( !_disableTimestamps ) + { + message.Timestamp = DateTime.UtcNow.Ticks; + if (timeToLive != 0) + { + message.Expiration = message.Timestamp + timeToLive; + } + } else + { + message.Expiration = 0; + } + message.DeliveryMode = deliveryMode; + message.Priority = (byte)priority; + + ByteBuffer payload = message.Data; + int payloadLength = payload.Limit; + + ContentBody[] contentBodies = CreateContentBodies(payload); + AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; + for ( int i = 0; i < contentBodies.Length; i++ ) + { + frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); + } + if ( contentBodies.Length > 0 && _logger.IsDebugEnabled ) + { + _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + // weight argument of zero indicates no child content headers, just bodies + AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame( + _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, + message.ContentHeaderProperties, (uint)payloadLength + ); + if ( _logger.IsDebugEnabled ) + { + _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + frames[0] = publishFrame; + frames[1] = contentHeaderFrame; + CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + + lock ( _channel.Connection.FailoverMutex ) + { + _channel.Connection.ProtocolWriter.Write(compositeFrame); + } + } + + + /// <summary> + /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated + /// maximum frame size. + /// </summary> + /// <param name="payload"></param> + /// <returns>return the array of content bodies</returns> + private ContentBody[] CreateContentBodies(ByteBuffer payload) + { + if ( payload == null ) + { + return null; + } else if ( payload.Remaining == 0 ) + { + return new ContentBody[0]; + } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1); + int frameCount = CalculateContentBodyFrames(payload); + ContentBody[] bodies = new ContentBody[frameCount]; + for ( int i = 0; i < frameCount; i++ ) + { + int length = (payload.Remaining >= framePayloadMax) + ? framePayloadMax : payload.Remaining; + bodies[i] = new ContentBody(payload, (uint)length); + } + return bodies; + } + + private int CalculateContentBodyFrames(ByteBuffer payload) + { + // we substract one from the total frame maximum size to account + // for the end of frame marker in a body frame + // (0xCE byte). + int frameCount; + if ( (payload == null) || (payload.Remaining == 0) ) + { + frameCount = 0; + } else + { + int dataLength = payload.Remaining; + int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1; + int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; + frameCount = (int)(dataLength / framePayloadMax) + lastFrame; + } + + return frameCount; + } + #endregion // Message Publishing + } +} |