diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 153 |
1 files changed, 114 insertions, 39 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 9a8b9f787a..84f08729dd 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -23,14 +23,15 @@ using System.Collections; using System.Text.RegularExpressions; using System.Threading; using log4net; -using Qpid.Buffer; -using Qpid.Client.Message; -using Qpid.Collections; -using Qpid.Framing; -using Qpid.Messaging; -using Qpid.Protocol; - -namespace Qpid.Client +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Util; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client { public class AmqChannel : Closeable, IChannel { @@ -41,11 +42,14 @@ namespace Qpid.Client private static int _nextSessionNumber = 0; private int _sessionNumber; + private bool _suspended; + private object _suspensionLock = new object(); // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; - internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH; + public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; + public const int DEFAULT_PREFETCH_LOW_MARK = 2500; private AMQConnection _connection; @@ -55,9 +59,10 @@ namespace Qpid.Client private ushort _channelId; - private int _defaultPrefetch = DEFAULT_PREFETCH; + private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; - private BlockingQueue _queue = new LinkedBlockingQueue(); + private FlowControlQueue _queue; private Dispatcher _dispatcher; @@ -105,7 +110,7 @@ namespace Qpid.Client { UnprocessedMessage message; - while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null) + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) { //_queue.size() DispatchMessage(message); @@ -163,8 +168,9 @@ namespace Qpid.Client /// <param name="channelId">The channel id.</param> /// <param name="transacted">if set to <c>true</c> [transacted].</param> /// <param name="acknowledgeMode">The acknowledge mode.</param> - /// <param name="defaultPrefetch">Default prefetch value</param> - internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) + /// <param name="defaultPrefetchHigh">Default prefetch high value</param> + /// <param name="defaultPrefetchLow">Default prefetch low value</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) : this() { _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); @@ -178,8 +184,26 @@ namespace Qpid.Client _acknowledgeMode = acknowledgeMode; } _channelId = channelId; + _defaultPrefetchHighMark = defaultPrefetchHigh; + _defaultPrefetchLowMark = defaultPrefetchLow; + + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _queue = new FlowControlQueue( + _defaultPrefetchLowMark, _defaultPrefetchHighMark, + new ThresholdMethod(OnPrefetchLowMark), + new ThresholdMethod(OnPrefetchHighMark) + ); + } else + { + // low and upper are the same + _queue = new FlowControlQueue( + _defaultPrefetchHighMark, _defaultPrefetchHighMark, + null, null + ); + } } - + private AmqChannel() { _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); @@ -269,19 +293,30 @@ namespace Qpid.Client public void Rollback() { - // FIXME: Fail over safety. Needs FailoverSupport? - CheckNotClosed(); - CheckTransacted(); // throws IllegalOperationException if not a transacted session - - try - { - _connection.ConvenientProtocolWriter.SyncWrite( - TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); - } - catch (AMQException e) - { - throw new QpidException("Failed to rollback", e); - } + lock ( _suspensionLock ) + { + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + bool suspended = IsSuspended; + if ( !suspended ) + Suspend(true); + + // todo: rollback dispatcher when TX support is added + //if ( _dispatcher != null ) + // _dispatcher.Rollback(); + + _connection.ConvenientProtocolWriter.SyncWrite( + TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + + if ( !suspended ) + Suspend(false); + } catch ( AMQException e ) + { + throw new QpidException("Failed to rollback", e); + } + } } public override void Close() @@ -539,21 +574,26 @@ namespace Qpid.Client ReturnBouncedMessage(message); } else { - _queue.EnqueueBlocking(message); + _queue.Enqueue(message); } } public int DefaultPrefetch { - get - { - return _defaultPrefetch; - } - set - { - _defaultPrefetch = value; - } - } + get { return DefaultPrefetchHigh; } + } + public int DefaultPrefetchLow + { + get { return _defaultPrefetchLowMark; } + } + public int DefaultPrefetchHigh + { + get { return _defaultPrefetchHighMark; } + } + public bool IsSuspended + { + get { return _suspended; } + } public ushort ChannelId { @@ -581,6 +621,7 @@ namespace Qpid.Client internal void Stop() { + Suspend(true); if (_dispatcher != null) { _dispatcher.StopDispatcher(); @@ -883,7 +924,7 @@ namespace Qpid.Client * @param multiple if true will acknowledge all messages up to and including the one specified by the * delivery tag */ - public void AcknowledgeMessage(ulong deliveryTag, bool multiple) + internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) { AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); if (_logger.IsDebugEnabled) @@ -930,5 +971,39 @@ namespace Qpid.Client } } + + private void OnPrefetchLowMark(int count) + { + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); + Suspend(false); + } + } + private void OnPrefetchHighMark(int count) + { + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); + Suspend(true); + } + } + + private void Suspend(bool suspend) + { + lock ( _suspensionLock ) + { + if ( _logger.IsDebugEnabled ) + { + _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } + + _suspended = suspend; + AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); + + Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); + } + } + } } |