summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-05-05 12:29:15 +0000
committerAidan Skinner <aidan@apache.org>2008-05-05 12:29:15 +0000
commitcde3cb509e9def45b349438c72971d2f34fcca54 (patch)
treec4b7eb8ba4a90f6b6d96aa415ac44568a241261f
parent63653afbd2e8f6db2a3a9ff90707476bbb817ea1 (diff)
downloadqpid-python-cde3cb509e9def45b349438c72971d2f34fcca54.tar.gz
QPID-1022 Use synchronous writes to fix race conditions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@653451 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs22
1 files changed, 12 insertions, 10 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index ce8e2ca2fe..8093cfe951 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -888,10 +888,14 @@ namespace Apache.Qpid.Client
/// <param name="consumer"></param>
private void RegisterConsumer(BasicMessageConsumer consumer)
{
+ // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+ consumer.ConsumerTag = tag;
+ _consumers.Add(tag, consumer);
+
String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
- consumer.Exclusive, consumer.AcknowledgeMode);
- consumer.ConsumerTag = consumerTag;
- _consumers.Add(consumerTag, consumer);
+ consumer.Exclusive, consumer.AcknowledgeMode, tag);
+
}
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,17 @@ namespace Apache.Qpid.Client
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
- routingKey, true, args);
+ routingKey, false, args);
_replayFrames.Add(queueBind);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueBind);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
}
}
- private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
{
- // Need to generate a consumer tag on the client so we can exploit the nowait flag.
- String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
queueName, tag, noLocal,
@@ -958,13 +960,13 @@ namespace Apache.Qpid.Client
queueName, isDurable, isExclusive, isAutoDelete));
AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
- isAutoDelete, true, null);
+ isAutoDelete, false, null);
_replayFrames.Add(queueDeclare);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueDeclare);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
}
}