diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs new file mode 100644 index 0000000000..9b27b00994 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -0,0 +1,154 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using System.Collections.Generic; + using System.Collections.ObjectModel; + + class AmqpChannelFactory<TChannel> : ChannelFactoryBase<TChannel> + { + MessageEncoderFactory messageEncoderFactory; + AmqpTransportBindingElement bindingElement; + AmqpChannelProperties channelProperties; + long maxBufferPoolSize; + bool shared; + int prefetchLimit; + BindingContext bindingContext; + List<AmqpTransportChannel> openChannels; + + internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) + : base(context.Binding) + { + this.bindingElement = bindingElement; + this.bindingContext = context; + this.channelProperties = bindingElement.ChannelProperties.Clone(); + this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; + this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; + Collection<MessageEncodingBindingElement> messageEncoderBindingElements + = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); + + if (messageEncoderBindingElements.Count > 1) + { + throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); + } + else if (messageEncoderBindingElements.Count == 1) + { + this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory(); + } + else + { + this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); + } + + openChannels = new List<AmqpTransportChannel>(); + } + + + public override T GetProperty<T>() + { + T mep = messageEncoderFactory.Encoder.GetProperty<T>(); + if (mep != null) + { + return mep; + } + + if (typeof(T) == typeof(MessageVersion)) + { + return (T)(object)messageEncoderFactory.Encoder.MessageVersion; + } + + return base.GetProperty<T>(); + } + + protected override void OnOpen(TimeSpan timeout) + { + // check and freeze security properties now + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.bindingElement.BindingSecurity != null) + { + mode = bindingElement.BindingSecurity.Mode; + } + + this.channelProperties.AmqpSecurityMode = mode; + if (mode == AmqpSecurityMode.None) + { + return; + } + + AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); + } + + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelFactory OnBeginOpen"); + //// return null; + } + + protected override void OnEndOpen(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelFactory OnEndOpen"); + } + + protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) + { + AmqpTransportChannel channel = new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); + lock (openChannels) + { + channel.Closed += new EventHandler(channel_Closed); + openChannels.Add(channel); + } + return (TChannel)(object) channel; + } + + void channel_Closed(object sender, EventArgs e) + { + if (this.State != CommunicationState.Opened) + { + return; + } + + lock (openChannels) + { + AmqpTransportChannel channel = (AmqpTransportChannel)sender; + if (openChannels.Contains(channel)) + { + openChannels.Remove(channel); + } + } + } + + protected override void OnClose(TimeSpan timeout) + { + base.OnClose(timeout); + lock (openChannels) + { + foreach (AmqpTransportChannel channel in openChannels) + { + channel.Close(timeout); + } + } + } + } +} |