diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index ce8e2ca2fe..8093cfe951 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -888,10 +888,14 @@ namespace Apache.Qpid.Client /// <param name="consumer"></param> private void RegisterConsumer(BasicMessageConsumer consumer) { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + consumer.ConsumerTag = tag; + _consumers.Add(tag, consumer); + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; - _consumers.Add(consumerTag, consumer); + consumer.Exclusive, consumer.AcknowledgeMode, tag); + } internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) @@ -902,19 +906,17 @@ namespace Apache.Qpid.Client AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, - routingKey, true, args); + routingKey, false, args); _replayFrames.Add(queueBind); lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueBind); + _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); } } - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) { - // Need to generate a consumer tag on the client so we can exploit the nowait flag. - String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, @@ -958,13 +960,13 @@ namespace Apache.Qpid.Client queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, true, null); + isAutoDelete, false, null); _replayFrames.Add(queueDeclare); lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueDeclare); + _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); } } |