diff options
author | Aidan Skinner <aidan@apache.org> | 2008-05-07 14:09:16 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-05-07 14:09:16 +0000 |
commit | 9003422eef0d6b53e0448bc8f4c1f445094a43d9 (patch) | |
tree | e00c460ce73539c9715c7c27f4250d5b4847829e /dotnet/Qpid.Client/Client | |
parent | 372720db97414ef06e7830f6fc7621e08fe17a67 (diff) | |
download | qpid-python-9003422eef0d6b53e0448bc8f4c1f445094a43d9.tar.gz |
Merged revisions 653420-654109 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x
........
r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line
Check if consumer is closed and dont reclose it
........
r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line
QPID-1022 Use synchronous writes to fix race conditions
........
r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line
QPID-1023 increase some timeouts
........
r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines
QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness.
........
r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines
QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed approrpiately and that failover does not timeout.
........
r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line
QPID-952 should have been part of previous commit
........
r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines
QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
-rw-r--r-- | dotnet/Qpid.Client/Client/AMQConnection.cs | 36 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 42 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 6 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs | 14 |
4 files changed, 43 insertions, 55 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index 8e9ebdbdfb..41d4e089b6 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -69,7 +69,7 @@ namespace Apache.Qpid.Client internal bool IsFailoverAllowed { - get { return _failoverPolicy.FailoverAllowed(); } + get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); } } /// <summary> @@ -151,34 +151,22 @@ namespace Apache.Qpid.Client _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); // XXX: Should perhaps break out of the do/while here if not a SocketException... } - } while (_failoverPolicy.FailoverAllowed()); + } while (!_connected && _failoverPolicy.FailoverAllowed()); _log.Debug("Are we connected:" + _connected); - - if (!_failoverPolicy.FailoverAllowed()) - { - if ( lastException is AMQException ) - throw lastException; - else - throw new AMQConnectionException("Unable to connect", lastException); - } - // TODO: this needs to be redone so that we are not spinning. - // A suitable object should be set that is then waited on - // and only notified when a connection is made or when - // the AMQConnection gets closed. - while (!_connected && !Closed) + if (!_connected) { - _log.Debug("Sleeping."); - Thread.Sleep(100); - } - if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null) - { - if (_lastAMQException != null) - { - throw _lastAMQException; - } + if ( lastException is AMQException ) + { + throw lastException; + } + else + { + throw new AMQConnectionException("Unable to connect", lastException); + } } + } /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index ce8e2ca2fe..86dc9a4681 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -888,10 +888,14 @@ namespace Apache.Qpid.Client /// <param name="consumer"></param> private void RegisterConsumer(BasicMessageConsumer consumer) { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + consumer.ConsumerTag = tag; + _consumers.Add(tag, consumer); + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; - _consumers.Add(consumerTag, consumer); + consumer.Exclusive, consumer.AcknowledgeMode, tag); + } internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) @@ -902,19 +906,21 @@ namespace Apache.Qpid.Client AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, - routingKey, true, args); - _replayFrames.Add(queueBind); + routingKey, false, args); + lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueBind); + _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0, + queueName, exchangeName, + routingKey, true, args)); } - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) { - // Need to generate a consumer tag on the client so we can exploit the nowait flag. - String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, @@ -934,9 +940,7 @@ namespace Apache.Qpid.Client _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); - - _replayFrames.Add(queueDelete); - + if (noWait) { _connection.ProtocolWriter.Write(queueDelete); @@ -945,6 +949,8 @@ namespace Apache.Qpid.Client { _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true)); } catch (AMQException) { @@ -958,14 +964,16 @@ namespace Apache.Qpid.Client queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, true, null); + isAutoDelete, false, null); - _replayFrames.Add(queueDeclare); lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueDeclare); + _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, + isAutoDelete, true, null)); } // AMQP-level method. @@ -978,8 +986,6 @@ namespace Apache.Qpid.Client AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); - - _replayFrames.Add(declareExchange); if (noWait) { @@ -987,6 +993,8 @@ namespace Apache.Qpid.Client { _connection.ProtocolWriter.Write(declareExchange); } + // AS FIXME: wasnae me + _replayFrames.Add(declareExchange); } else { diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index e88cf8f04c..6fee316cb4 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -266,7 +266,11 @@ namespace Apache.Qpid.Client public override void Close() { - // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex + if (_closed == CLOSED) + { + return; + } + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex lock (_channel.Connection.FailoverMutex) { lock (_closingLock) diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs index 7ae086e35f..1fb3d407eb 100644 --- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -35,12 +35,6 @@ namespace Apache.Qpid.Client.Protocol private readonly IProtocolWriter _protocolWriter; private readonly IConnectionCloser _connectionCloser; - /** - * Counter to ensure unique queue names - */ - private int _queueId = 1; - private readonly Object _queueIdLock = new Object(); - /// <summary> /// Maps from the channel id to the AmqChannel that it represents. /// </summary> @@ -267,13 +261,7 @@ namespace Apache.Qpid.Client.Protocol internal string GenerateQueueName() { - int id; - lock(_queueIdLock) - { - id = _queueId++; - } - - return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id; + return "ntmp_" + System.Guid.NewGuid(); } } } |