diff options
Diffstat (limited to 'qpid/wcf')
3 files changed, 57 insertions, 8 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs index 542f1a00a8..5012c76d7e 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -32,7 +32,8 @@ namespace Apache.Qpid.Channel AmqpChannelProperties channelProperties; long maxBufferPoolSize; bool shared; - int prefetchLimit; + int prefetchLimit; + List<AmqpTransportChannel> openChannels; internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) : base(context.Binding) @@ -45,7 +46,7 @@ namespace Apache.Qpid.Channel Collection<MessageEncodingBindingElement> messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); - if(messageEncoderBindingElements.Count > 1) + if (messageEncoderBindingElements.Count > 1) { throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); } @@ -57,6 +58,8 @@ namespace Apache.Qpid.Channel { this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); } + + openChannels = new List<AmqpTransportChannel>(); } @@ -93,8 +96,42 @@ namespace Apache.Qpid.Channel protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) { - return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); + AmqpTransportChannel channel = new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); + lock (openChannels) + { + channel.Closed += new EventHandler(channel_Closed); + openChannels.Add(channel); + } + return (TChannel)(object) channel; } + void channel_Closed(object sender, EventArgs e) + { + if (this.State != CommunicationState.Opened) + { + return; + } + + lock (openChannels) + { + AmqpTransportChannel channel = (AmqpTransportChannel)sender; + if (openChannels.Contains(channel)) + { + openChannels.Remove(channel); + } + } + } + + protected override void OnClose(TimeSpan timeout) + { + base.OnClose(timeout); + lock (openChannels) + { + foreach (AmqpTransportChannel channel in openChannels) + { + channel.Close(timeout); + } + } + } } } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs index 8894b68584..3d7801e7c6 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs @@ -130,15 +130,21 @@ namespace Apache.Qpid.Channel protected override IInputChannel OnAcceptChannel(TimeSpan timeout) { + if (this.IsDisposed) + { + return null; + } + if (amqpTransportChannel == null) { + // TODO: add timeout processing amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties, new EndpointAddress(uri), messageEncoderFactory.Encoder, maxBufferPoolSize, this.shared, this.prefetchLimit); - return (IInputChannel)(object) amqpTransportChannel; + return (IInputChannel)(object)amqpTransportChannel; } - // TODO: remove "max one channel" restriction, add timeout processing + // Singleton channel. Subsequent Accepts wait until the listener is closed acceptWaitEvent.WaitOne(); return null; } @@ -155,7 +161,11 @@ namespace Apache.Qpid.Channel protected override void OnClose(TimeSpan timeout) { - // TODO: (+ OnAbort) + if (amqpTransportChannel != null) + { + amqpTransportChannel.Close(); + } + acceptWaitEvent.Set(); } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) @@ -170,7 +180,9 @@ namespace Apache.Qpid.Channel protected override void OnAbort() { - // TODO: + if (amqpTransportChannel != null) + amqpTransportChannel.Abort(); + acceptWaitEvent.Set(); } } } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs index 5924142046..a6f6ee6800 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs @@ -278,7 +278,6 @@ namespace Apache.Qpid.Channel public bool TryReceive(TimeSpan timeout, out Message message) { - this.ThrowIfDisposedOrNotOpen(); AmqpMessage amqpMessage; message = null; @@ -379,6 +378,7 @@ namespace Apache.Qpid.Channel protected override void OnAbort() { //// TODO: check for network-less qpid teardown or launch special thread + this.CloseEndPoint(); this.Cleanup(); } |