summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-05-07 14:09:16 +0000
committerAidan Skinner <aidan@apache.org>2008-05-07 14:09:16 +0000
commit9003422eef0d6b53e0448bc8f4c1f445094a43d9 (patch)
treee00c460ce73539c9715c7c27f4250d5b4847829e /dotnet/Qpid.Client/Client
parent372720db97414ef06e7830f6fc7621e08fe17a67 (diff)
downloadqpid-python-9003422eef0d6b53e0448bc8f4c1f445094a43d9.tar.gz
Merged revisions 653420-654109 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x ........ r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line Check if consumer is closed and dont reclose it ........ r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line QPID-1022 Use synchronous writes to fix race conditions ........ r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line QPID-1023 increase some timeouts ........ r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness. ........ r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed approrpiately and that failover does not timeout. ........ r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line QPID-952 should have been part of previous commit ........ r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnection.cs36
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs42
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs6
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs14
4 files changed, 43 insertions, 55 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs
index 8e9ebdbdfb..41d4e089b6 100644
--- a/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -69,7 +69,7 @@ namespace Apache.Qpid.Client
internal bool IsFailoverAllowed
{
- get { return _failoverPolicy.FailoverAllowed(); }
+ get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); }
}
/// <summary>
@@ -151,34 +151,22 @@ namespace Apache.Qpid.Client
_log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
// XXX: Should perhaps break out of the do/while here if not a SocketException...
}
- } while (_failoverPolicy.FailoverAllowed());
+ } while (!_connected && _failoverPolicy.FailoverAllowed());
_log.Debug("Are we connected:" + _connected);
-
- if (!_failoverPolicy.FailoverAllowed())
- {
- if ( lastException is AMQException )
- throw lastException;
- else
- throw new AMQConnectionException("Unable to connect", lastException);
- }
- // TODO: this needs to be redone so that we are not spinning.
- // A suitable object should be set that is then waited on
- // and only notified when a connection is made or when
- // the AMQConnection gets closed.
- while (!_connected && !Closed)
+ if (!_connected)
{
- _log.Debug("Sleeping.");
- Thread.Sleep(100);
- }
- if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null)
- {
- if (_lastAMQException != null)
- {
- throw _lastAMQException;
- }
+ if ( lastException is AMQException )
+ {
+ throw lastException;
+ }
+ else
+ {
+ throw new AMQConnectionException("Unable to connect", lastException);
+ }
}
+
}
/*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index ce8e2ca2fe..86dc9a4681 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -888,10 +888,14 @@ namespace Apache.Qpid.Client
/// <param name="consumer"></param>
private void RegisterConsumer(BasicMessageConsumer consumer)
{
+ // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+ consumer.ConsumerTag = tag;
+ _consumers.Add(tag, consumer);
+
String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
- consumer.Exclusive, consumer.AcknowledgeMode);
- consumer.ConsumerTag = consumerTag;
- _consumers.Add(consumerTag, consumer);
+ consumer.Exclusive, consumer.AcknowledgeMode, tag);
+
}
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,21 @@ namespace Apache.Qpid.Client
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
- routingKey, true, args);
- _replayFrames.Add(queueBind);
+ routingKey, false, args);
+
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueBind);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
+ queueName, exchangeName,
+ routingKey, true, args));
}
- private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
{
- // Need to generate a consumer tag on the client so we can exploit the nowait flag.
- String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
queueName, tag, noLocal,
@@ -934,9 +940,7 @@ namespace Apache.Qpid.Client
_logger.Debug(string.Format("DeleteQueue name={0}", queueName));
AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
-
- _replayFrames.Add(queueDelete);
-
+
if (noWait)
{
_connection.ProtocolWriter.Write(queueDelete);
@@ -945,6 +949,8 @@ namespace Apache.Qpid.Client
{
_connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true));
}
catch (AMQException)
{
@@ -958,14 +964,16 @@ namespace Apache.Qpid.Client
queueName, isDurable, isExclusive, isAutoDelete));
AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
- isAutoDelete, true, null);
+ isAutoDelete, false, null);
- _replayFrames.Add(queueDeclare);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueDeclare);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
+ isAutoDelete, true, null));
}
// AMQP-level method.
@@ -978,8 +986,6 @@ namespace Apache.Qpid.Client
AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive,
durable, autoDelete, xinternal, noWait, args);
-
- _replayFrames.Add(declareExchange);
if (noWait)
{
@@ -987,6 +993,8 @@ namespace Apache.Qpid.Client
{
_connection.ProtocolWriter.Write(declareExchange);
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(declareExchange);
}
else
{
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index e88cf8f04c..6fee316cb4 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -266,7 +266,11 @@ namespace Apache.Qpid.Client
public override void Close()
{
- // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
+ if (_closed == CLOSED)
+ {
+ return;
+ }
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
lock (_channel.Connection.FailoverMutex)
{
lock (_closingLock)
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
index 7ae086e35f..1fb3d407eb 100644
--- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
@@ -35,12 +35,6 @@ namespace Apache.Qpid.Client.Protocol
private readonly IProtocolWriter _protocolWriter;
private readonly IConnectionCloser _connectionCloser;
- /**
- * Counter to ensure unique queue names
- */
- private int _queueId = 1;
- private readonly Object _queueIdLock = new Object();
-
/// <summary>
/// Maps from the channel id to the AmqChannel that it represents.
/// </summary>
@@ -267,13 +261,7 @@ namespace Apache.Qpid.Client.Protocol
internal string GenerateQueueName()
{
- int id;
- lock(_queueIdLock)
- {
- id = _queueId++;
- }
-
- return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;
+ return "ntmp_" + System.Guid.NewGuid();
}
}
}