summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/AmqChannel.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs153
1 files changed, 114 insertions, 39 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 9a8b9f787a..84f08729dd 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -23,14 +23,15 @@ using System.Collections;
using System.Text.RegularExpressions;
using System.Threading;
using log4net;
-using Qpid.Buffer;
-using Qpid.Client.Message;
-using Qpid.Collections;
-using Qpid.Framing;
-using Qpid.Messaging;
-using Qpid.Protocol;
-
-namespace Qpid.Client
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Util;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Protocol;
+
+namespace Apache.Qpid.Client
{
public class AmqChannel : Closeable, IChannel
{
@@ -41,11 +42,14 @@ namespace Qpid.Client
private static int _nextSessionNumber = 0;
private int _sessionNumber;
+ private bool _suspended;
+ private object _suspensionLock = new object();
// Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
private int _nextConsumerNumber = 1;
- internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH;
+ public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+ public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
private AMQConnection _connection;
@@ -55,9 +59,10 @@ namespace Qpid.Client
private ushort _channelId;
- private int _defaultPrefetch = DEFAULT_PREFETCH;
+ private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+ private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
- private BlockingQueue _queue = new LinkedBlockingQueue();
+ private FlowControlQueue _queue;
private Dispatcher _dispatcher;
@@ -105,7 +110,7 @@ namespace Qpid.Client
{
UnprocessedMessage message;
- while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null)
+ while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
{
//_queue.size()
DispatchMessage(message);
@@ -163,8 +168,9 @@ namespace Qpid.Client
/// <param name="channelId">The channel id.</param>
/// <param name="transacted">if set to <c>true</c> [transacted].</param>
/// <param name="acknowledgeMode">The acknowledge mode.</param>
- /// <param name="defaultPrefetch">Default prefetch value</param>
- internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch)
+ /// <param name="defaultPrefetchHigh">Default prefetch high value</param>
+ /// <param name="defaultPrefetchLow">Default prefetch low value</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
: this()
{
_sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
@@ -178,8 +184,26 @@ namespace Qpid.Client
_acknowledgeMode = acknowledgeMode;
}
_channelId = channelId;
+ _defaultPrefetchHighMark = defaultPrefetchHigh;
+ _defaultPrefetchLowMark = defaultPrefetchLow;
+
+ if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
+ {
+ _queue = new FlowControlQueue(
+ _defaultPrefetchLowMark, _defaultPrefetchHighMark,
+ new ThresholdMethod(OnPrefetchLowMark),
+ new ThresholdMethod(OnPrefetchHighMark)
+ );
+ } else
+ {
+ // low and upper are the same
+ _queue = new FlowControlQueue(
+ _defaultPrefetchHighMark, _defaultPrefetchHighMark,
+ null, null
+ );
+ }
}
-
+
private AmqChannel()
{
_messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
@@ -269,19 +293,30 @@ namespace Qpid.Client
public void Rollback()
{
- // FIXME: Fail over safety. Needs FailoverSupport?
- CheckNotClosed();
- CheckTransacted(); // throws IllegalOperationException if not a transacted session
-
- try
- {
- _connection.ConvenientProtocolWriter.SyncWrite(
- TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
- }
- catch (AMQException e)
- {
- throw new QpidException("Failed to rollback", e);
- }
+ lock ( _suspensionLock )
+ {
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+
+ try
+ {
+ bool suspended = IsSuspended;
+ if ( !suspended )
+ Suspend(true);
+
+ // todo: rollback dispatcher when TX support is added
+ //if ( _dispatcher != null )
+ // _dispatcher.Rollback();
+
+ _connection.ConvenientProtocolWriter.SyncWrite(
+ TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+
+ if ( !suspended )
+ Suspend(false);
+ } catch ( AMQException e )
+ {
+ throw new QpidException("Failed to rollback", e);
+ }
+ }
}
public override void Close()
@@ -539,21 +574,26 @@ namespace Qpid.Client
ReturnBouncedMessage(message);
} else
{
- _queue.EnqueueBlocking(message);
+ _queue.Enqueue(message);
}
}
public int DefaultPrefetch
{
- get
- {
- return _defaultPrefetch;
- }
- set
- {
- _defaultPrefetch = value;
- }
- }
+ get { return DefaultPrefetchHigh; }
+ }
+ public int DefaultPrefetchLow
+ {
+ get { return _defaultPrefetchLowMark; }
+ }
+ public int DefaultPrefetchHigh
+ {
+ get { return _defaultPrefetchHighMark; }
+ }
+ public bool IsSuspended
+ {
+ get { return _suspended; }
+ }
public ushort ChannelId
{
@@ -581,6 +621,7 @@ namespace Qpid.Client
internal void Stop()
{
+ Suspend(true);
if (_dispatcher != null)
{
_dispatcher.StopDispatcher();
@@ -883,7 +924,7 @@ namespace Qpid.Client
* @param multiple if true will acknowledge all messages up to and including the one specified by the
* delivery tag
*/
- public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
+ internal void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
if (_logger.IsDebugEnabled)
@@ -930,5 +971,39 @@ namespace Qpid.Client
}
}
+
+ private void OnPrefetchLowMark(int count)
+ {
+ if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
+ {
+ _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count);
+ Suspend(false);
+ }
+ }
+ private void OnPrefetchHighMark(int count)
+ {
+ if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
+ {
+ _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count);
+ Suspend(true);
+ }
+ }
+
+ private void Suspend(bool suspend)
+ {
+ lock ( _suspensionLock )
+ {
+ if ( _logger.IsDebugEnabled )
+ {
+ _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
+ }
+
+ _suspended = suspend;
+ AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend);
+
+ Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody));
+ }
+ }
+
}
}