diff options
author | Aidan Skinner <aidan@apache.org> | 2008-05-05 12:29:15 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-05-05 12:29:15 +0000 |
commit | cde3cb509e9def45b349438c72971d2f34fcca54 (patch) | |
tree | c4b7eb8ba4a90f6b6d96aa415ac44568a241261f | |
parent | 63653afbd2e8f6db2a3a9ff90707476bbb817ea1 (diff) | |
download | qpid-python-cde3cb509e9def45b349438c72971d2f34fcca54.tar.gz |
QPID-1022 Use synchronous writes to fix race conditions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@653451 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index ce8e2ca2fe..8093cfe951 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -888,10 +888,14 @@ namespace Apache.Qpid.Client /// <param name="consumer"></param> private void RegisterConsumer(BasicMessageConsumer consumer) { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + consumer.ConsumerTag = tag; + _consumers.Add(tag, consumer); + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; - _consumers.Add(consumerTag, consumer); + consumer.Exclusive, consumer.AcknowledgeMode, tag); + } internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) @@ -902,19 +906,17 @@ namespace Apache.Qpid.Client AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, - routingKey, true, args); + routingKey, false, args); _replayFrames.Add(queueBind); lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueBind); + _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); } } - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) { - // Need to generate a consumer tag on the client so we can exploit the nowait flag. - String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, @@ -958,13 +960,13 @@ namespace Apache.Qpid.Client queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, true, null); + isAutoDelete, false, null); _replayFrames.Add(queueDeclare); lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueDeclare); + _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); } } |