summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs')
-rw-r--r--M4-RCs/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs405
1 files changed, 0 insertions, 405 deletions
diff --git a/M4-RCs/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/M4-RCs/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
deleted file mode 100644
index f33afc452e..0000000000
--- a/M4-RCs/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Client.Message;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client
-{
- public class BasicMessageProducer : Closeable, IMessagePublisher
- {
- protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
-
- /// <summary>
- /// If true, messages will not get a timestamp.
- /// </summary>
- private bool _disableTimestamps;
-
- /// <summary>
- /// Priority of messages created by this producer.
- /// </summary>
- private int _messagePriority;
-
- /// <summary>
- /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- /// </summary>
- private long _timeToLive;
-
- /// <summary>
- /// Delivery mode used for this producer.
- /// </summary>
- private DeliveryMode _deliveryMode;
-
- private bool _immediate;
- private bool _mandatory;
-
- string _exchangeName;
- string _routingKey;
-
- /// <summary>
- /// Default encoding used for messages produced by this producer.
- /// </summary>
- private string _encoding;
-
- /// <summary>
- /// Default encoding used for message produced by this producer.
- /// </summary>
- private string _mimeType;
-
- /// <summary>
- /// True if this producer was created from a transacted session
- /// </summary>
- private bool _transacted;
-
- private ushort _channelId;
-
- /// <summary>
- /// This is an id generated by the session and is used to tie individual producers to the session. This means we
- /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
- /// to the session so that when an error is propagated to the session it can close the producer (meaning that
- /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
- /// </summary>
- private long _producerId;
-
- /// <summary>
- /// The session used to create this producer
- /// </summary>
- private AmqChannel _channel;
-
- public BasicMessageProducer(string exchangeName, string routingKey,
- bool transacted,
- ushort channelId,
- AmqChannel channel,
- long producerId,
- DeliveryMode deliveryMode,
- long timeToLive,
- bool immediate,
- bool mandatory,
- int priority)
- {
- _exchangeName = exchangeName;
- _routingKey = routingKey;
- _transacted = transacted;
- _channelId = channelId;
- _channel = channel;
- _producerId = producerId;
- _deliveryMode = deliveryMode;
- _timeToLive = timeToLive;
- _immediate = immediate;
- _mandatory = mandatory;
- _messagePriority = priority;
-
- _channel.RegisterProducer(producerId, this);
- }
-
-
- #region IMessagePublisher Members
-
- public DeliveryMode DeliveryMode
- {
- get
- {
- CheckNotClosed();
- return _deliveryMode;
- }
- set
- {
- CheckNotClosed();
- _deliveryMode = value;
- }
- }
-
- public string ExchangeName
- {
- get { return _exchangeName; }
- }
-
- public string RoutingKey
- {
- get { return _routingKey; }
- }
-
- public bool DisableMessageID
- {
- get
- {
- throw new Exception("The method or operation is not implemented.");
- }
- set
- {
- throw new Exception("The method or operation is not implemented.");
- }
- }
-
- public bool DisableMessageTimestamp
- {
- get
- {
- CheckNotClosed();
- return _disableTimestamps;
- }
- set
- {
- CheckNotClosed();
- _disableTimestamps = value;
- }
- }
-
- public int Priority
- {
- get
- {
- CheckNotClosed();
- return _messagePriority;
- }
- set
- {
- CheckNotClosed();
- if ( value < 0 || value > 9 )
- {
- throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
- }
- _messagePriority = value;
- }
- }
-
- public override void Close()
- {
- _logger.Debug("Closing producer " + this);
- Interlocked.Exchange(ref _closed, CLOSED);
- _channel.DeregisterProducer(_producerId);
- }
-
- public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
- {
- CheckNotClosed();
- SendImpl(
- _exchangeName,
- _routingKey,
- (AbstractQmsMessage)msg,
- deliveryMode,
- priority,
- (uint)timeToLive,
- _mandatory,
- _immediate
- );
- }
-
- public void Send(IMessage msg)
- {
- CheckNotClosed();
- SendImpl(
- _exchangeName,
- _routingKey,
- (AbstractQmsMessage)msg,
- _deliveryMode,
- _messagePriority,
- (uint)_timeToLive,
- _mandatory,
- _immediate
- );
- }
-
- // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
- // to facilitate publishing messages to potentially non-existent recipients.
- public void Send(IMessage msg, bool mandatory)
- {
- CheckNotClosed();
- SendImpl(
- _exchangeName,
- _routingKey,
- (AbstractQmsMessage)msg,
- _deliveryMode,
- _messagePriority,
- (uint)_timeToLive,
- mandatory,
- _immediate
- );
- }
-
- public long TimeToLive
- {
- get
- {
- CheckNotClosed();
- return _timeToLive;
- }
- set
- {
- CheckNotClosed();
- if ( value < 0 )
- {
- throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
- }
- _timeToLive = value;
- }
- }
-
- #endregion
-
- public string MimeType
- {
- get
- {
- CheckNotClosed();
- return _mimeType;
- }
- set
- {
- CheckNotClosed();
- _mimeType = value;
- }
- }
-
- public string Encoding
- {
- get
- {
- CheckNotClosed();
- return _encoding;
- }
- set
- {
- CheckNotClosed();
- _encoding = value;
- }
- }
-
- public void Dispose()
- {
- Close();
- }
-
- #region Message Publishing
-
- private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
- {
- // todo: handle session access ticket
- AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
- _channel.ChannelId, 0, exchangeName,
- routingKey, mandatory, immediate
- );
-
- // fix message properties
- if ( !_disableTimestamps )
- {
- message.Timestamp = DateTime.UtcNow.Ticks;
- if (timeToLive != 0)
- {
- message.Expiration = message.Timestamp + timeToLive;
- }
- } else
- {
- message.Expiration = 0;
- }
- message.DeliveryMode = deliveryMode;
- message.Priority = (byte)priority;
-
- ByteBuffer payload = message.Data;
- int payloadLength = payload.Limit;
-
- ContentBody[] contentBodies = CreateContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
- for ( int i = 0; i < contentBodies.Length; i++ )
- {
- frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
- }
- if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
- {
- _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
- _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
- message.ContentHeaderProperties, (uint)payloadLength
- );
- if ( _logger.IsDebugEnabled )
- {
- _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
- lock ( _channel.Connection.FailoverMutex )
- {
- _channel.Connection.ProtocolWriter.Write(compositeFrame);
- }
- }
-
-
- /// <summary>
- /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- /// maximum frame size.
- /// </summary>
- /// <param name="payload"></param>
- /// <returns>return the array of content bodies</returns>
- private ContentBody[] CreateContentBodies(ByteBuffer payload)
- {
- if ( payload == null )
- {
- return null;
- } else if ( payload.Remaining == 0 )
- {
- return new ContentBody[0];
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
- int frameCount = CalculateContentBodyFrames(payload);
- ContentBody[] bodies = new ContentBody[frameCount];
- for ( int i = 0; i < frameCount; i++ )
- {
- int length = (payload.Remaining >= framePayloadMax)
- ? framePayloadMax : payload.Remaining;
- bodies[i] = new ContentBody(payload, (uint)length);
- }
- return bodies;
- }
-
- private int CalculateContentBodyFrames(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account
- // for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ( (payload == null) || (payload.Remaining == 0) )
- {
- frameCount = 0;
- } else
- {
- int dataLength = payload.Remaining;
- int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
- }
- #endregion // Message Publishing
- }
-}