summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
committerRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
commit32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch)
tree2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
parent116d91ad7825a98af36a869fc751206fbce0c59f (diff)
parentf7e896076143de4572b4f1f67ef0765125f2498d (diff)
downloadqpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs154
1 files changed, 154 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
new file mode 100644
index 0000000000..9b27b00994
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
@@ -0,0 +1,154 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Collections.Generic;
+ using System.Collections.ObjectModel;
+
+ class AmqpChannelFactory<TChannel> : ChannelFactoryBase<TChannel>
+ {
+ MessageEncoderFactory messageEncoderFactory;
+ AmqpTransportBindingElement bindingElement;
+ AmqpChannelProperties channelProperties;
+ long maxBufferPoolSize;
+ bool shared;
+ int prefetchLimit;
+ BindingContext bindingContext;
+ List<AmqpTransportChannel> openChannels;
+
+ internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context)
+ : base(context.Binding)
+ {
+ this.bindingElement = bindingElement;
+ this.bindingContext = context;
+ this.channelProperties = bindingElement.ChannelProperties.Clone();
+ this.shared = bindingElement.Shared;
+ this.prefetchLimit = bindingElement.PrefetchLimit;
+ this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
+ Collection<MessageEncodingBindingElement> messageEncoderBindingElements
+ = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
+
+ if (messageEncoderBindingElements.Count > 1)
+ {
+ throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext");
+ }
+ else if (messageEncoderBindingElements.Count == 1)
+ {
+ this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory();
+ }
+ else
+ {
+ this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory();
+ }
+
+ openChannels = new List<AmqpTransportChannel>();
+ }
+
+
+ public override T GetProperty<T>()
+ {
+ T mep = messageEncoderFactory.Encoder.GetProperty<T>();
+ if (mep != null)
+ {
+ return mep;
+ }
+
+ if (typeof(T) == typeof(MessageVersion))
+ {
+ return (T)(object)messageEncoderFactory.Encoder.MessageVersion;
+ }
+
+ return base.GetProperty<T>();
+ }
+
+ protected override void OnOpen(TimeSpan timeout)
+ {
+ // check and freeze security properties now
+ AmqpSecurityMode mode = AmqpSecurityMode.None;
+ if (this.bindingElement.BindingSecurity != null)
+ {
+ mode = bindingElement.BindingSecurity.Mode;
+ }
+
+ this.channelProperties.AmqpSecurityMode = mode;
+ if (mode == AmqpSecurityMode.None)
+ {
+ return;
+ }
+
+ AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext);
+ }
+
+ protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ throw new NotImplementedException("AmqpChannelFactory OnBeginOpen");
+ //// return null;
+ }
+
+ protected override void OnEndOpen(IAsyncResult result)
+ {
+ throw new NotImplementedException("AmqpChannelFactory OnEndOpen");
+ }
+
+ protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
+ {
+ AmqpTransportChannel channel = new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit);
+ lock (openChannels)
+ {
+ channel.Closed += new EventHandler(channel_Closed);
+ openChannels.Add(channel);
+ }
+ return (TChannel)(object) channel;
+ }
+
+ void channel_Closed(object sender, EventArgs e)
+ {
+ if (this.State != CommunicationState.Opened)
+ {
+ return;
+ }
+
+ lock (openChannels)
+ {
+ AmqpTransportChannel channel = (AmqpTransportChannel)sender;
+ if (openChannels.Contains(channel))
+ {
+ openChannels.Remove(channel);
+ }
+ }
+ }
+
+ protected override void OnClose(TimeSpan timeout)
+ {
+ base.OnClose(timeout);
+ lock (openChannels)
+ {
+ foreach (AmqpTransportChannel channel in openChannels)
+ {
+ channel.Close(timeout);
+ }
+ }
+ }
+ }
+}