diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Channel')
26 files changed, 4312 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs new file mode 100644 index 0000000000..d533fc212e --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs @@ -0,0 +1,68 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + + using Apache.Qpid.AmqpTypes; + + public class AmqpBinaryBinding : AmqpBinding + { + public AmqpBinaryBinding() +: base (new RawMessageEncodingBindingElement()) + { + } + + public AmqpBinaryBinding(string configurationName) + : this() + { + ApplyConfiguration(configurationName); + } + + private void ApplyConfiguration(string configurationName) + { + BindingsSection wcfBindings = (BindingsSection)ConfigurationManager.GetSection("system.serviceModel/bindings"); + // wcfBindings contains system defined bindings and bindingExtensions + + AmqpBinaryBindingCollectionElement section = (AmqpBinaryBindingCollectionElement)wcfBindings["amqpBinaryBinding"]; + if (section == null) + { + throw new ConfigurationErrorsException("Missing \"amqpBinaryBinding\" configuration section."); + } + + AmqpBinaryBindingConfigurationElement element = section.Bindings[configurationName]; + if (element == null) + { + throw new ConfigurationErrorsException(string.Format(System.Globalization.CultureInfo.CurrentCulture, + "There is no binding named {0} at {1}.", configurationName, section.BindingName)); + } + else + { + element.ApplyConfiguration(this); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs new file mode 100644 index 0000000000..de263bc4ef --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs @@ -0,0 +1,29 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + /// <summary> + /// Implement application configuration of bindingExtensions for AmqpBinaryBinding + /// </summary> + public class AmqpBinaryBindingCollectionElement + : System.ServiceModel.Configuration.StandardBindingCollectionElement<AmqpBinaryBinding, AmqpBinaryBindingConfigurationElement> + { + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs new file mode 100644 index 0000000000..a537a6c6c3 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs @@ -0,0 +1,79 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + using Apache.Qpid.AmqpTypes; + + public class AmqpBinaryBindingConfigurationElement : AmqpBindingConfigurationElement + { + public AmqpBinaryBindingConfigurationElement(string configurationName) + : base(configurationName) + { + } + + public AmqpBinaryBindingConfigurationElement() + : this(null) + { + } + + protected override Type BindingElementType + { + get { return typeof(AmqpBinaryBinding); } + } + + protected override ConfigurationPropertyCollection Properties + { + get + { + ConfigurationPropertyCollection properties = base.Properties; + + return properties; + } + } + + protected override void InitializeFrom(Binding binding) + { + base.InitializeFrom(binding); + AmqpBinaryBinding amqpBinding = (AmqpBinaryBinding)binding; + } + + protected override void OnApplyConfiguration(Binding binding) + { + if (binding == null) + throw new ArgumentNullException("binding"); + + if (binding.GetType() != typeof(AmqpBinaryBinding)) + { + throw new ArgumentException(string.Format("Invalid type for configuring an AMQP binding. Expected type: {0}. Type passed in: {1}.", + typeof(AmqpBinaryBinding).AssemblyQualifiedName, + binding.GetType().AssemblyQualifiedName)); + } + + base.OnApplyConfiguration(binding); + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs new file mode 100644 index 0000000000..be54f06b2f --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs @@ -0,0 +1,153 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + + using Apache.Qpid.AmqpTypes; + + public class AmqpBinding : Binding + { + protected AmqpTransportBindingElement transport; + protected MessageEncodingBindingElement encoding; + protected AmqpSecurity security; + + public AmqpBinding() + : this(new BinaryMessageEncodingBindingElement()) + { + } + + protected AmqpBinding(MessageEncodingBindingElement encoding) + { + this.encoding = encoding; + transport = new AmqpTransportBindingElement(); + } + + public AmqpBinding(string configurationName) + : this() + { + ApplyConfiguration(configurationName); + } + + public string BrokerHost + { + get { return transport.BrokerHost; } + set { transport.BrokerHost = value; } + } + + public int BrokerPort + { + get { return transport.BrokerPort; } + set { transport.BrokerPort = value; } + } + + public int PrefetchLimit + { + get { return transport.PrefetchLimit; } + set { transport.PrefetchLimit = value; } + } + + public AmqpSecurity Security + { + get + { + if (security == null) + { + if (transport.ChannelProperties.AmqpTransportSecurity == null) + { + transport.ChannelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); + } + + security = new AmqpSecurity(transport.ChannelProperties.AmqpTransportSecurity); + transport.BindingSecurity = security; + } + + return security; + } + } + + internal bool SecurityEnabled + { + get { return (transport.ChannelProperties.AmqpSecurityMode != AmqpSecurityMode.None); } + } + + public bool Shared + { + get { return transport.Shared; } + set { transport.Shared = value; } + } + + public TransferMode TransferMode + { + get { return transport.TransferMode; } + set { transport.TransferMode = value; } + } + + public AmqpProperties DefaultMessageProperties + { + get { return transport.DefaultMessageProperties; } + set { transport.DefaultMessageProperties = value; } + } + + public override string Scheme + { + get { return AmqpConstants.Scheme; } + } + + public override BindingElementCollection CreateBindingElements() + { + BindingElementCollection bindingElements = new BindingElementCollection(); + + bindingElements.Add(encoding); + bindingElements.Add(transport); + + return bindingElements.Clone(); + } + + private void ApplyConfiguration(string configurationName) + { + BindingsSection wcfBindings = (BindingsSection)ConfigurationManager.GetSection("system.serviceModel/bindings"); + // wcfBindings contains system defined bindings and bindingExtensions + + AmqpBindingCollectionElement section = (AmqpBindingCollectionElement)wcfBindings["amqpBinding"]; + if (section == null) + { + throw new ConfigurationErrorsException("Missing \"amqpBinding\" configuration section."); + } + + AmqpBindingConfigurationElement element = section.Bindings[configurationName]; + if (element == null) + { + throw new ConfigurationErrorsException(string.Format(System.Globalization.CultureInfo.CurrentCulture, + "There is no binding named {0} at {1}.", configurationName, section.BindingName)); + } + else + { + element.ApplyConfiguration(this); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs new file mode 100644 index 0000000000..e8d3b6fad4 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs @@ -0,0 +1,29 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + /// <summary> + /// Implement application configuration of bindingExtensions for AmqpBinding + /// </summary> + public class AmqpBindingCollectionElement + : System.ServiceModel.Configuration.StandardBindingCollectionElement<AmqpBinding, AmqpBindingConfigurationElement> + { + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs new file mode 100644 index 0000000000..edc91e67c1 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs @@ -0,0 +1,344 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + using System.Threading; + using Apache.Qpid.AmqpTypes; + + public class AmqpBindingConfigurationElement : StandardBindingElement + { + // not regular config elements. See PostDeserialize + string brokerHost; + int brokerPort; + + public AmqpBindingConfigurationElement(string configurationName) + : base(configurationName) + { + brokerHost = AmqpDefaults.BrokerHost; + brokerPort = AmqpDefaults.BrokerPort; + } + + public AmqpBindingConfigurationElement() + : this(null) + { + } + + protected override Type BindingElementType + { + get { return typeof(AmqpBinding); } + } + + public string BrokerHost + { + get { return brokerHost; } + set { brokerHost = value; } + } + + public int BrokerPort + { + get { return brokerPort; } + set { brokerPort = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)] + public int PrefetchLimit + { + get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; } + set { base[AmqpConfigurationStrings.PrefetchLimit] = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)] + public bool Shared + { + get { return (bool)base[AmqpConfigurationStrings.Shared]; } + set { base[AmqpConfigurationStrings.Shared] = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.TransferMode, DefaultValue = AmqpDefaults.TransferMode)] + public TransferMode TransferMode + { + get { return (TransferMode)base[AmqpConfigurationStrings.TransferMode]; } + set { base[AmqpConfigurationStrings.TransferMode] = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.Brokers)] + public BrokerCollection Brokers + { + get + { + return (BrokerCollection)base[AmqpConfigurationStrings.Brokers]; + } + set + { + base[AmqpConfigurationStrings.Brokers] = value; + } + } + + [ConfigurationProperty(AmqpConfigurationStrings.Security)] + public AmqpSecurityElement Security + { + get { return (AmqpSecurityElement)base[AmqpConfigurationStrings.Security]; } + set { base[AmqpConfigurationStrings.Security] = value; } + } + + protected override ConfigurationPropertyCollection Properties + { + get + { + ConfigurationPropertyCollection properties = base.Properties; + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, + typeof(int), 0, null, null, ConfigurationPropertyOptions.None)); + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared, + typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode, + typeof(TransferMode), AmqpDefaults.TransferMode, null, null, ConfigurationPropertyOptions.None)); + properties.Add(new ConfigurationProperty("brokers", typeof(BrokerCollection), null)); + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Security, typeof(AmqpSecurityElement), null)); + return properties; + } + } + + protected override void InitializeFrom(Binding binding) + { + base.InitializeFrom(binding); + AmqpBinding amqpBinding = (AmqpBinding)binding; + this.BrokerHost = amqpBinding.BrokerHost; + this.BrokerPort = amqpBinding.BrokerPort; + this.TransferMode = amqpBinding.TransferMode; + this.Shared = amqpBinding.Shared; + this.PrefetchLimit = amqpBinding.PrefetchLimit; + + if (!amqpBinding.SecurityEnabled) + { + this.Security = null; + } + else + { + if (this.Security == null) + { + this.Security = new AmqpSecurityElement(); + } + + AmqpTransportSecurity sec = amqpBinding.Security.Transport; + this.Security.Mode = AmqpSecurityMode.Transport; + if (this.Security.Transport == null) + { + this.Security.Transport = new AmqpTransportSecurityElement(); + } + + this.Security.Transport.CredentialType = sec.CredentialType; + this.Security.Transport.IgnoreEndpointCredentials = sec.IgnoreEndpointClientCredentials; + this.Security.Transport.UseSSL = sec.UseSSL; + if (sec.DefaultCredential == null) + { + + this.Security.Transport.DefaultCredential = null; + } + else + { + this.Security.Transport.DefaultCredential = new AmqpCredentialElement(); + this.Security.Transport.DefaultCredential.UserName = sec.DefaultCredential.UserName; + this.Security.Transport.DefaultCredential.Password = sec.DefaultCredential.Password; + } + } + + AmqpProperties props = amqpBinding.DefaultMessageProperties; + } + + protected override void OnApplyConfiguration(Binding binding) + { + if (binding == null) + throw new ArgumentNullException("binding"); + + if (!(binding is AmqpBinding)) + { + throw new ArgumentException(string.Format("Invalid type for configuring an AMQP binding. Expected type: {0}. Type passed in: {1}.", + typeof(AmqpBinding).AssemblyQualifiedName, + binding.GetType().AssemblyQualifiedName)); + } + + AmqpBinding amqpBinding = (AmqpBinding)binding; + amqpBinding.BrokerHost = this.BrokerHost; + amqpBinding.BrokerPort = this.BrokerPort; + amqpBinding.TransferMode = this.TransferMode; + amqpBinding.Shared = this.Shared; + amqpBinding.PrefetchLimit = this.PrefetchLimit; + + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.Security != null) + { + mode = this.Security.Mode; + } + + if (mode == AmqpSecurityMode.None) + { + if (amqpBinding.SecurityEnabled) + { + amqpBinding.Security.Mode = AmqpSecurityMode.None; + } + } + else + { + amqpBinding.Security.Mode = AmqpSecurityMode.Transport; + amqpBinding.Security.Transport.CredentialType = this.Security.Transport.CredentialType; + amqpBinding.Security.Transport.IgnoreEndpointClientCredentials = this.Security.Transport.IgnoreEndpointCredentials; + amqpBinding.Security.Transport.UseSSL = this.Security.Transport.UseSSL; + if (this.Security.Transport.DefaultCredential != null) + { + amqpBinding.Security.Transport.DefaultCredential = new AmqpCredential( + this.Security.Transport.DefaultCredential.UserName, + this.Security.Transport.DefaultCredential.Password); + } + else + { + amqpBinding.Security.Transport.DefaultCredential = null; + } + } + } + + + protected override void PostDeserialize() + { + base.PostDeserialize(); + + BrokerCollection brokers = Brokers; + if (brokers != null) + { + if (brokers.Count > 0) + { + // just grab the first element until failover is supported + System.Collections.IEnumerator brokersEnum = brokers.GetEnumerator(); + // move to first element + brokersEnum.MoveNext(); + BrokerElement be = (BrokerElement)brokersEnum.Current; + this.BrokerHost = be.Host; + this.BrokerPort = be.Port; + } + } + } + } + + public class BrokerCollection : ConfigurationElementCollection + { + public BrokerCollection() + { + //this.AddElementName = "broker"; + } + + protected override ConfigurationElement CreateNewElement() + { + return new BrokerElement(); + } + + protected override void BaseAdd(ConfigurationElement element) + { + BrokerElement be = (BrokerElement)element; + if (this.BaseGet((Object)be.Key) != null) + { + throw new ConfigurationErrorsException("duplicate broker definition at line " + element.ElementInformation.LineNumber); + } + base.BaseAdd(element); + } + + protected override Object GetElementKey(ConfigurationElement element) + { + BrokerElement be = (BrokerElement) element; + return be.Key; + } + + protected override void PostDeserialize() + { + base.PostDeserialize(); + if (this.Count == 0) + { + throw new ArgumentException("Brokers collection requires at least one broker"); + } + if (this.Count > 1) + { + Console.WriteLine("Warning: multiple brokers not supported, selecting first instance"); + } + BrokerElement be = (BrokerElement)this.BaseGet(0); + } + + protected override string ElementName + { + get + { + return "broker"; + } + } + + public override ConfigurationElementCollectionType CollectionType + { + get + { + return ConfigurationElementCollectionType.BasicMap; + } + } + } + + public class BrokerElement : ConfigurationElement + { + string key; + + public BrokerElement() + { + Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.BrokerHost, + typeof(string), AmqpDefaults.BrokerHost, null, null, ConfigurationPropertyOptions.None)); + Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.BrokerPort, + typeof(int), AmqpDefaults.BrokerPort, null, null, ConfigurationPropertyOptions.None)); + + } + + [ConfigurationProperty(AmqpConfigurationStrings.BrokerHost, DefaultValue = AmqpDefaults.BrokerHost)] + public string Host + { + get { return (string)base[AmqpConfigurationStrings.BrokerHost]; } + set { base[AmqpConfigurationStrings.BrokerHost] = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.BrokerPort, DefaultValue = AmqpDefaults.BrokerPort)] + public int Port + { + get { return (int)base[AmqpConfigurationStrings.BrokerPort]; } + set { base[AmqpConfigurationStrings.BrokerPort] = value; } + } + + public string Key + { + get + { + if (this.key == null) + { + this.key = this.Host + ':' + this.Port; + } + return this.key; + } + } + + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs new file mode 100644 index 0000000000..9b27b00994 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -0,0 +1,154 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using System.Collections.Generic; + using System.Collections.ObjectModel; + + class AmqpChannelFactory<TChannel> : ChannelFactoryBase<TChannel> + { + MessageEncoderFactory messageEncoderFactory; + AmqpTransportBindingElement bindingElement; + AmqpChannelProperties channelProperties; + long maxBufferPoolSize; + bool shared; + int prefetchLimit; + BindingContext bindingContext; + List<AmqpTransportChannel> openChannels; + + internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) + : base(context.Binding) + { + this.bindingElement = bindingElement; + this.bindingContext = context; + this.channelProperties = bindingElement.ChannelProperties.Clone(); + this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; + this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; + Collection<MessageEncodingBindingElement> messageEncoderBindingElements + = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); + + if (messageEncoderBindingElements.Count > 1) + { + throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); + } + else if (messageEncoderBindingElements.Count == 1) + { + this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory(); + } + else + { + this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); + } + + openChannels = new List<AmqpTransportChannel>(); + } + + + public override T GetProperty<T>() + { + T mep = messageEncoderFactory.Encoder.GetProperty<T>(); + if (mep != null) + { + return mep; + } + + if (typeof(T) == typeof(MessageVersion)) + { + return (T)(object)messageEncoderFactory.Encoder.MessageVersion; + } + + return base.GetProperty<T>(); + } + + protected override void OnOpen(TimeSpan timeout) + { + // check and freeze security properties now + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.bindingElement.BindingSecurity != null) + { + mode = bindingElement.BindingSecurity.Mode; + } + + this.channelProperties.AmqpSecurityMode = mode; + if (mode == AmqpSecurityMode.None) + { + return; + } + + AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); + } + + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelFactory OnBeginOpen"); + //// return null; + } + + protected override void OnEndOpen(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelFactory OnEndOpen"); + } + + protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) + { + AmqpTransportChannel channel = new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); + lock (openChannels) + { + channel.Closed += new EventHandler(channel_Closed); + openChannels.Add(channel); + } + return (TChannel)(object) channel; + } + + void channel_Closed(object sender, EventArgs e) + { + if (this.State != CommunicationState.Opened) + { + return; + } + + lock (openChannels) + { + AmqpTransportChannel channel = (AmqpTransportChannel)sender; + if (openChannels.Contains(channel)) + { + openChannels.Remove(channel); + } + } + } + + protected override void OnClose(TimeSpan timeout) + { + base.OnClose(timeout); + lock (openChannels) + { + foreach (AmqpTransportChannel channel in openChannels) + { + channel.Close(timeout); + } + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs new file mode 100644 index 0000000000..b431689c4d --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs @@ -0,0 +1,234 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Net; + using System.Net.Sockets; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using System.Globalization; + + using Apache.Qpid.AmqpTypes; + + /// <summary> + /// Collection of constants used by the Amqp Channel classes + /// </summary> + static class AmqpConstants + { + internal const string Scheme = "amqp"; + internal const string AmqpBindingSectionName = "system.serviceModel/bindings/amqpBinding"; + internal const string AmqpBinaryBindingSectionName = "system.serviceModel/bindings/amqpBinaryBinding"; + internal const string AmqpTransportSectionName = "amqpTransport"; + } + + static class AmqpConfigurationStrings + { + public const string BrokerHost = "host"; + public const string BrokerPort = "port"; + public const string TransferMode = "transferMode"; + public const string Brokers = "brokers"; + public const string Shared = "shared"; + public const string PrefetchLimit = "prefetchLimit"; + public const string MaxBufferPoolSize = "maxBufferPoolSize"; + public const string MaxReceivedMessageSize = "maxReceivedMessageSize"; + public const string Security = "security"; + public const string SecurityMode = "mode"; + public const string SecurityTransport = "transport"; + public const string SecurityTransportCredentialType = "credentialType"; + public const string SecurityTransportUseSSL = "useSSL"; + public const string SecurityTransportDefaultCredential = "defaultCredential"; + public const string SecurityTransportIgnoreEndpointCredentials = "ignoreEndpointCredentials"; + public const string CredentialUserName = "userName"; + public const string CredentialPassword = "password"; + } + + static class AmqpDefaults + { + internal const string BrokerHost = "localhost"; + internal const int BrokerPort = 5672; + internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered; + internal const long MaxBufferPoolSize = 64 * 1024; + internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024; + } + + // parking spot for properties that may be shared by separate channels on a single AMQP connection + internal class AmqpChannelProperties + { + string brokerHost; + int brokerPort; + TransferMode transferMode; + AmqpProperties defaultMessageProperties; + AmqpSecurityMode amqpSecurityMode; + AmqpTransportSecurity amqpTransportSecurity; + AmqpCredential amqpCredential; + long maxBufferPoolSize; + int maxReceivedMessageSize; + + internal AmqpChannelProperties() + { + this.brokerHost = AmqpDefaults.BrokerHost; + this.brokerPort = AmqpDefaults.BrokerPort; + this.transferMode = AmqpDefaults.TransferMode; + this.defaultMessageProperties = null; + this.amqpSecurityMode = AmqpSecurityMode.None; + this.amqpTransportSecurity = null; + this.amqpCredential = null; + this.maxBufferPoolSize = AmqpDefaults.MaxBufferPoolSize; + this.maxReceivedMessageSize = AmqpDefaults.MaxReceivedMessageSize; + } + + public AmqpChannelProperties Clone() + { + AmqpChannelProperties props = (AmqpChannelProperties) this.MemberwiseClone(); + if (this.defaultMessageProperties != null) + { + props.defaultMessageProperties = this.defaultMessageProperties.Clone(); + } + + if (this.amqpTransportSecurity != null) + { + props.amqpTransportSecurity = this.amqpTransportSecurity.Clone(); + } + + if (this.amqpCredential != null) + { + this.amqpCredential = this.amqpCredential.Clone(); + } + + return props; + } + + internal string BrokerHost + { + get { return this.brokerHost; } + set { this.brokerHost = value; } + } + + internal int BrokerPort + { + get { return this.brokerPort; } + set { this.brokerPort = value; } + } + + internal TransferMode TransferMode + { + get { return this.transferMode; } + set { this.transferMode = value; } + } + + internal AmqpProperties DefaultMessageProperties + { + get { return this.defaultMessageProperties; } + set { this.defaultMessageProperties = value; } + } + + internal AmqpSecurityMode AmqpSecurityMode + { + get { return this.amqpSecurityMode; } + set { this.amqpSecurityMode = value; } + } + + internal AmqpTransportSecurity AmqpTransportSecurity + { + get { return this.amqpTransportSecurity; } + set { this.amqpTransportSecurity = value; } + } + + internal AmqpCredential AmqpCredential + { + get { return this.amqpCredential; } + set { this.amqpCredential = value; } + } + + internal long MaxBufferPoolSize + { + get { return this.maxBufferPoolSize; } + set { this.maxBufferPoolSize = value; } + } + + internal int MaxReceivedMessageSize + { + get { return this.maxReceivedMessageSize; } + set { this.maxReceivedMessageSize = value; } + } + } + + static class AmqpChannelHelpers + { + internal static void ValidateTimeout(TimeSpan timeout) + { + if (timeout < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue."); + } + } + + internal static void FindAuthenticationCredentials(AmqpChannelProperties channelProperties, + BindingContext bindingContext) + { + AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; + if (tsec == null) + { + // no auth + return; + } + + if (tsec.CredentialType == AmqpCredentialType.Anonymous) + { + // no auth + return; + } + + // credentials search order: specific AmqpCredentials, specific + // ClientCredentials (if applicable), binding's default credentials + + AmqpCredential amqpCred = bindingContext.BindingParameters.Find<AmqpCredential>(); + if (amqpCred != null) + { + channelProperties.AmqpCredential = amqpCred.Clone(); + return; + } + + if (!tsec.IgnoreEndpointClientCredentials) + { + ClientCredentials cliCred = bindingContext.BindingParameters.Find<ClientCredentials>(); + if (cliCred != null) + { + if (cliCred.UserName != null) + { + if (cliCred.UserName.UserName != null) + { + channelProperties.AmqpCredential = new AmqpCredential(cliCred.UserName.UserName, + cliCred.UserName.Password); + return; + } + } + } + } + + if (tsec.DefaultCredential != null) + { + channelProperties.AmqpCredential = tsec.DefaultCredential.Clone(); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs new file mode 100644 index 0000000000..78655f2124 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs @@ -0,0 +1,204 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Threading; + using System.Collections.Generic; + using System.Collections.ObjectModel; + + class AmqpChannelListener : ChannelListenerBase<IInputChannel> + { + MessageEncoderFactory messageEncoderFactory; + AmqpTransportBindingElement bindingElement; + AmqpChannelProperties channelProperties; + BindingContext bindingContext; + bool shared; + int prefetchLimit; + long maxBufferPoolSize; + Uri uri; + AmqpTransportChannel amqpTransportChannel; + delegate IInputChannel AsyncOnAcceptCaller (TimeSpan timeout); + AsyncOnAcceptCaller asyncOnAcceptCaller; + ManualResetEvent acceptWaitEvent; + + internal AmqpChannelListener(AmqpTransportBindingElement bindingElement, BindingContext context) + : base(context.Binding) + { + this.bindingElement = bindingElement; + this.channelProperties = bindingElement.ChannelProperties.Clone(); + this.bindingContext = context; + this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; + + this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; + + // TODO: review this. Should be unique hostname based + this.uri = context.ListenUriBaseAddress; + this.asyncOnAcceptCaller = new AsyncOnAcceptCaller(this.OnAcceptChannel); + this.acceptWaitEvent = new ManualResetEvent(false); + + Collection<MessageEncodingBindingElement> messageEncoderBindingElements + = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); + + if(messageEncoderBindingElements.Count > 1) + { + throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); + } + else if (messageEncoderBindingElements.Count == 1) + { + this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory(); + } + else + { + this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); + } + } + + public override Uri Uri + { + get + { + return this.uri; + } + } + + + + public override T GetProperty<T>() + { + T mep = messageEncoderFactory.Encoder.GetProperty<T>(); + if (mep != null) + { + return mep; + } + + if (typeof(T) == typeof(MessageVersion)) + { + return (T)(object)messageEncoderFactory.Encoder.MessageVersion; + } + + return base.GetProperty<T>(); + } + + protected override void OnOpen(TimeSpan timeout) + { + // check and freeze security properties now + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.bindingElement.BindingSecurity != null) + { + mode = bindingElement.BindingSecurity.Mode; + } + + this.channelProperties.AmqpSecurityMode = mode; + if (mode == AmqpSecurityMode.None) + { + return; + } + + AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); + } + + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelListener OnBeginOpen"); + //// return null; + } + + protected override void OnEndOpen(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelListener OnEndOpen"); + } + + protected override bool OnWaitForChannel(TimeSpan timeout) + { + throw new NotImplementedException("AmqpChannelListener OnWaitForChannel"); + } + + protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelListener OnBeginWaitForChannel"); + } + + protected override bool OnEndWaitForChannel(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelListener OnEndWaitForChannel"); + } + + protected override IInputChannel OnAcceptChannel(TimeSpan timeout) + { + if (this.IsDisposed) + { + return null; + } + + if (amqpTransportChannel == null) + { + // TODO: add timeout processing + amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties, + new EndpointAddress(uri), messageEncoderFactory.Encoder, + maxBufferPoolSize, this.shared, this.prefetchLimit); + return (IInputChannel)(object)amqpTransportChannel; + } + + // Singleton channel. Subsequent Accepts wait until the listener is closed + acceptWaitEvent.WaitOne(); + return null; + } + + protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state) + { + return asyncOnAcceptCaller.BeginInvoke(timeout, callback, state); + } + + protected override IInputChannel OnEndAcceptChannel(IAsyncResult result) + { + return asyncOnAcceptCaller.EndInvoke(result); + } + + protected override void OnClose(TimeSpan timeout) + { + if (amqpTransportChannel != null) + { + amqpTransportChannel.Close(); + } + acceptWaitEvent.Set(); + } + + protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelListener OnBeginClose"); + } + + protected override void OnEndClose(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelListener OnEndClose"); + } + + protected override void OnAbort() + { + if (amqpTransportChannel != null) + amqpTransportChannel.Abort(); + acceptWaitEvent.Set(); + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs new file mode 100644 index 0000000000..e2da07c800 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs @@ -0,0 +1,113 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/*
+ * AMQP has a SASL authentication mechanism that doesn't match
+ * with existing .NET credentials. The analogy breaks down further
+ * if there is a list of brokers to cycle through on failover.
+ * This class will allow arbitrary credentials to be specified
+ * by the user, but is meant to be sensibly populated by bindings
+ * that use it from ClientCredentials.
+ * See the related interplay of ClientCredentials and
+ * WebProxy NetworkCredential for the BasicHttpBinding.
+ */
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+
+ /// <summary>
+ /// Credentials for establishing a connection to an AMQP server.
+ /// </summary>
+ public class AmqpCredential
+ {
+ private string password;
+ private string userName; // SASL authentication id
+ // Future: private string the_Sasl_Authorization_ID
+ // Future: private X509CertificateInitiatorClientCredential tlsClientCertificate;
+
+ public AmqpCredential(string userName, string password)
+ {
+ if (userName == null)
+ {
+ throw new ArgumentNullException("user name");
+ }
+
+ if (password == null)
+ {
+ throw new ArgumentNullException("password");
+ }
+
+ this.userName = userName;
+ this.password = password;
+ }
+
+ public string UserName
+ {
+ get
+ {
+ if (this.userName == null)
+ {
+ this.userName = "";
+ }
+
+ return this.userName;
+ }
+
+ set
+ {
+ if (value == null)
+ {
+ throw new ArgumentNullException("user name");
+ }
+
+ this.userName = value;
+ }
+ }
+
+ public string Password
+ {
+ get
+ {
+ if (this.password == null)
+ {
+ this.password = "";
+ }
+
+ return this.password;
+ }
+
+ set
+ {
+ if (value == null)
+ {
+ throw new ArgumentNullException("password");
+ }
+
+ this.password = value;
+ }
+ }
+
+ public AmqpCredential Clone()
+ {
+ return (AmqpCredential) this.MemberwiseClone();
+ }
+ }
+
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs new file mode 100644 index 0000000000..2bafbbb54e --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs @@ -0,0 +1,37 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ /// <summary>
+ /// Enumerates the SASL authentication mechanisms used by the AMQP transport
+ /// </summary>
+ public enum AmqpCredentialType
+ {
+ /// <summary>
+ /// SASL ANONYMOUS mechanism
+ /// </summary>
+ Anonymous,
+
+ /// <summary>
+ /// SASL PLAIN mechanism: username and password
+ /// </summary>
+ Plain
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs new file mode 100644 index 0000000000..5d88afb88f --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs @@ -0,0 +1,75 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+
+ /// <summary>
+ /// Specifies the types of trasport-level and message-level security used by
+ /// an endpoint configured with an AmqpBinding.
+ /// </summary>
+ public sealed class AmqpSecurity
+ {
+ private AmqpSecurityMode mode;
+ private AmqpTransportSecurity transportSecurity;
+
+ internal AmqpSecurity()
+ {
+ this.mode = AmqpSecurityMode.None;
+ }
+
+ internal AmqpSecurity(AmqpTransportSecurity tsec)
+ {
+ if (tsec == null)
+ {
+ throw new ArgumentNullException("AmqpTransportSecurity");
+ }
+
+ this.mode = AmqpSecurityMode.Transport;
+ this.transportSecurity = tsec;
+ }
+
+ /// <summary>
+ /// gets or sets the security mode.
+ /// </summary>
+ public AmqpSecurityMode Mode
+ {
+ get { return this.mode; }
+ set {this.mode = value; }
+ }
+
+ /// <summary>
+ /// gets the security object that controls encryption
+ /// and authentication parameters for the AMQP transport.
+ /// </summary>
+ public AmqpTransportSecurity Transport
+ {
+ get
+ {
+ if (this.transportSecurity == null)
+ {
+ this.transportSecurity = new AmqpTransportSecurity();
+ }
+
+ return this.transportSecurity;
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityElement.cs new file mode 100644 index 0000000000..f7370e40f5 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityElement.cs @@ -0,0 +1,126 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Collections.ObjectModel;
+ using System.Configuration;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Configuration;
+ using Apache.Qpid.AmqpTypes;
+
+ public sealed class AmqpSecurityElement : ConfigurationElement
+ {
+ public AmqpSecurityElement()
+ {
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityMode,
+ typeof(AmqpSecurityMode), AmqpSecurityMode.None, null, null, ConfigurationPropertyOptions.None));
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransport,
+ typeof(AmqpTransportSecurityElement), null));
+
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.SecurityMode, DefaultValue = AmqpSecurityMode.None)]
+ public AmqpSecurityMode Mode
+ {
+ get { return (AmqpSecurityMode)base[AmqpConfigurationStrings.SecurityMode]; }
+ set { base[AmqpConfigurationStrings.SecurityMode] = value; }
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransport)]
+ public AmqpTransportSecurityElement Transport
+ {
+ get { return (AmqpTransportSecurityElement)base[AmqpConfigurationStrings.SecurityTransport]; }
+ set { base[AmqpConfigurationStrings.SecurityTransport] = value; }
+ }
+ }
+
+ public class AmqpTransportSecurityElement : ConfigurationElement
+ {
+ public AmqpTransportSecurityElement()
+ {
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportCredentialType,
+ typeof(AmqpCredentialType), AmqpCredentialType.Anonymous, null, null, ConfigurationPropertyOptions.None));
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportUseSSL,
+ typeof(bool), false, null, null, ConfigurationPropertyOptions.None));
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportDefaultCredential,
+ typeof(AmqpCredentialElement), null));
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials,
+ typeof(bool), false, null, null, ConfigurationPropertyOptions.None));
+
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportCredentialType, DefaultValue = AmqpCredentialType.Anonymous)]
+ public AmqpCredentialType CredentialType
+ {
+ get { return (AmqpCredentialType)base[AmqpConfigurationStrings.SecurityTransportCredentialType]; }
+ set { base[AmqpConfigurationStrings.SecurityTransportCredentialType] = value; }
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportUseSSL, DefaultValue = false)]
+ public bool UseSSL
+ {
+ get { return (bool)base[AmqpConfigurationStrings.SecurityTransportUseSSL]; }
+ set { base[AmqpConfigurationStrings.SecurityTransportUseSSL] = value; }
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportDefaultCredential, DefaultValue = null)]
+ public AmqpCredentialElement DefaultCredential
+ {
+ get { return (AmqpCredentialElement)base[AmqpConfigurationStrings.SecurityTransportDefaultCredential]; }
+ set { base[AmqpConfigurationStrings.SecurityTransportDefaultCredential] = value; }
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials, DefaultValue = false)]
+ public bool IgnoreEndpointCredentials
+ {
+ get { return (bool)base[AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials]; }
+ set { base[AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials] = value; }
+ }
+ }
+
+ public class AmqpCredentialElement : ConfigurationElement
+ {
+ public AmqpCredentialElement()
+ {
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.CredentialUserName,
+ typeof(string), "", null, null, ConfigurationPropertyOptions.None));
+ Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.CredentialPassword,
+ typeof(string), "", null, null, ConfigurationPropertyOptions.None));
+
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.CredentialUserName, DefaultValue = "")]
+ public string UserName
+ {
+ get { return (string)base[AmqpConfigurationStrings.CredentialUserName]; }
+ set { base[AmqpConfigurationStrings.CredentialUserName] = value; }
+ }
+
+ [ConfigurationProperty(AmqpConfigurationStrings.CredentialPassword, DefaultValue = "")]
+ public string Password
+ {
+ get { return (string)base[AmqpConfigurationStrings.CredentialPassword]; }
+ set { base[AmqpConfigurationStrings.CredentialPassword] = value; }
+ }
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityMode.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityMode.cs new file mode 100644 index 0000000000..88e7add054 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityMode.cs @@ -0,0 +1,37 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ /// <summary>
+ /// Specifies whether trasport-level security is used with AMQP connections
+ /// </summary>
+ public enum AmqpSecurityMode
+ {
+ /// <summary>
+ /// Indicates no security is used with the AMQP transport
+ /// </summary>
+ None,
+
+ /// <summary>
+ /// Indicates transport-level security is used with the AMQP transport
+ /// </summary>
+ Transport
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs new file mode 100644 index 0000000000..a98f361d19 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs @@ -0,0 +1,186 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using Apache.Qpid.AmqpTypes; + + public class AmqpTransportBindingElement : TransportBindingElement, ITransactedBindingElement + { + AmqpChannelProperties channelProperties; + bool shared; + int prefetchLimit; + AmqpSecurity bindingSecurity; + + public AmqpTransportBindingElement() + { + // start with default properties + channelProperties = new AmqpChannelProperties(); + } + + protected AmqpTransportBindingElement(AmqpTransportBindingElement other) + : base(other) + { + this.channelProperties = other.channelProperties.Clone(); + this.shared = other.shared; + this.prefetchLimit = other.prefetchLimit; + this.bindingSecurity = other.bindingSecurity; + } + + internal AmqpSecurity BindingSecurity + { + get { return this.bindingSecurity; } + set { this.bindingSecurity = value; } + } + + public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) + { + if (context == null) + { + throw new ArgumentNullException("context"); + } + + return (IChannelFactory<TChannel>)(object)new AmqpChannelFactory<TChannel>(this, context); + } + + public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context) + { + if (context == null) + { + throw new ArgumentNullException("context"); + } + + return (IChannelListener<TChannel>)(object)new AmqpChannelListener(this, context); + } + + + + public override bool CanBuildChannelFactory<TChannel>(BindingContext context) + { + return ((typeof(TChannel) == typeof(IOutputChannel)) || + (typeof(TChannel) == typeof(IInputChannel))); + } + + public override bool CanBuildChannelListener<TChannel>(BindingContext context) + { + return ((typeof(TChannel) == typeof(IInputChannel))); + } + + public override BindingElement Clone() + { + return new AmqpTransportBindingElement(this); + } + + internal AmqpChannelProperties ChannelProperties + { + get { return channelProperties; } + } + + public AmqpCredential AmqpCredential + { + get { return this.channelProperties.AmqpCredential; } + set { this.channelProperties.AmqpCredential = value; } + } + + public string BrokerHost + { + get { return this.channelProperties.BrokerHost; } + set { this.channelProperties.BrokerHost = value; } + } + + public int BrokerPort + { + get { return this.channelProperties.BrokerPort; } + set { this.channelProperties.BrokerPort = value; } + } + + public int PrefetchLimit + { + get { return this.prefetchLimit; } + set { this.prefetchLimit = value; } + } + + public bool Shared + { + get { return this.shared; } + set { this.shared = value; } + } + + public bool TransactedReceiveEnabled + { + get { return true; } + } + + public TransferMode TransferMode + { + get { return this.channelProperties.TransferMode; } + set { this.channelProperties.TransferMode = value; } + } + + public AmqpTransportSecurity TransportSecurity + { + get + { + if (this.channelProperties.AmqpTransportSecurity == null) + { + this.channelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); + } + + return this.channelProperties.AmqpTransportSecurity; + } + } + + + public AmqpProperties DefaultMessageProperties + { + get { return this.channelProperties.DefaultMessageProperties; } + + set { this.channelProperties.DefaultMessageProperties = value; } + } + + public override T GetProperty<T>(BindingContext context) + { + if (context == null) + { + throw new ArgumentNullException("context"); + } + + if (typeof(T) == typeof(MessageVersion)) + { + return (T)(object)MessageVersion.Default; + } + + + return context.GetInnerProperty<T>(); + } + + public override string Scheme + { + get + { + return AmqpConstants.Scheme; + } + } + + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs new file mode 100644 index 0000000000..6f0ffd9815 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs @@ -0,0 +1,642 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +// TODO: flow control +// timeout handling +// transactions +// check if should split into separate input and output classes (little overlap) + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Text; + using System.Threading; + using System.Globalization; + using System.Web; + using System.Xml; + + // the thin interop layer that provides access to the Qpid AMQP client libraries + using Apache.Qpid.Interop; + using Apache.Qpid.AmqpTypes; + + /// <summary> + /// WCF client transport channel for accessing AMQP brokers using the Qpid C++ library + /// </summary> + public class AmqpTransportChannel : ChannelBase, IOutputChannel, IInputChannel + { + private static readonly EndpointAddress AnonymousAddress = + new EndpointAddress("http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous"); + + private EndpointAddress remoteAddress; + private MessageEncoder encoder; + private AmqpChannelProperties factoryChannelProperties; + private bool shared; + private int prefetchLimit; + private string encoderContentType; + // AMQP subject/routing key + private string subject; + // Qpid addressing value for "qpid.subject" property + private string qpidSubject; + + private BufferManager bufferManager; + private AmqpProperties outputMessageProperties; + + private InputLink inputLink; + private OutputLink outputLink; + + private bool isInputChannel; + private bool streamed; + + private AsyncTimeSpanCaller asyncOpenCaller; + private AsyncTimeSpanCaller asyncCloseCaller; + + internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit) + : base(factory) + { + this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>); + + if (remoteAddress == null) + { + throw new ArgumentException("Null Endpoint Address"); + } + + this.factoryChannelProperties = channelProperties; + this.shared = sharedConnection; + this.prefetchLimit = prefetchLimit; + this.remoteAddress = remoteAddress; + + // pull out host, port, queue, and connection arguments + string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject); + + this.encoder = msgEncoder; + string ct = String.Empty; + if (this.encoder != null) + { + ct = this.encoder.ContentType; + if (ct != null) + { + int pos = ct.IndexOf(';'); + if (pos != -1) + { + ct = ct.Substring(0, pos).Trim(); + } + } + else + { + ct = "application/octet-stream"; + } + } + + this.encoderContentType = ct; + + if (this.factoryChannelProperties.TransferMode == TransferMode.Streamed) + { + this.streamed = true; + } + else + { + if (!(this.factoryChannelProperties.TransferMode == TransferMode.Buffered)) + { + throw new ArgumentException("TransferMode mode must be \"Streamed\" or \"Buffered\""); + } + + this.streamed = false; + } + + this.bufferManager = BufferManager.CreateBufferManager(maxBufferPoolSize, int.MaxValue); + + this.asyncOpenCaller = new AsyncTimeSpanCaller(this.OnOpen); + this.asyncCloseCaller = new AsyncTimeSpanCaller(this.OnClose); + + if (this.isInputChannel) + { + this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress); + this.inputLink.PrefetchLimit = this.prefetchLimit; + } + else + { + this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress); + this.subject = this.outputLink.DefaultSubject; + this.qpidSubject = this.outputLink.QpidSubject; + } + } + + private delegate bool AsyncTryReceiveCaller(TimeSpan timeout, out Message message); + + private delegate void AsyncTimeSpanCaller(TimeSpan timeout); + + EndpointAddress IOutputChannel.RemoteAddress + { + get + { + return this.remoteAddress; + } + } + + // i.e what you would insert into a ReplyTo header to reach + // here. Presumably should be exchange/link and routing info, + // rather than the actual input queue name. + EndpointAddress IInputChannel.LocalAddress + { + get + { + // TODO: something better + return AnonymousAddress; + } + } + + AmqpProperties OutputMessageProperties + { + get + { + if (this.outputMessageProperties == null) + { + this.outputMessageProperties = this.factoryChannelProperties.DefaultMessageProperties; + if (this.outputMessageProperties == null) + { + this.outputMessageProperties = new AmqpProperties(); + } + } + + return this.outputMessageProperties; + } + } + + Uri IOutputChannel.Via + { + get + { + return this.remoteAddress.Uri; + } + } + + public override T GetProperty<T>() + { + if (typeof(T) == typeof(IInputChannel)) + { + if (this.isInputChannel) + { + return (T)(object)this; + } + } + else if (typeof(T) == typeof(IOutputChannel)) + { + if (!this.isInputChannel) + { + return (T)(object)this; + } + } + + return base.GetProperty<T>(); + } + + public void Send(Message message, TimeSpan timeout) + { + this.ThrowIfDisposedOrNotOpen(); + AmqpChannelHelpers.ValidateTimeout(timeout); + + try + { + using (AmqpMessage amqpMessage = this.WcfToQpid(message)) + { + this.outputLink.Send(amqpMessage, timeout); + } + } + finally + { + message.Close(); + } + } + + public void Send(Message message) + { + this.Send(message, this.DefaultSendTimeout); + } + + public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) + { + this.ThrowIfDisposedOrNotOpen(); + AmqpChannelHelpers.ValidateTimeout(timeout); + + try + { + using (AmqpMessage amqpMessage = this.WcfToQpid(message)) + { + return this.outputLink.BeginSend(amqpMessage, timeout, callback, state); + } + } + finally + { + message.Close(); + } + } + + public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state) + { + return this.BeginSend(message, this.DefaultSendTimeout, callback, state); + } + + public void EndSend(IAsyncResult result) + { + this.outputLink.EndSend(result); + } + + public Message Receive(TimeSpan timeout) + { + Message message; + if (this.TryReceive(timeout, out message)) + { + return message; + } + else + { + throw new TimeoutException("Receive"); + } + } + + public Message Receive() + { + return this.Receive(this.DefaultReceiveTimeout); + } + + public bool TryReceive(TimeSpan timeout, out Message message) + { + AmqpMessage amqpMessage; + message = null; + + if (this.inputLink.TryReceive(timeout, out amqpMessage)) + { + message = this.QpidToWcf(amqpMessage); + return true; + } + + return false; + } + + public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.inputLink.BeginTryReceive(timeout, callback, state); + } + + public bool EndTryReceive(IAsyncResult result, out Message message) + { + AmqpMessage amqpMessage = null; + if (!this.inputLink.EndTryReceive(result, out amqpMessage)) + { + message = null; + return false; + } + message = QpidToWcf(amqpMessage); + return true; + } + + public bool WaitForMessage(TimeSpan timeout) + { + return this.inputLink.WaitForMessage(timeout); + } + + public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.inputLink.BeginTryReceive(timeout, callback, state); + } + + public IAsyncResult BeginReceive(AsyncCallback callback, object state) + { + return this.BeginReceive(this.DefaultReceiveTimeout, callback, state); + } + + public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.inputLink.BeginWaitForMessage(timeout, callback, state); + } + + public Message EndReceive(IAsyncResult result) + { + Message message; + if (this.EndTryReceive(result, out message)) + { + return message; + } + else + { + throw new TimeoutException("EndReceive"); + } + } + + public bool EndWaitForMessage(IAsyncResult result) + { + return this.inputLink.EndWaitForMessage(result); + } + + public void CloseEndPoint() + { + if (this.inputLink != null) + { + this.inputLink.Close(); + } + if (this.outputLink != null) + { + this.outputLink.Close(); + } + } + + /// <summary> + /// Open connection to Broker + /// </summary> + protected override void OnOpen(TimeSpan timeout) + { + // TODO: move open logic to here from constructor + } + + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.asyncOpenCaller.BeginInvoke(timeout, callback, state); + } + + protected override void OnEndOpen(IAsyncResult result) + { + this.asyncOpenCaller.EndInvoke(result); + } + + protected override void OnAbort() + { + //// TODO: check for network-less qpid teardown or launch special thread + this.CloseEndPoint(); + this.Cleanup(); + } + + /// <summary> + /// Shutdown gracefully + /// </summary> + protected override void OnClose(TimeSpan timeout) + { + this.CloseEndPoint(); + this.Cleanup(); + } + + protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.asyncCloseCaller.BeginInvoke(timeout, callback, state); + } + + protected override void OnEndClose(IAsyncResult result) + { + this.asyncCloseCaller.EndInvoke(result); + } + + private AmqpMessage WcfToQpid(Message wcfMessage) + { + object obj; + AmqpProperties applicationProperties = null; + bool success = false; + AmqpMessage amqpMessage = null; + + if (wcfMessage.Properties.TryGetValue("AmqpProperties", out obj)) + { + applicationProperties = obj as AmqpProperties; + } + + try + { + AmqpProperties outgoingProperties = new AmqpProperties(); + + // Start with AMQP properties from the binding and the URI + if (this.factoryChannelProperties.DefaultMessageProperties != null) + { + outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties); + } + + if (this.subject != null) + { + outgoingProperties.RoutingKey = this.subject; + } + + if (this.qpidSubject != null) + { + outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject); + } + + // Add the Properties set by the application on this particular message. + // Application properties trump channel properties + if (applicationProperties != null) + { + outgoingProperties.MergeFrom(applicationProperties); + } + + amqpMessage = this.outputLink.CreateMessage(); + amqpMessage.Properties = outgoingProperties; + + // copy the WCF message body to the AMQP message body + if (this.streamed) + { + this.encoder.WriteMessage(wcfMessage, amqpMessage.BodyStream); + } + else + { + ArraySegment<byte> encodedBody = this.encoder.WriteMessage(wcfMessage, int.MaxValue, this.bufferManager); + try + { + amqpMessage.BodyStream.Write(encodedBody.Array, encodedBody.Offset, encodedBody.Count); + } + finally + { + this.bufferManager.ReturnBuffer(encodedBody.Array); + } + } + + success = true; + } + finally + { + if (!success && (amqpMessage != null)) + { + amqpMessage.Dispose(); + } + } + return amqpMessage; + } + + + private Message QpidToWcf(AmqpMessage amqpMessage) + { + if (amqpMessage == null) + { + return null; + } + + Message wcfMessage = null; + byte[] managedBuffer = null; + + try + { + if (this.streamed) + { + wcfMessage = this.encoder.ReadMessage(amqpMessage.BodyStream, int.MaxValue); + } + else + { + int count = (int)amqpMessage.BodyStream.Length; + managedBuffer = this.bufferManager.TakeBuffer(count); + int nr = amqpMessage.BodyStream.Read(managedBuffer, 0, count); + ArraySegment<byte> bufseg = new ArraySegment<byte>(managedBuffer, 0, count); + + wcfMessage = this.encoder.ReadMessage(bufseg, this.bufferManager); + + // set to null for finally{} block, since the encoder is now responsible for + // returning the BufferManager memory + managedBuffer = null; + } + + // This message will be discarded unless the "To" header matches + // the WCF endpoint dispatcher's address filter (or the service is + // AddressFilterMode=AddressFilterMode.Any). + + this.remoteAddress.ApplyTo(wcfMessage); + + if (amqpMessage.Properties != null) + { + wcfMessage.Properties.Add("AmqpProperties", amqpMessage.Properties); + } + } + catch (XmlException xmlException) + { + throw new ProtocolException( + "There is a problem with the XML that was received from the network. See inner exception for more details.", + xmlException); + } + catch (Exception e) + { + // TODO: logging + Console.WriteLine("TX channel encoder exception " + e); + } + finally + { + // close the amqpMessage unless the body will be read at a later time. + if (!this.streamed || wcfMessage == null) + { + amqpMessage.Close(); + } + + // the handoff to the encoder failed + if (managedBuffer != null) + { + this.bufferManager.ReturnBuffer(managedBuffer); + } + } + + return wcfMessage; + } + + private void Cleanup() + { + this.bufferManager.Clear(); + } + + private string UriToQpidAddress(Uri uri, out string subject) + { + if (uri.Scheme != AmqpConstants.Scheme) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "The scheme {0} specified in address is not supported.", uri.Scheme), "uri"); + } + + subject = ""; + string path = uri.LocalPath; + string query = uri.Query; + + // legacy... convert old style myqueue?routingkey=key to myqueue/key + + if (query.Length > 0) + { + if (!query.StartsWith("?")) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid query argument."), "uri"); + } + + string routingParseKey = "routingkey="; + string subjectParseKey = "subject="; + char[] charSeparators = new char[] { '?', ';' }; + string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); + foreach (string s in args) + { + if (s.StartsWith(routingParseKey)) + { + subject = s.Substring(routingParseKey.Length); + } + else if (s.StartsWith(subjectParseKey)) + { + subject = s.Substring(subjectParseKey.Length); + } + else + { + if (s.Length > 0) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid query argument {0}.", s), "uri"); + } + } + } + + if (path.Contains("/")) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid queue name {0}.", path), "uri"); + } + + if (path.Length == 0) + { + // special case, user wants default exchange + return "//" + subject; + } + + return path + "/" + subject; + } + + // find subject in "myqueue/mysubject;{mode:browse}" + int pos = path.IndexOf('/'); + if ((pos > -1) && (pos < path.Length + 1)) + { + subject = path.Substring(pos); + pos = subject.IndexOf(';'); + if (pos == 0) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Empty subject in address {0}.", path), "uri"); + } + + if (pos > 0) + { + subject = subject.Substring(0, pos); + } + } + + if (subject.Length > 0) + { + subject = HttpUtility.UrlDecode(subject); + } + + return HttpUtility.UrlDecode(path); + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs new file mode 100644 index 0000000000..b722983ead --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs @@ -0,0 +1,101 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ /// <summary>
+ /// This class is used by the AMQP Transport to set transport-level security settings for a binding
+ /// </summary>
+ public sealed class AmqpTransportSecurity
+ {
+ private AmqpCredentialType credentialType;
+
+ // WCF frowns on unencrypted credentials on the wire, but AMQP is agnostic.
+ // For interoperability, allow SSL to be turned on/off independentaly.
+ private bool useSSL;
+
+ // Allow per channel credentials, but also ease the common case where
+ // credentials are shared and wish to be globally set in a config file.
+ private AmqpCredential defaultCredential;
+
+ // if true, do not look at context for ServiceModel.Description.ClientCredentials.
+ // ClientCredentials will be place of choice for WCF traditionalists
+ // to specify auth tokens to the AMQP server when Windows and SASL tokens
+ // look the same. At other times it makes no sense and sometimes it is
+ // confusing with Message-level credentials.
+ private bool ignoreEndpointClientCredentials;
+
+
+ internal AmqpTransportSecurity()
+ {
+ this.credentialType = AmqpCredentialType.Anonymous;
+ this.useSSL = true;
+ }
+
+ /// <summary>
+ /// gets or sets the SASL mechanism for AMQP authentication between client and server.
+ /// </summary>
+ public AmqpCredentialType CredentialType
+ {
+ get { return this.credentialType; }
+
+ set { this.credentialType = value; }
+ }
+
+ /// <summary>
+ /// gets or sets the flag that controls the use of SSL encryption
+ /// over the network connection.
+ /// </summary>
+ public bool UseSSL
+ {
+ get { return this.useSSL; }
+ set { this.useSSL = value; }
+ }
+
+ /// <summary>
+ /// gets the default credential object for authentication with the AMQP server.
+ /// </summary>
+ public AmqpCredential DefaultCredential
+ {
+ get { return this.defaultCredential; }
+ set { this.defaultCredential = value; }
+ }
+
+ /// <summary>
+ /// gets or sets the endpoint ClientCredentials search parameter. If true,
+ /// only AmqpCredential objects are searched for in the surrounding context.
+ /// </summary>
+ public bool IgnoreEndpointClientCredentials
+ {
+ get { return this.ignoreEndpointClientCredentials; }
+ set { this.ignoreEndpointClientCredentials = value; }
+ }
+
+ internal AmqpTransportSecurity Clone()
+ {
+ AmqpTransportSecurity sec = (AmqpTransportSecurity)this.MemberwiseClone();
+ if (this.defaultCredential != null)
+ {
+ sec.defaultCredential = this.defaultCredential.Clone();
+ }
+
+ return sec;
+ }
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj new file mode 100644 index 0000000000..1eb811b425 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -0,0 +1,112 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Apache.Qpid.Channel</RootNamespace>
+ <AssemblyName>Apache.Qpid.Channel</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <StartupObject>
+ </StartupObject>
+ <SignAssembly>true</SignAssembly>
+ <AssemblyOriginatorKeyFile>..\..\..\wcfnet.snk</AssemblyOriginatorKeyFile>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+ <ItemGroup>
+ <Compile Include="AmqpBinaryBinding.cs" />
+ <Compile Include="AmqpBinaryBindingCollectionElement.cs" />
+ <Compile Include="AmqpBinaryBindingConfigurationElement.cs" />
+ <Compile Include="AmqpCredential.cs" />
+ <Compile Include="AmqpCredentialType.cs" />
+ <Compile Include="AmqpSecurity.cs" />
+ <Compile Include="AmqpSecurityElement.cs" />
+ <Compile Include="AmqpSecurityMode.cs" />
+ <Compile Include="AmqpChannelFactory.cs" />
+ <Compile Include="AmqpChannelHelpers.cs" />
+ <Compile Include="AmqpChannelListener.cs" />
+ <Compile Include="AmqpBinding.cs" />
+ <Compile Include="AmqpBindingCollectionElement.cs" />
+ <Compile Include="AmqpBindingConfigurationElement.cs" />
+ <Compile Include="AmqpTransportBindingElement.cs" />
+ <Compile Include="AmqpTransportChannel.cs" />
+ <Compile Include="AmqpTransportSecurity.cs" />
+ <Compile Include="ConnectionManager.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="RawMessage.cs" />
+ <Compile Include="RawMessageEncoder.cs" />
+ <Compile Include="RawMessageEncoderFactory.cs" />
+ <Compile Include="RawMessageEncodingBindingElement.cs" />
+ <Compile Include="RawXmlReader.cs" />
+ <Compile Include="RawXmlWriter.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.configuration" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Transactions" />
+ <Reference Include="System.Web" />
+ <Reference Include="System.XML" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Interop\Interop.vcproj">
+ <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+ <Name>Interop</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\..\wcfnet.snk" />
+ </ItemGroup>
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs new file mode 100644 index 0000000000..7238ff2120 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs @@ -0,0 +1,329 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Text; + using System.Threading; + + using Apache.Qpid.Interop; + + // The ConnectionManager looks after a shareable pool of AmqpConnection and AmqpSession + // objects. If two connection requests could be shared (see MakeKey() properties), and + // are designated as shareable, then they will be paired up. Each shared connection is + // a separate instance of a ManagedConnection. All unshared connections use a single + // instance of ManagedConnection with locking turned off. The ManagedConnection object + // registers for notifictation when a connection goes idle (all grandchild InputLink and + // OutputLink objects have been closed), and closes the connection. + + // TODO: the session sharing is roughed-in via comments but needs completing. + + internal sealed class ConnectionManager + { + // A side effect of creating InputLinks and OutputLinks is that counters + // in the respective AmqpSession and AmqpConnection are updated, so care + // must be taken to hold the lock across acquiring a session and opening + // a link on it. + + // one for each shared connection + private static Dictionary<string, ManagedConnection> sharedInstances; + + // this one creates and releases connections that are not shared. No locking required. + private static ManagedConnection unsharedInstance; + + // lock for finding or creating ManagedConnection instances + private static Object connectionLock; + + static ConnectionManager() + { + unsharedInstance = null; + sharedInstances = new Dictionary<string, ManagedConnection>(); + connectionLock = new Object(); + } + + private static string MakeKey(AmqpChannelProperties props) + { + StringBuilder sb = new StringBuilder(); + sb.Append(props.BrokerHost); + sb.Append(':'); + sb.Append(props.BrokerPort); + sb.Append(':'); + sb.Append(props.TransferMode); + + AmqpTransportSecurity sec = props.AmqpTransportSecurity; + if (sec == null) + { + return sb.ToString(); + } + + if (sec.UseSSL) + { + sb.Append(":SSL"); + } + + if (sec.CredentialType == AmqpCredentialType.Plain) + { + sb.Append(":saslP"); + AmqpCredential cred = props.AmqpCredential; + if (cred != null) + { + sb.Append(":NM:"); + sb.Append(cred.UserName); + sb.Append(":PW:"); + sb.Append(cred.Password); + } + } + + return sb.ToString(); + } + + private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing) + { + if (connectionSharing) + { + string key = MakeKey(channelProperties); + lock (connectionLock) + { + ManagedConnection mc = null; + if (!sharedInstances.TryGetValue(key, out mc)) + { + mc = new ManagedConnection(true); + sharedInstances.Add(key, mc); + } + return mc; + } + } + else + { + lock (connectionLock) + { + if (unsharedInstance == null) + { + unsharedInstance = new ManagedConnection(false); + } + return unsharedInstance; + } + } + } + + public static OutputLink GetOutputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname) + { + ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing); + return (OutputLink)mc.GetLink(channelProperties, sessionSharing, null, qname); + } + + public static InputLink GetInputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname) + { + ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing); + return (InputLink)mc.GetLink(channelProperties, sessionSharing, qname, null); + } + + + + class ManagedConnection + { + private Boolean shared; + private AmqpConnection sharedConnection; + //private Dictionary<string, AmqpSession> sharedSessions; + + public ManagedConnection(bool shared) + { + this.shared = shared; + } + + + public object GetLink(AmqpChannelProperties channelProperties, bool sessionSharing, string inputQueue, string outputQueue) + { + AmqpConnection connection = null; + AmqpSession session = null; + Object link = null; + bool newConnection = false; + //bool newSession = false; + bool success = false; + + // when called in the non-shared case, only stack variables should be used for holding connections/sessions/links + + if (this.shared) + { + Monitor.Enter(this); // lock + } + + try + { + if (this.shared) + { + // TODO: check shared connection not closed (i.e. network drop) and refresh this instance if needed + if (sessionSharing) + { + throw new NotImplementedException("shared session"); + /* * ... once we have a defined shared session config parameter: + + // lazilly create + if (this.sharedSessions == null) + { + this.sharedSessions = new Dictionary<string, AmqpSession>(); + } + + alreadydeclaredstring sessionKey = channelProperties.name_of_key_goes_here; + this.sharedSessions.TryGetValue(sessionKey, out session); + + * */ + } + + if (this.sharedConnection != null) + { + connection = this.sharedConnection; + } + } + + if (connection == null) + { + if (channelProperties.AmqpSecurityMode != AmqpSecurityMode.None) + { + string user = null; + string passwd = null; + bool ssl = false; + bool saslPlain = false; + + AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; + if (tsec.UseSSL) + { + ssl = true; + } + + if (tsec.CredentialType == AmqpCredentialType.Plain) + { + saslPlain = true; + AmqpCredential plainCred = channelProperties.AmqpCredential; + if (plainCred != null) + { + user = plainCred.UserName; + passwd = plainCred.Password; + } + } + + connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort, + ssl, saslPlain, user, passwd); + } + else + { + connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort); + } + + newConnection = true; + if (this.shared) + { + connection.OnConnectionIdle += new ConnectionIdleEventHandler(this.IdleConnectionHandler); + } + else + { + connection.OnConnectionIdle += new ConnectionIdleEventHandler(UnsharedIdleConnectionHandler); + } + } + + if (session == null) + { + session = connection.CreateSession(); + //newSession = true; + } + + if (inputQueue != null) + { + link = session.CreateInputLink(inputQueue); + } + else + { + link = session.CreateOutputLink(outputQueue); + } + + if (this.shared) + { + if (newConnection) + { + this.sharedConnection = connection; + } + /* + if (newSession) + { + sharedSessions.Add(foo, session); + } + * */ + } + + success = true; + } + finally + { + if (this.shared) + { + Monitor.Exit(this); + } + if (!success) + { + /* + if (newSession) + { + session.Close(); + } + */ + if (newConnection) + { + connection.Close(); + } + } + } + + return link; + } + + + static void UnsharedIdleConnectionHandler(Object sender, EventArgs empty) + { + if (sender is AmqpConnection) + { + AmqpConnection connection = (AmqpConnection)sender; + connection.Close(); + } + } + + void IdleConnectionHandler(Object sender, EventArgs empty) + { + lock (this) + { + if (sharedConnection != sender || sharedConnection == null) + { + return; + } + if (!sharedConnection.IsIdle) + { + // Another thread made the connection busy again. + // That's OK. Another idle event will come along later. + return; + } + sharedConnection.Close(); // also closes all child sessions + sharedConnection = null; + //sharedSessions = null; + } + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs b/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..edd9a056a7 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs @@ -0,0 +1,52 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Apache.Qpid.Channel")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to true makes the types in this assembly visible +// to COM components. This is required for this to be used by an +// Excel RTD component. +[assembly: ComVisible(true)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("ac02bbb0-2c19-43fb-a36c-b1b0a50eaf1a")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs new file mode 100644 index 0000000000..5925fa47dc --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs @@ -0,0 +1,374 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.ServiceModel.Channels; + using System.Xml; + + // This incoming Message is backed either by a Stream (bodyStream) or a byte array (bodyBytes). + // If bodyBytes belongs to a BufferManager, we must return it when done. + // The pay-off is OnGetReaderAtBodyContents(). + // Most of the complexity is dealing with the OnCreateBufferedCopy() machinery. + internal class RawMessage : Message + { + private MessageHeaders headers; + private MessageProperties properties; + private XmlDictionaryReaderQuotas readerQuotas; + private Stream bodyStream; + private byte[] bodyBytes; + private int index; + private int count; + private BufferManager bufferManager; + + public RawMessage(byte[] buffer, int index, int count, BufferManager bufferManager, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports MessageEncoder.ReadMessage(ArraySegment<byte> b, BufferManager mgr, string contentType) + if (quotas == null) + { + quotas = new XmlDictionaryReaderQuotas(); + } + + this.headers = new MessageHeaders(MessageVersion.None); + this.properties = new MessageProperties(); + this.readerQuotas = quotas; + this.bodyBytes = buffer; + this.index = index; + this.count = count; + this.bufferManager = bufferManager; + } + + public RawMessage(Stream stream, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports MessageEncoder.ReadMessage(System.IO.Stream s, int max, string contentType) + if (quotas == null) + { + quotas = new XmlDictionaryReaderQuotas(); + } + + this.headers = new MessageHeaders(MessageVersion.None); + this.properties = new MessageProperties(); + this.bodyStream = stream; + } + + public RawMessage(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports internal needs for CreateBufferedCopy().CreateMessage() + this.headers = new MessageHeaders(headers); + this.properties = new MessageProperties(properties); + this.bodyBytes = bytes; + this.index = index; + this.count = count; + this.readerQuotas = quotas; + } + + public override MessageHeaders Headers + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return this.headers; + } + } + + public override bool IsEmpty + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return false; + } + } + + public override bool IsFault + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return false; + } + } + + public override MessageProperties Properties + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return this.properties; + } + } + + public override MessageVersion Version + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return MessageVersion.None; + } + } + + protected override void OnBodyToString(XmlDictionaryWriter writer) + { + if (this.bodyStream != null) + { + writer.WriteString("Stream"); + } + else + { + writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); + writer.WriteBase64(this.bodyBytes, this.index, this.count); + writer.WriteEndElement(); + } + } + + protected override void OnClose() + { + Exception deferEx = null; + try + { + base.OnClose(); + } + catch (Exception e) + { + deferEx = e; + } + + try + { + if (this.properties != null) + { + this.properties.Dispose(); + } + } + catch (Exception e) + { + if (deferEx == null) + { + deferEx = e; + } + } + + try + { + if (this.bufferManager != null) + { + this.bufferManager.ReturnBuffer(this.bodyBytes); + this.bufferManager = null; + } + } + catch (Exception e) + { + if (deferEx == null) + { + deferEx = e; + } + } + + if (deferEx != null) + { + throw deferEx; + } + } + + protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize) + { + if (this.bodyStream != null) + { + int len = (int)this.bodyStream.Length; + byte[] buf = new byte[len]; + this.bodyStream.Read(buf, 0, len); + this.bodyStream = null; + this.bodyBytes = buf; + this.count = len; + this.index = 0; + } + else + { + if (this.bufferManager != null) + { + // we could take steps to share the buffer among copies and release the memory + // after the last user finishes by a reference count or such, but we are already + // far from the intended optimized use. Make one GC managed memory copy that is + // shared by all. + byte[] buf = new byte[this.count]; + + Buffer.BlockCopy(this.bodyBytes, this.index, buf, 0, this.count); + this.bufferManager.ReturnBuffer(this.bodyBytes); + this.bufferManager = null; + this.bodyBytes = buf; + this.index = 0; + } + } + + return new RawMessageBuffer(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); + } + + protected override XmlDictionaryReader OnGetReaderAtBodyContents() + { + Stream readerStream = null; + bool ownsStream; + + if (this.bodyStream != null) + { + readerStream = this.bodyStream; + ownsStream = false; + } + else + { + // create stream for duration of XmlReader. + ownsStream = true; + if (this.bufferManager != null) + { + readerStream = new RawMemoryStream(this.bodyBytes, this.index, this.count, this.bufferManager); + this.bufferManager = null; + } + else + { + readerStream = new MemoryStream(this.bodyBytes, this.index, this.count, false); + } + } + + return new RawXmlReader(readerStream, this.readerQuotas, ownsStream); + } + + protected override void OnWriteBodyContents(XmlDictionaryWriter writer) + { + writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); + if (this.bodyStream != null) + { + int len = (int)this.bodyStream.Length; + byte[] buf = new byte[len]; + this.bodyStream.Read(buf, 0, len); + writer.WriteBase64(buf, 0, len); + } + else + { + writer.WriteBase64(this.bodyBytes, this.index, this.count); + } + + writer.WriteEndElement(); + } + + private class RawMemoryStream : MemoryStream + { + private BufferManager bufferManager; + private byte[] buffer; + + public RawMemoryStream(byte[] bytes, int index, int count, BufferManager mgr) + : base(bytes, index, count, false) + { + this.bufferManager = mgr; + this.buffer = bytes; + } + + protected override void Dispose(bool disposing) + { + if (this.bufferManager != null) + { + try + { + this.bufferManager.ReturnBuffer(this.buffer); + } + finally + { + this.bufferManager = null; + base.Dispose(disposing); + } + } + } + } + + private class RawMessageBuffer : MessageBuffer + { + private bool closed; + private MessageHeaders headers; + private MessageProperties properties; + private byte[] bodyBytes; + private int index; + private int count; + private XmlDictionaryReaderQuotas readerQuotas; + + public RawMessageBuffer(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) + : base() + { + this.headers = new MessageHeaders(headers); + this.properties = new MessageProperties(properties); + this.bodyBytes = bytes; + this.index = index; + this.count = count; + this.readerQuotas = new XmlDictionaryReaderQuotas(); + quotas.CopyTo(this.readerQuotas); + } + + public override int BufferSize + { + get { return this.count; } + } + + public override void Close() + { + if (!this.closed) + { + this.closed = true; + this.headers = null; + if (this.properties != null) + { + this.properties.Dispose(); + this.properties = null; + } + + this.bodyBytes = null; + this.readerQuotas = null; + } + } + + public override Message CreateMessage() + { + if (this.closed) + { + throw new ObjectDisposedException("message"); + } + + return new RawMessage(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs new file mode 100644 index 0000000000..76dae6f6c7 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs @@ -0,0 +1,113 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.ServiceModel.Channels; + using System.ServiceModel; + using System.Xml; + + + class RawMessageEncoder : MessageEncoder + { + public const string StreamElementName = "Binary"; + + XmlDictionaryReaderQuotas readerQuotas; + + public RawMessageEncoder(XmlDictionaryReaderQuotas quotas) + { + this.readerQuotas = new XmlDictionaryReaderQuotas(); + if (quotas != null) + { + quotas.CopyTo(this.readerQuotas); + } + } + + public override string ContentType + { + get { return null; } + } + + public override bool IsContentTypeSupported(string contentType) + { + return true; + } + + public override string MediaType + { + get { return null; } + } + + public override MessageVersion MessageVersion + { + get { return MessageVersion.None; } + } + + public override Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType) + { + RawMessage message = new RawMessage(buffer.Array, buffer.Offset, buffer.Count, bufferManager, readerQuotas); + message.Properties.Encoder = this; + return message; + } + + public override Message ReadMessage(Stream stream, int maxSizeOfHeaders, string contentType) + { + RawMessage message = new RawMessage(stream, readerQuotas); + message.Properties.Encoder = this; + return message; + } + + private void CheckType(XmlDictionaryReader reader, XmlNodeType type) + { + if (reader.NodeType != type) + { + throw new System.IO.InvalidDataException(String.Format("RawMessageEncoder xml check {0} type should be {1}", type, reader.NodeType)); + } + } + + public override ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset) + { + MemoryStream tempStream = new MemoryStream(); + this.WriteMessage(message, tempStream); + int len = messageOffset + (int)tempStream.Length; + byte[] buf = bufferManager.TakeBuffer(len); + MemoryStream targetStream = new MemoryStream(buf); + if (messageOffset > 0) + { + targetStream.Seek(messageOffset, SeekOrigin.Begin); + } + + tempStream.WriteTo(targetStream); + targetStream.Close(); + + return new ArraySegment<byte>(buf, messageOffset, len - messageOffset); + } + + public override void WriteMessage(Message message, Stream stream) + { + using (XmlWriter writer = new RawXmlWriter(stream)) + { + message.WriteMessage(writer); + writer.Flush(); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs new file mode 100644 index 0000000000..5c015f9a1b --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Xml; + using System.ServiceModel.Channels; + + internal class RawMessageEncoderFactory : MessageEncoderFactory + { + RawMessageEncoder encoder; + + public RawMessageEncoderFactory(XmlDictionaryReaderQuotas quotas) + { + this.encoder = new RawMessageEncoder(quotas); + } + + public override MessageEncoder Encoder + { + get { return this.encoder; } + } + + public override MessageVersion MessageVersion + { + get { return encoder.MessageVersion; } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs new file mode 100644 index 0000000000..5ec10a976d --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs @@ -0,0 +1,102 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel.Channels; + + public class RawMessageEncodingBindingElement : MessageEncodingBindingElement + { + + public RawMessageEncodingBindingElement() + : base() + { + } + + RawMessageEncodingBindingElement(RawMessageEncodingBindingElement originalBindingElement) + { + } + + public override MessageEncoderFactory CreateMessageEncoderFactory() + { + return new RawMessageEncoderFactory(null); + } + + + public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + context.BindingParameters.Add(this); + return context.BuildInnerChannelFactory<TChannel>(); + } + + public override bool CanBuildChannelFactory<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + return context.CanBuildInnerChannelFactory<TChannel>(); + } + + public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + context.BindingParameters.Add(this); + return context.BuildInnerChannelListener<TChannel>(); + } + + public override bool CanBuildChannelListener<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + context.BindingParameters.Add(this); + return context.CanBuildInnerChannelListener<TChannel>(); + } + + + public override BindingElement Clone() + { + return new RawMessageEncodingBindingElement(this); + } + + + + public override MessageVersion MessageVersion + { + get + { + return MessageVersion.None; + } + + set + { + if (value != MessageVersion.None) + throw new ArgumentException("Unsupported message version"); + } + } + + + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs new file mode 100644 index 0000000000..8fadfce441 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs @@ -0,0 +1,353 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.Xml; + + internal class RawXmlReader : XmlDictionaryReader + { + ////this class presents a hardcoded XML InfoSet: "<rawtag>X</rawtag>" where X is the entire stream content + + private Stream stream; + private bool closed; + private bool streamOwner; + private ReaderPosition position; + private string contentAsBase64; + private XmlNameTable xmlNameTable; + private XmlDictionaryReaderQuotas readerQuotas; + + public RawXmlReader(Stream stream, XmlDictionaryReaderQuotas quotas, bool streamOwner) + { + this.stream = stream; + this.streamOwner = streamOwner; + if (quotas == null) + { + this.readerQuotas = new XmlDictionaryReaderQuotas(); + } + else + { + this.readerQuotas = quotas; + } + } + + private enum ReaderPosition + { + None, + StartElement, + Content, + EndElement, + EOF + } + + public override int AttributeCount + { + get { return 0; } + } + + public override string BaseURI + { + get { return string.Empty; } + } + + public override int Depth + { + get { return (this.position == ReaderPosition.Content) ? 1 : 0; } + } + + public override bool EOF + { + get { return this.position == ReaderPosition.EOF; } + } + + public override bool HasAttributes + { + get { return false; } + } + + public override bool HasValue + { + get { return this.position == ReaderPosition.Content; } + } + + public override bool IsEmptyElement + { + get { return false; } + } + + public override string LocalName + { + get + { + if (this.position == ReaderPosition.StartElement) + { + return RawMessageEncoder.StreamElementName; + } + + return null; + } + } + + public override string NamespaceURI + { + get { return string.Empty; } + } + + public override XmlNameTable NameTable + { + get + { + if (this.xmlNameTable == null) + { + this.xmlNameTable = new NameTable(); + this.xmlNameTable.Add(RawMessageEncoder.StreamElementName); + } + + return this.xmlNameTable; + } + } + + public override XmlNodeType NodeType + { + get + { + switch (this.position) + { + case ReaderPosition.StartElement: + return XmlNodeType.Element; + case ReaderPosition.Content: + return XmlNodeType.Text; + case ReaderPosition.EndElement: + return XmlNodeType.EndElement; + default: + // and StreamPosition.EOF + return XmlNodeType.None; + } + } + } + + public override string Prefix + { + get { return string.Empty; } + } + + public override ReadState ReadState + { + get + { + switch (this.position) + { + case ReaderPosition.None: + return ReadState.Initial; + case ReaderPosition.StartElement: + case ReaderPosition.Content: + case ReaderPosition.EndElement: + return ReadState.Interactive; + case ReaderPosition.EOF: + return ReadState.Closed; + default: + return ReadState.Error; + } + } + } + + public override string Value + { + get + { + switch (this.position) + { + case ReaderPosition.Content: + if (this.contentAsBase64 == null) + { + this.contentAsBase64 = Convert.ToBase64String(this.ReadContentAsBase64()); + } + + return this.contentAsBase64; + + default: + return string.Empty; + } + } + } + + public override void Close() + { + if (!this.closed) + { + this.closed = true; + this.position = ReaderPosition.EOF; + this.readerQuotas = null; + if (this.streamOwner) + { + this.stream.Close(); + } + } + } + + public override string GetAttribute(int i) + { + throw new ArgumentOutOfRangeException("i", i, "Argument not in set of valid values"); + } + + public override string GetAttribute(string name, string namespaceURI) + { + return null; + } + + public override string GetAttribute(string name) + { + return null; + } + + public override string LookupNamespace(string prefix) + { + if (prefix == string.Empty) + { + return string.Empty; + } + else if (prefix == "xml") + { + return "http://www.w3.org/XML/1998/namespace"; + } + else if (prefix == "xmlns") + { + return "http://www.w3.org/2000/xmlns/"; + } + else + { + return null; + } + } + + public override bool MoveToAttribute(string name, string ns) + { + return false; + } + + public override bool MoveToAttribute(string name) + { + return false; + } + + public override bool MoveToElement() + { + if (this.position == ReaderPosition.None) + { + this.position = ReaderPosition.StartElement; + return true; + } + + return false; + } + + public override bool MoveToFirstAttribute() + { + return false; + } + + public override bool MoveToNextAttribute() + { + return false; + } + + public override bool Read() + { + switch (this.position) + { + case ReaderPosition.None: + this.position = ReaderPosition.StartElement; + return true; + case ReaderPosition.StartElement: + this.position = ReaderPosition.Content; + return true; + case ReaderPosition.Content: + this.position = ReaderPosition.EndElement; + return true; + case ReaderPosition.EndElement: + this.position = ReaderPosition.EOF; + return false; + case ReaderPosition.EOF: + return false; + default: + return false; + } + } + + public override bool ReadAttributeValue() + { + return false; + } + + public override int ReadContentAsBase64(byte[] buffer, int index, int count) + { + if (buffer == null) + { + throw new ArgumentNullException("buffer"); + } + + if (this.position != ReaderPosition.Content) + { + throw new InvalidOperationException("XML reader not in Element"); + } + + if (count == 0) + { + return 0; + } + + int readCount = this.stream.Read(buffer, index, count); + if (readCount == 0) + { + this.position = ReaderPosition.EndElement; + } + + return readCount; + } + + public override int ReadContentAsBinHex(byte[] buffer, int index, int count) + { + throw new NotSupportedException(); + } + + public override void ResolveEntity() + { + throw new NotSupportedException(); + } + + public override bool TryGetBase64ContentLength(out int length) + { + // The whole stream is this one element + if (!this.closed && this.stream.CanSeek) + { + long streamLength = this.stream.Length; + if (streamLength <= int.MaxValue) + { + length = (int)streamLength; + return true; + } + } + + length = -1; + return false; + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs new file mode 100644 index 0000000000..7d05b70807 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs @@ -0,0 +1,221 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.Xml; + + internal sealed class RawXmlWriter : XmlDictionaryWriter + { + + WriteState state; + Stream stream; + bool closed; + bool rawWritingEnabled; + + public RawXmlWriter(Stream stream) + { + if (stream == null) + { + throw new ArgumentNullException("Stream"); + } + + this.stream = stream; + this.state = WriteState.Start; + } + + public override WriteState WriteState + { + get + { + return this.state; + } + } + + public override void Close() + { + if (!this.closed) + { + this.closed = true; + this.state = WriteState.Closed; + this.rawWritingEnabled = false; + } + } + + public override void Flush() + { + this.ThrowIfClosed(); + this.stream.Flush(); + } + + public override string LookupPrefix(string ns) + { + return null; + } + + public override void WriteBase64(byte[] buffer, int index, int count) + { + if (buffer == null) + { + throw new ArgumentNullException("buffer"); + } + + ThrowIfClosed(); + + if (!this.rawWritingEnabled) + { + throw new InvalidOperationException("XmlWriter not in Element"); + } + + this.stream.Write(buffer, index, count); + this.state = WriteState.Content; + } + + public override void WriteStartElement(string prefix, string localName, string ns) + { + ThrowIfClosed(); + if (this.state != WriteState.Start) + { + throw new InvalidOperationException("Start Element Already Called"); + } + + if (!string.IsNullOrEmpty(prefix) || !string.IsNullOrEmpty(ns) || localName != RawMessageEncoder.StreamElementName) + { + throw new XmlException("Wrong XML Start Element Name"); + } + this.state = WriteState.Element; + this.rawWritingEnabled = true; + } + + public override void WriteEndElement() + { + ThrowIfClosed(); + if (!this.rawWritingEnabled) + { + throw new InvalidOperationException("Unexpected End Element"); + } + this.rawWritingEnabled = false; + } + + public override void WriteFullEndElement() + { + this.WriteEndElement(); + } + + public override void WriteEndDocument() + { + this.rawWritingEnabled = false; + this.ThrowIfClosed(); + } + + public override void WriteStartDocument() + { + this.rawWritingEnabled = false; + this.ThrowIfClosed(); + } + + public override void WriteStartDocument(bool standalone) + { + this.rawWritingEnabled = false; + this.ThrowIfClosed(); + } + + private void ThrowIfClosed() + { + if (this.closed) + { + throw new InvalidOperationException("XML Writer closed"); + } + } + + + public override void WriteString(string text) + { + throw new NotSupportedException(); + } + + public override void WriteCData(string text) + { + throw new NotSupportedException(); + } + + public override void WriteCharEntity(char ch) + { + throw new NotSupportedException(); + } + + public override void WriteChars(char[] buffer, int index, int count) + { + throw new NotSupportedException(); + } + + public override void WriteComment(string text) + { + throw new NotSupportedException(); + } + + public override void WriteDocType(string name, string pubid, string sysid, string subset) + { + throw new NotSupportedException(); + } + + public override void WriteEndAttribute() + { + throw new NotSupportedException(); + } + + public override void WriteEntityRef(string name) + { + throw new NotSupportedException(); + } + + + public override void WriteProcessingInstruction(string name, string text) + { + throw new NotSupportedException(); + } + + public override void WriteRaw(string data) + { + throw new NotSupportedException(); + } + + public override void WriteRaw(char[] buffer, int index, int count) + { + throw new NotSupportedException(); + } + + public override void WriteStartAttribute(string prefix, string localName, string ns) + { + throw new NotSupportedException(); + } + + public override void WriteSurrogateCharEntity(char lowChar, char highChar) + { + throw new NotSupportedException(); + } + + public override void WriteWhitespace(string ws) + { + throw new NotSupportedException(); + } + } +} |