diff options
author | Tomas Restrepo <tomasr@apache.org> | 2007-05-26 17:35:51 +0000 |
---|---|---|
committer | Tomas Restrepo <tomasr@apache.org> | 2007-05-26 17:35:51 +0000 |
commit | 6bb17b1310dfd3c087a64c1d023b2fe467182248 (patch) | |
tree | 59b271fe5573dc132ec044a8898a9569b3696afa | |
parent | 2c5428300dbf8e0025b8e8cb010ff5e818c1a77e (diff) | |
download | qpid-python-6bb17b1310dfd3c087a64c1d023b2fe467182248.tar.gz |
QPID-136 Initial Prefetch Implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@541920 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs | 2 | ||||
-rw-r--r-- | dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs | 2 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/AMQConnection.cs | 41 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 137 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs | 2 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs | 98 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Qpid.Client.csproj | 1 | ||||
-rw-r--r-- | dotnet/Qpid.Client/default.build | 2 | ||||
-rw-r--r-- | dotnet/Qpid.Messaging/IChannel.cs | 19 | ||||
-rw-r--r-- | dotnet/Qpid.Messaging/IConnection.cs | 1 | ||||
-rw-r--r-- | dotnet/Qpid.Messaging/MessageConsumerBuilder.cs | 8 |
11 files changed, 257 insertions, 56 deletions
diff --git a/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs b/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs index 69f8cc1406..c776610596 100644 --- a/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs +++ b/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs @@ -56,7 +56,7 @@ namespace Qpid.Client.Tests { IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); _connection = new AMQConnection(connectionInfo); - _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 1); + _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 500, 300); } catch (QpidException e) { diff --git a/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs b/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs index 622c7c17c7..c90359d4d7 100644 --- a/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs +++ b/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs @@ -96,7 +96,7 @@ namespace Qpid.Client.Tests _publisher.Send(msg);
}
- _logger.Error("All messages sent");
+ _logger.Debug("All messages sent");
// receive all messages
for ( int i = 0; i < MESSAGE_COUNT; i++ )
{
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index 4498ba3a32..6feb72b54f 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -273,15 +273,17 @@ namespace Qpid.Client private bool _transacted; private AcknowledgeMode _acknowledgeMode; - int _prefetch; + int _prefetchHigh; + int _prefetchLow; AMQConnection _connection; - public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) + public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) { _connection = connection; _transacted = transacted; _acknowledgeMode = acknowledgeMode; - _prefetch = prefetch; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; } protected override object operation() @@ -297,14 +299,14 @@ namespace Qpid.Client // open it, so that there is no window where we could receive data on the channel and not be set // up to handle it appropriately. AmqChannel channel = new AmqChannel(_connection, - channelId, _transacted, _acknowledgeMode, _prefetch); + channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow); _connection.ProtocolSession.AddSessionByChannel(channelId, channel); _connection.RegisterSession(channelId, channel); bool success = false; try { - _connection.createChannelOverWire(channelId, (ushort)_prefetch, _transacted); + _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted); success = true; } catch (AMQException e) @@ -334,11 +336,16 @@ namespace Qpid.Client public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode) { - return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH); + return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK); } public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) { + return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) + { CheckNotClosed(); if (ChannelLimitReached()) { @@ -347,7 +354,7 @@ namespace Qpid.Client else { CreateChannelFailoverSupport operation = - new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetch); + new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow); return (IChannel)operation.execute(this); } } @@ -774,18 +781,23 @@ namespace Qpid.Client foreach (AmqChannel channel in channels) { _protocolSession.AddSessionByChannel(channel.ChannelId, channel); - ReopenChannel(channel.ChannelId, (ushort)channel.DefaultPrefetch, channel.Transacted); + ReopenChannel( + channel.ChannelId, + channel.DefaultPrefetchHigh, + channel.DefaultPrefetchLow, + channel.Transacted + ); channel.ReplayOnFailOver(); } } - private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted) + private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) { - _log.Debug(string.Format("Reopening channel id={0} prefetch={1} transacted={2}", - channelId, prefetch, transacted)); + _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}", + channelId, prefetchHigh, prefetchLow, transacted)); try { - createChannelOverWire(channelId, prefetch, transacted); + CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); } catch (AMQException e) { @@ -795,7 +807,7 @@ namespace Qpid.Client } } - void createChannelOverWire(ushort channelId, ushort prefetch, bool transacted) + void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) { _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); @@ -805,7 +817,8 @@ namespace Qpid.Client { // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d. _protocolWriter.SyncWrite( - BasicQosBody.CreateAMQFrame(channelId, 0, prefetch, false), + BasicQosBody.CreateAMQFrame( + channelId, (uint)prefetchHigh, 0, false), typeof (BasicQosOkBody)); } diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 9a8b9f787a..c0a069abee 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -25,6 +25,7 @@ using System.Threading; using log4net; using Qpid.Buffer; using Qpid.Client.Message; +using Qpid.Client.Util; using Qpid.Collections; using Qpid.Framing; using Qpid.Messaging; @@ -41,11 +42,14 @@ namespace Qpid.Client private static int _nextSessionNumber = 0; private int _sessionNumber; + private bool _suspended; + private object _suspensionLock = new object(); // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; - internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH; + public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; + public const int DEFAULT_PREFETCH_LOW_MARK = 2500; private AMQConnection _connection; @@ -55,9 +59,10 @@ namespace Qpid.Client private ushort _channelId; - private int _defaultPrefetch = DEFAULT_PREFETCH; + private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; - private BlockingQueue _queue = new LinkedBlockingQueue(); + private FlowControlQueue _queue; private Dispatcher _dispatcher; @@ -105,7 +110,7 @@ namespace Qpid.Client { UnprocessedMessage message; - while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null) + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) { //_queue.size() DispatchMessage(message); @@ -163,8 +168,9 @@ namespace Qpid.Client /// <param name="channelId">The channel id.</param> /// <param name="transacted">if set to <c>true</c> [transacted].</param> /// <param name="acknowledgeMode">The acknowledge mode.</param> - /// <param name="defaultPrefetch">Default prefetch value</param> - internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) + /// <param name="defaultPrefetchHigh">Default prefetch high value</param> + /// <param name="defaultPrefetchLow">Default prefetch low value</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) : this() { _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); @@ -178,8 +184,26 @@ namespace Qpid.Client _acknowledgeMode = acknowledgeMode; } _channelId = channelId; + _defaultPrefetchHighMark = defaultPrefetchHigh; + _defaultPrefetchLowMark = defaultPrefetchLow; + + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _queue = new FlowControlQueue( + _defaultPrefetchLowMark, _defaultPrefetchHighMark, + new ThresholdMethod(OnPrefetchLowMark), + new ThresholdMethod(OnPrefetchHighMark) + ); + } else + { + // low and upper are the same + _queue = new FlowControlQueue( + _defaultPrefetchHighMark, _defaultPrefetchHighMark, + null, null + ); + } } - + private AmqChannel() { _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); @@ -269,19 +293,30 @@ namespace Qpid.Client public void Rollback() { - // FIXME: Fail over safety. Needs FailoverSupport? - CheckNotClosed(); - CheckTransacted(); // throws IllegalOperationException if not a transacted session - - try - { - _connection.ConvenientProtocolWriter.SyncWrite( - TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); - } - catch (AMQException e) - { - throw new QpidException("Failed to rollback", e); - } + lock ( _suspensionLock ) + { + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + bool suspended = IsSuspended; + if ( !suspended ) + Suspend(true); + + // todo: rollback dispatcher when TX support is added + //if ( _dispatcher != null ) + // _dispatcher.Rollback(); + + _connection.ConvenientProtocolWriter.SyncWrite( + TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + + if ( !suspended ) + Suspend(false); + } catch ( AMQException e ) + { + throw new QpidException("Failed to rollback", e); + } + } } public override void Close() @@ -539,21 +574,26 @@ namespace Qpid.Client ReturnBouncedMessage(message); } else { - _queue.EnqueueBlocking(message); + _queue.Enqueue(message); } } public int DefaultPrefetch { - get - { - return _defaultPrefetch; - } - set - { - _defaultPrefetch = value; - } - } + get { return DefaultPrefetchHigh; } + } + public int DefaultPrefetchLow + { + get { return _defaultPrefetchLowMark; } + } + public int DefaultPrefetchHigh + { + get { return _defaultPrefetchHighMark; } + } + public bool IsSuspended + { + get { return _suspended; } + } public ushort ChannelId { @@ -581,6 +621,7 @@ namespace Qpid.Client internal void Stop() { + Suspend(true); if (_dispatcher != null) { _dispatcher.StopDispatcher(); @@ -883,7 +924,7 @@ namespace Qpid.Client * @param multiple if true will acknowledge all messages up to and including the one specified by the * delivery tag */ - public void AcknowledgeMessage(ulong deliveryTag, bool multiple) + internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) { AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); if (_logger.IsDebugEnabled) @@ -930,5 +971,39 @@ namespace Qpid.Client } } + + private void OnPrefetchLowMark(int count) + { + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); + Suspend(false); + } + } + private void OnPrefetchHighMark(int count) + { + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); + Suspend(true); + } + } + + private void Suspend(bool suspend) + { + lock ( _suspensionLock ) + { + if ( _logger.IsDebugEnabled ) + { + _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } + + _suspended = suspend; + AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); + + Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); + } + } + } } diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs index 7a28d7a85f..adedb41d8c 100644 --- a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs +++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -326,7 +326,7 @@ namespace Qpid.Client.Message // is not specified. In our case, we only set the session field where client acknowledge mode is specified. if (_channel != null) { - // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // we set multiple to true here since acknowledgement implies acknowledge of all count messages // received on the session _channel.AcknowledgeMessage((ulong)DeliveryTag, true); } diff --git a/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs b/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs new file mode 100644 index 0000000000..e8ffc145d3 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs @@ -0,0 +1,98 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Qpid.Collections;
+using Qpid.Common;
+
+namespace Qpid.Client.Util
+{
+ internal delegate void ThresholdMethod(int currentCount);
+
+ /// <summary>
+ /// Basic bounded queue used to implement prefetching.
+ /// Notice we do the callbacks here asynchronously to
+ /// avoid adding more complexity to the channel impl.
+ /// </summary>
+ internal class FlowControlQueue
+ {
+ private BlockingQueue _queue = new LinkedBlockingQueue();
+ private int _itemCount;
+ private int _lowerBound;
+ private int _upperBound;
+ private ThresholdMethod _underThreshold;
+ private ThresholdMethod _overThreshold;
+
+ public FlowControlQueue(
+ int lowerBound,
+ int upperBound,
+ ThresholdMethod underThreshold,
+ ThresholdMethod overThreshold
+ )
+ {
+ _lowerBound = lowerBound;
+ _upperBound = upperBound;
+ _underThreshold = underThreshold;
+ _overThreshold = overThreshold;
+ }
+
+ public void Enqueue(object item)
+ {
+ _queue.EnqueueBlocking(item);
+ int count = Interlocked.Increment(ref _itemCount);
+ if ( _overThreshold != null )
+ {
+ if ( count == _upperBound )
+ {
+ _overThreshold.BeginInvoke(
+ count, new AsyncCallback(OnAsyncCallEnd),
+ _overThreshold
+ );
+ }
+ }
+ }
+
+ public object Dequeue()
+ {
+ object item = _queue.DequeueBlocking();
+ int count = Interlocked.Decrement(ref _itemCount);
+ if ( _underThreshold != null )
+ {
+ if ( count == _lowerBound )
+ {
+ _underThreshold.BeginInvoke(
+ count, new AsyncCallback(OnAsyncCallEnd),
+ _underThreshold
+ );
+ }
+ }
+ return item;
+ }
+
+ private void OnAsyncCallEnd(IAsyncResult res)
+ {
+ ThresholdMethod method = (ThresholdMethod)res.AsyncState;
+ method.EndInvoke(res);
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj index e7c05ffebc..0e92e8f990 100644 --- a/dotnet/Qpid.Client/Qpid.Client.csproj +++ b/dotnet/Qpid.Client/Qpid.Client.csproj @@ -120,6 +120,7 @@ <Compile Include="Client\Transport\Socket\Blocking\SslSocketConnector.cs" />
<Compile Include="Client\Transport\Socket\Blocking\SocketConnector.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ISocketConnector.cs" />
+ <Compile Include="Client\Util\FlowControlQueue.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="qms\BrokerInfo.cs" />
<Compile Include="qms\ConnectionInfo.cs" />
diff --git a/dotnet/Qpid.Client/default.build b/dotnet/Qpid.Client/default.build index 5d09b3c011..ac4c8bfd4e 100644 --- a/dotnet/Qpid.Client/default.build +++ b/dotnet/Qpid.Client/default.build @@ -23,7 +23,7 @@ <include name="${build.dir}/Qpid.Codec.dll" /> <include name="${build.dir}/Qpid.Common.dll" /> <include name="${build.dir}/Qpid.Messaging.dll" /> - <include name="${build.dir}/Org.Mentalis.Security.dll" /> + <include name="${build.dir}/Org.Mentalis.Security.dll" /> </references> </csc> </target> diff --git a/dotnet/Qpid.Messaging/IChannel.cs b/dotnet/Qpid.Messaging/IChannel.cs index 7ff1e4b82d..3d08d00eb8 100644 --- a/dotnet/Qpid.Messaging/IChannel.cs +++ b/dotnet/Qpid.Messaging/IChannel.cs @@ -43,10 +43,23 @@ namespace Qpid.Messaging bool Transacted { get; } /// <summary> - /// Prefetch value to be used as the default for consumers created on this channel. + /// Prefetch value to be used as the default for + /// consumers created on this channel. /// </summary> - int DefaultPrefetch { get; set; } - + int DefaultPrefetch { get; } + + /// <summary> + /// Prefetch low value to be used as the default for + /// consumers created on this channel. + /// </summary> + int DefaultPrefetchLow { get; } + + /// <summary> + /// Prefetch high value to be used as the default for + /// consumers created on this channel. + /// </summary> + int DefaultPrefetchHigh { get; } + /// <summary> /// Declare a new exchange /// </summary> diff --git a/dotnet/Qpid.Messaging/IConnection.cs b/dotnet/Qpid.Messaging/IConnection.cs index 25c3ae968d..2bf8b9a6c2 100644 --- a/dotnet/Qpid.Messaging/IConnection.cs +++ b/dotnet/Qpid.Messaging/IConnection.cs @@ -47,6 +47,7 @@ namespace Qpid.Messaging IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode); IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch); + IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow); void Start(); void Stop(); diff --git a/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs b/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs index 4166dd0137..7d426c384a 100644 --- a/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs +++ b/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs @@ -22,21 +22,21 @@ namespace Qpid.Messaging { public class MessageConsumerBuilder { - public const int DEFAULT_PREFETCH_HIGH = 5000; - private bool _noLocal = false; private bool _exclusive = false; private bool _durable = false; private string _subscriptionName = null; private IChannel _channel; private readonly string _queueName; - private int _prefetchLow = 2500; - private int _prefetchHigh = DEFAULT_PREFETCH_HIGH; + private int _prefetchLow; + private int _prefetchHigh; public MessageConsumerBuilder(IChannel channel, string queueName) { _channel = channel; _queueName = queueName; + _prefetchHigh = _channel.DefaultPrefetchHigh; + _prefetchLow = _channel.DefaultPrefetchLow; } public MessageConsumerBuilder WithPrefetchLow(int prefetchLow) |