summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-12-16 22:52:47 +0000
committerStephen D. Huston <shuston@apache.org>2009-12-16 22:52:47 +0000
commit844625eea43d326665dd2f30bd6072678d5dd40e (patch)
tree60e33e9ed57a84571b95bb4eb6cc4a12ccbfe49f
parent0da041b0966b6a9a613484de2c7e96b18c771a78 (diff)
downloadqpid-python-844625eea43d326665dd2f30bd6072678d5dd40e.tar.gz
Apply patches for QPID-2247.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@891464 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs43
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs20
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs2
3 files changed, 57 insertions, 8 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
index 542f1a00a8..5012c76d7e 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
@@ -32,7 +32,8 @@ namespace Apache.Qpid.Channel
AmqpChannelProperties channelProperties;
long maxBufferPoolSize;
bool shared;
- int prefetchLimit;
+ int prefetchLimit;
+ List<AmqpTransportChannel> openChannels;
internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context)
: base(context.Binding)
@@ -45,7 +46,7 @@ namespace Apache.Qpid.Channel
Collection<MessageEncodingBindingElement> messageEncoderBindingElements
= context.BindingParameters.FindAll<MessageEncodingBindingElement>();
- if(messageEncoderBindingElements.Count > 1)
+ if (messageEncoderBindingElements.Count > 1)
{
throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext");
}
@@ -57,6 +58,8 @@ namespace Apache.Qpid.Channel
{
this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory();
}
+
+ openChannels = new List<AmqpTransportChannel>();
}
@@ -93,8 +96,42 @@ namespace Apache.Qpid.Channel
protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
{
- return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit);
+ AmqpTransportChannel channel = new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit);
+ lock (openChannels)
+ {
+ channel.Closed += new EventHandler(channel_Closed);
+ openChannels.Add(channel);
+ }
+ return (TChannel)(object) channel;
}
+ void channel_Closed(object sender, EventArgs e)
+ {
+ if (this.State != CommunicationState.Opened)
+ {
+ return;
+ }
+
+ lock (openChannels)
+ {
+ AmqpTransportChannel channel = (AmqpTransportChannel)sender;
+ if (openChannels.Contains(channel))
+ {
+ openChannels.Remove(channel);
+ }
+ }
+ }
+
+ protected override void OnClose(TimeSpan timeout)
+ {
+ base.OnClose(timeout);
+ lock (openChannels)
+ {
+ foreach (AmqpTransportChannel channel in openChannels)
+ {
+ channel.Close(timeout);
+ }
+ }
+ }
}
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
index 8894b68584..3d7801e7c6 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
@@ -130,15 +130,21 @@ namespace Apache.Qpid.Channel
protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
{
+ if (this.IsDisposed)
+ {
+ return null;
+ }
+
if (amqpTransportChannel == null)
{
+ // TODO: add timeout processing
amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties,
new EndpointAddress(uri), messageEncoderFactory.Encoder,
maxBufferPoolSize, this.shared, this.prefetchLimit);
- return (IInputChannel)(object) amqpTransportChannel;
+ return (IInputChannel)(object)amqpTransportChannel;
}
- // TODO: remove "max one channel" restriction, add timeout processing
+ // Singleton channel. Subsequent Accepts wait until the listener is closed
acceptWaitEvent.WaitOne();
return null;
}
@@ -155,7 +161,11 @@ namespace Apache.Qpid.Channel
protected override void OnClose(TimeSpan timeout)
{
- // TODO: (+ OnAbort)
+ if (amqpTransportChannel != null)
+ {
+ amqpTransportChannel.Close();
+ }
+ acceptWaitEvent.Set();
}
protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
@@ -170,7 +180,9 @@ namespace Apache.Qpid.Channel
protected override void OnAbort()
{
- // TODO:
+ if (amqpTransportChannel != null)
+ amqpTransportChannel.Abort();
+ acceptWaitEvent.Set();
}
}
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
index 5924142046..a6f6ee6800 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
@@ -278,7 +278,6 @@ namespace Apache.Qpid.Channel
public bool TryReceive(TimeSpan timeout, out Message message)
{
- this.ThrowIfDisposedOrNotOpen();
AmqpMessage amqpMessage;
message = null;
@@ -379,6 +378,7 @@ namespace Apache.Qpid.Channel
protected override void OnAbort()
{
//// TODO: check for network-less qpid teardown or launch special thread
+ this.CloseEndPoint();
this.Cleanup();
}