diff options
author | Stephen D. Huston <shuston@apache.org> | 2010-04-27 21:38:39 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2010-04-27 21:38:39 +0000 |
commit | 61cd187331ac9491240689f6de1b46bfb661a48e (patch) | |
tree | 0c9594ce2e892d8470db79305030499829d7aefa /qpid/wcf/src | |
parent | 8f4749fd3d42d3a4fc78f6b45362d8163fd13fe6 (diff) | |
download | qpid-python-61cd187331ac9491240689f6de1b46bfb661a48e.tar.gz |
Apply all three patches from QPID-2500. Also included WcfPerfTest into the QpidWcf.sln solution.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@938677 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/wcf/src')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs | 23 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs | 17 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs | 80 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs | 16 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs | 37 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs | 75 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs | 30 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs | 108 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj | 5 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs | 67 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp | 200 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp | 109 | ||||
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h | 26 |
13 files changed, 734 insertions, 59 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs index b0b71c87f3..cfb2e6095c 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs @@ -33,11 +33,11 @@ namespace Apache.Qpid.Channel { protected AmqpTransportBindingElement transport; protected MessageEncodingBindingElement encoding; + protected AmqpSecurity security; public AmqpBinding() + : this (new BinaryMessageEncodingBindingElement()) { - transport = new AmqpTransportBindingElement(); - encoding = new BinaryMessageEncodingBindingElement(); } protected AmqpBinding(MessageEncodingBindingElement encoding) @@ -70,6 +70,25 @@ namespace Apache.Qpid.Channel set { transport.PrefetchLimit = value; } } + public AmqpSecurity Security + { + get + { + if (security == null) + { + if (transport.ChannelProperties.AmqpTransportSecurity == null) + { + transport.ChannelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); + } + + security = new AmqpSecurity(transport.ChannelProperties.AmqpTransportSecurity); + transport.BindingSecurity = security; + } + + return security; + } + } + public bool Shared { get { return transport.Shared; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs index 5012c76d7e..9b27b00994 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -22,6 +22,7 @@ namespace Apache.Qpid.Channel using System; using System.ServiceModel; using System.ServiceModel.Channels; + using System.ServiceModel.Description; using System.Collections.Generic; using System.Collections.ObjectModel; @@ -33,12 +34,14 @@ namespace Apache.Qpid.Channel long maxBufferPoolSize; bool shared; int prefetchLimit; + BindingContext bindingContext; List<AmqpTransportChannel> openChannels; internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) : base(context.Binding) { this.bindingElement = bindingElement; + this.bindingContext = context; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; this.prefetchLimit = bindingElement.PrefetchLimit; @@ -81,6 +84,20 @@ namespace Apache.Qpid.Channel protected override void OnOpen(TimeSpan timeout) { + // check and freeze security properties now + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.bindingElement.BindingSecurity != null) + { + mode = bindingElement.BindingSecurity.Mode; + } + + this.channelProperties.AmqpSecurityMode = mode; + if (mode == AmqpSecurityMode.None) + { + return; + } + + AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs index 0853b3d6f3..2a5b9410dc 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs @@ -24,6 +24,7 @@ namespace Apache.Qpid.Channel using System.Net.Sockets; using System.ServiceModel; using System.ServiceModel.Channels; + using System.ServiceModel.Description; using System.Globalization; using Apache.Qpid.AmqpTypes; @@ -67,7 +68,9 @@ namespace Apache.Qpid.Channel int brokerPort; TransferMode transferMode; AmqpProperties defaultMessageProperties; - + AmqpSecurityMode amqpSecurityMode; + AmqpTransportSecurity amqpTransportSecurity; + AmqpCredential amqpCredential; long maxBufferPoolSize; int maxReceivedMessageSize; @@ -77,6 +80,9 @@ namespace Apache.Qpid.Channel this.brokerPort = AmqpDefaults.BrokerPort; this.transferMode = AmqpDefaults.TransferMode; this.defaultMessageProperties = null; + this.amqpSecurityMode = AmqpSecurityMode.None; + this.amqpTransportSecurity = null; + this.amqpCredential = null; this.maxBufferPoolSize = AmqpDefaults.MaxBufferPoolSize; this.maxReceivedMessageSize = AmqpDefaults.MaxReceivedMessageSize; } @@ -89,6 +95,16 @@ namespace Apache.Qpid.Channel props.defaultMessageProperties = this.defaultMessageProperties.Clone(); } + if (this.amqpTransportSecurity != null) + { + props.amqpTransportSecurity = this.amqpTransportSecurity.Clone(); + } + + if (this.amqpCredential != null) + { + this.amqpCredential = this.amqpCredential.Clone(); + } + return props; } @@ -116,6 +132,24 @@ namespace Apache.Qpid.Channel set { this.defaultMessageProperties = value; } } + internal AmqpSecurityMode AmqpSecurityMode + { + get { return this.amqpSecurityMode; } + set { this.amqpSecurityMode = value; } + } + + internal AmqpTransportSecurity AmqpTransportSecurity + { + get { return this.amqpTransportSecurity; } + set { this.amqpTransportSecurity = value; } + } + + internal AmqpCredential AmqpCredential + { + get { return this.amqpCredential; } + set { this.amqpCredential = value; } + } + internal long MaxBufferPoolSize { get { return this.maxBufferPoolSize; } @@ -138,5 +172,49 @@ namespace Apache.Qpid.Channel throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue."); } } + + internal static void FindAuthenticationCredentials(AmqpChannelProperties channelProperties, + BindingContext bindingContext) + { + AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; + if (tsec == null) + { + // no auth + return; + } + + if (tsec.CredentialType == AmqpCredentialType.Anonymous) + { + // no auth + return; + } + + // credentials search order: specific AmqpCredentials, specific + // ClientCredentials (if applicable), binding's default credentials + + AmqpCredential amqpCred = bindingContext.BindingParameters.Find<AmqpCredential>(); + if (amqpCred != null) + { + channelProperties.AmqpCredential = amqpCred; + return; + } + + if (!tsec.IgnoreEndpointClientCredentials) + { + ClientCredentials cliCred = bindingContext.BindingParameters.Find<ClientCredentials>(); + if (cliCred != null) + { + channelProperties.AmqpCredential = new AmqpCredential(cliCred.UserName.UserName, + cliCred.UserName.Password); + return; + } + } + + if (tsec.DefaultCredential != null) + { + channelProperties.AmqpCredential = tsec.DefaultCredential.Clone(); + } + } + } } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs index 3d7801e7c6..78655f2124 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs @@ -31,6 +31,7 @@ namespace Apache.Qpid.Channel MessageEncoderFactory messageEncoderFactory; AmqpTransportBindingElement bindingElement; AmqpChannelProperties channelProperties; + BindingContext bindingContext; bool shared; int prefetchLimit; long maxBufferPoolSize; @@ -45,6 +46,7 @@ namespace Apache.Qpid.Channel { this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); + this.bindingContext = context; this.shared = bindingElement.Shared; this.prefetchLimit = bindingElement.PrefetchLimit; @@ -100,6 +102,20 @@ namespace Apache.Qpid.Channel protected override void OnOpen(TimeSpan timeout) { + // check and freeze security properties now + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.bindingElement.BindingSecurity != null) + { + mode = bindingElement.BindingSecurity.Mode; + } + + this.channelProperties.AmqpSecurityMode = mode; + if (mode == AmqpSecurityMode.None) + { + return; + } + + AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs new file mode 100644 index 0000000000..2bafbbb54e --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs @@ -0,0 +1,37 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ /// <summary>
+ /// Enumerates the SASL authentication mechanisms used by the AMQP transport
+ /// </summary>
+ public enum AmqpCredentialType
+ {
+ /// <summary>
+ /// SASL ANONYMOUS mechanism
+ /// </summary>
+ Anonymous,
+
+ /// <summary>
+ /// SASL PLAIN mechanism: username and password
+ /// </summary>
+ Plain
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs new file mode 100644 index 0000000000..5d88afb88f --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs @@ -0,0 +1,75 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+
+ /// <summary>
+ /// Specifies the types of trasport-level and message-level security used by
+ /// an endpoint configured with an AmqpBinding.
+ /// </summary>
+ public sealed class AmqpSecurity
+ {
+ private AmqpSecurityMode mode;
+ private AmqpTransportSecurity transportSecurity;
+
+ internal AmqpSecurity()
+ {
+ this.mode = AmqpSecurityMode.None;
+ }
+
+ internal AmqpSecurity(AmqpTransportSecurity tsec)
+ {
+ if (tsec == null)
+ {
+ throw new ArgumentNullException("AmqpTransportSecurity");
+ }
+
+ this.mode = AmqpSecurityMode.Transport;
+ this.transportSecurity = tsec;
+ }
+
+ /// <summary>
+ /// gets or sets the security mode.
+ /// </summary>
+ public AmqpSecurityMode Mode
+ {
+ get { return this.mode; }
+ set {this.mode = value; }
+ }
+
+ /// <summary>
+ /// gets the security object that controls encryption
+ /// and authentication parameters for the AMQP transport.
+ /// </summary>
+ public AmqpTransportSecurity Transport
+ {
+ get
+ {
+ if (this.transportSecurity == null)
+ {
+ this.transportSecurity = new AmqpTransportSecurity();
+ }
+
+ return this.transportSecurity;
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs index 7993252309..a98f361d19 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs @@ -29,7 +29,8 @@ namespace Apache.Qpid.Channel { AmqpChannelProperties channelProperties; bool shared; - int prefetchLimit; + int prefetchLimit; + AmqpSecurity bindingSecurity; public AmqpTransportBindingElement() { @@ -43,6 +44,13 @@ namespace Apache.Qpid.Channel this.channelProperties = other.channelProperties.Clone(); this.shared = other.shared; this.prefetchLimit = other.prefetchLimit; + this.bindingSecurity = other.bindingSecurity; + } + + internal AmqpSecurity BindingSecurity + { + get { return this.bindingSecurity; } + set { this.bindingSecurity = value; } } public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) @@ -88,6 +96,12 @@ namespace Apache.Qpid.Channel get { return channelProperties; } } + public AmqpCredential AmqpCredential + { + get { return this.channelProperties.AmqpCredential; } + set { this.channelProperties.AmqpCredential = value; } + } + public string BrokerHost { get { return this.channelProperties.BrokerHost; } @@ -123,6 +137,20 @@ namespace Apache.Qpid.Channel set { this.channelProperties.TransferMode = value; } } + public AmqpTransportSecurity TransportSecurity + { + get + { + if (this.channelProperties.AmqpTransportSecurity == null) + { + this.channelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); + } + + return this.channelProperties.AmqpTransportSecurity; + } + } + + public AmqpProperties DefaultMessageProperties { get { return this.channelProperties.DefaultMessageProperties; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs new file mode 100644 index 0000000000..41c36c7bcd --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs @@ -0,0 +1,108 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ /// <summary>
+ /// This class is used by the AMQP Transport to set transport-level security settings for a binding
+ /// </summary>
+ public sealed class AmqpTransportSecurity
+ {
+ private AmqpCredentialType credentialType;
+
+ // WCF frowns on unencrypted credentials on the wire, but AMQP is agnostic.
+ // For interoperability, allow SSL to be turned on/off independentaly.
+ private bool useSSL;
+
+ // Allow per channel credentials, but also ease the common case where
+ // credentials are shared and wish to be globally set in a config file.
+ private AmqpCredential defaultCredential;
+
+ // if true, do not look at context for ServiceModel.Description.ClientCredentials.
+ // ClientCredentials will be place of choice for WCF traditionalists
+ // to specify auth tokens to the AMQP server when Windows and SASL tokens
+ // look the same. At other times it makes no sense and sometimes it is
+ // confusing with Message-level credentials.
+ private bool ignoreEndpointClientCredentials;
+
+
+ internal AmqpTransportSecurity()
+ {
+ this.credentialType = AmqpCredentialType.Anonymous;
+ this.useSSL = true;
+ }
+
+ /// <summary>
+ /// gets or sets the SASL mechanism for AMQP authentication between client and server.
+ /// </summary>
+ public AmqpCredentialType CredentialType
+ {
+ get { return this.credentialType; }
+
+ set { this.credentialType = value; }
+ }
+
+ /// <summary>
+ /// gets or sets the flag that controls the use of SSL encryption
+ /// over the network connection.
+ /// </summary>
+ public bool UseSSL
+ {
+ get { return this.useSSL; }
+ set { this.useSSL = value; }
+ }
+
+ /// <summary>
+ /// gets the default credential object for authentication with the AMQP server.
+ /// </summary>
+ public AmqpCredential DefaultCredential
+ {
+ get
+ {
+ if (this.defaultCredential == null)
+ {
+ this.defaultCredential = new AmqpCredential("", "");
+ }
+
+ return this.defaultCredential;
+ }
+ }
+
+ /// <summary>
+ /// gets or sets the endpoint ClientCredentials search parameter. If true,
+ /// only AmqpCredential objects are searched for in the surrounding context.
+ /// </summary>
+ public bool IgnoreEndpointClientCredentials
+ {
+ get { return this.ignoreEndpointClientCredentials; }
+ set { this.ignoreEndpointClientCredentials = value; }
+ }
+
+ internal AmqpTransportSecurity Clone()
+ {
+ AmqpTransportSecurity sec = (AmqpTransportSecurity)this.MemberwiseClone();
+ if (this.defaultCredential != null)
+ {
+ sec.defaultCredential = this.defaultCredential.Clone();
+ }
+
+ return sec;
+ }
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj index ac90fb7d64..dfa41c9417 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj +++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -64,6 +64,10 @@ under the License. <Compile Include="AmqpBinaryBinding.cs" />
<Compile Include="AmqpBinaryBindingCollectionElement.cs" />
<Compile Include="AmqpBinaryBindingConfigurationElement.cs" />
+ <Compile Include="AmqpCredential.cs" />
+ <Compile Include="AmqpCredentialType.cs" />
+ <Compile Include="AmqpSecurity.cs" />
+ <Compile Include="AmqpSecurityMode.cs" />
<Compile Include="AmqpChannelFactory.cs" />
<Compile Include="AmqpChannelHelpers.cs" />
<Compile Include="AmqpChannelListener.cs" />
@@ -72,6 +76,7 @@ under the License. <Compile Include="AmqpBindingConfigurationElement.cs" />
<Compile Include="AmqpTransportBindingElement.cs" />
<Compile Include="AmqpTransportChannel.cs" />
+ <Compile Include="AmqpTransportSecurity.cs" />
<Compile Include="ConnectionManager.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RawMessage.cs" />
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs index a63e5333f4..7238ff2120 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs @@ -22,6 +22,7 @@ namespace Apache.Qpid.Channel using System; using System.Collections; using System.Collections.Generic; + using System.Text; using System.Threading; using Apache.Qpid.Interop; @@ -61,7 +62,38 @@ namespace Apache.Qpid.Channel private static string MakeKey(AmqpChannelProperties props) { - return props.BrokerHost + ':' + props.BrokerPort + ':' + props.TransferMode; + StringBuilder sb = new StringBuilder(); + sb.Append(props.BrokerHost); + sb.Append(':'); + sb.Append(props.BrokerPort); + sb.Append(':'); + sb.Append(props.TransferMode); + + AmqpTransportSecurity sec = props.AmqpTransportSecurity; + if (sec == null) + { + return sb.ToString(); + } + + if (sec.UseSSL) + { + sb.Append(":SSL"); + } + + if (sec.CredentialType == AmqpCredentialType.Plain) + { + sb.Append(":saslP"); + AmqpCredential cred = props.AmqpCredential; + if (cred != null) + { + sb.Append(":NM:"); + sb.Append(cred.UserName); + sb.Append(":PW:"); + sb.Append(cred.Password); + } + } + + return sb.ToString(); } private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing) @@ -165,7 +197,38 @@ namespace Apache.Qpid.Channel if (connection == null) { - connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort); + if (channelProperties.AmqpSecurityMode != AmqpSecurityMode.None) + { + string user = null; + string passwd = null; + bool ssl = false; + bool saslPlain = false; + + AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; + if (tsec.UseSSL) + { + ssl = true; + } + + if (tsec.CredentialType == AmqpCredentialType.Plain) + { + saslPlain = true; + AmqpCredential plainCred = channelProperties.AmqpCredential; + if (plainCred != null) + { + user = plainCred.UserName; + passwd = plainCred.Password; + } + } + + connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort, + ssl, saslPlain, user, passwd); + } + else + { + connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort); + } + newConnection = true; if (this.shared) { diff --git a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp index f9d8bd8521..33d125e3c6 100644 --- a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp +++ b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp @@ -31,7 +31,7 @@ // registers the Qpid resource manager with DTC, the plugin is loaded and a successful
// connection via xa_open is confirmed before completing registration and saving the DSN
// connection string in the DTC log for possible recovery. On recovery, the DSN is re-used to
-// restablish a new connection with the broker and perform recovery.
+// re-establish a new connection with the broker and perform recovery.
//
// Because this plugin is not involved in coordinating any active transactions it only needs to
// partially implement the XA interface.
@@ -71,12 +71,19 @@ private: bool active;
std::string host;
int port;
+ std::string username;
+ std::string password;
+ bool ssl;
+ bool saslPlain;
+
int rmid;
std::vector<qpid::framing::Xid> inDoubtXids;
// current scan position, or -1 if no scan
int cursor;
public:
- ResourceManager(int id, std::string h, int p) : rmid(id), host(h), port(p), active(false), cursor(-1) {}
+ ResourceManager(int id, std::string h, int p, bool sslP, bool saslPlainP, std::string uname, std::string pass)
+ : rmid(id), host(h), port(p), ssl(sslP), saslPlain(saslPlainP), username(uname), password(pass),
+ active(false), cursor(-1) {}
~ResourceManager() {}
INT open();
INT close();
@@ -94,6 +101,7 @@ bool memLocked = false; #define QPIDHMCHARS 512
+
void pinDll() {
if (!memLocked) {
char thisDllName[QPIDHMCHARS];
@@ -141,16 +149,53 @@ void QpidToXa(Xid &qpidXid, XID &winXid) { }
-/* parse string from AmqpConnection.h
+static char *dsnHeader = "QPIDdsnV2";
- this info will eventually include authentication tokens
+const char* nextDot(const char *p) {
+ while (*p && (*p != '.'))
+ p++;
+ return p;
+}
- dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host,
- System::Diagnostics::Process::GetCurrentProcess()->Id,
- AppDomain::CurrentDomain->Id);
-*/
+int getHexChar (char c) {
+ if ((c >= '0') && (c <= '9'))
+ return c - '0';
+
+ if ((c >= 'a') && (c <= 'f'))
+ return 10 + (c - 'a');
+
+ if ((c >= 'A') && (c <= 'F'))
+ return 10 + (c - 'A');
+
+ return -1;
+}
+
+bool parseFromHex(const char* start, const char* end, std::string& target)
+{
+ const char *p = start;
+
+ while ((p + 1) < end) {
+ int nibble = getHexChar(*p++);
+ if (nibble < 0)
+ return false;
+ int byte = (nibble << 4);
+ nibble = getHexChar(*p++);
+ if (nibble < 0)
+ return false;
+ byte += nibble;
+ target.append (1, (char) byte & 0xFF);
+ }
+ return (p == end);
+}
-bool parseDsn (const char *dsn, std::string& host, int& port) {
+
+// parse string from AmqpConnection::DataSourcename
+// "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password"
+//
+// parse strictly and return false if the dsn is in a bad format
+
+bool parseDsn (const char *dsn, std::string& host, int& port, bool& ssl, bool& saslPlain,
+ std::string& username, std::string& password) {
if (dsn == NULL)
return false;
@@ -158,45 +203,125 @@ bool parseDsn (const char *dsn, std::string& host, int& port) { if (len > 1024)
return false;
- int firstDot = 0;
- for (int i = 0; i < len; i++)
- if (dsn[i] == '.') {
- firstDot = i;
- break;
- }
- if (!firstDot)
+ if (strncmp(dsn, dsnHeader, strlen(dsnHeader)))
return false;
- // look for 2 dots side by side to indicate end of the host
- int doubleDot = 0;
- for (int i = firstDot + 1; i < (len - 1); i++)
- if ((dsn[i] == '.') && (dsn[i+1] == '.')) {
- doubleDot = i;
- break;
- }
- if (!doubleDot)
+ const char *endp = dsn + len;
+ const char *tokenp = dsn + strlen(dsnHeader);
+ if (*tokenp != '.')
+ return false;
+
+ // port
+ tokenp++;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == '.')
+ return false; // null port not allowed
+
+ const char *token_end = nextDot(tokenp);
+ if ((token_end - tokenp) > 5)
return false;
port = 0;
- for (int i = 0; i < firstDot; i++) {
- char c = dsn[i];
- if ((c < '0') || (c > '9'))
+ for (const char *p = tokenp; p < token_end; p++) {
+ if ((*p < '0') || (*p > '9'))
+ return false;
+ port = (10 * port) + (*p - '0');
+ }
+
+ if (port > 65535)
+ return false;
+
+ // host
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == '.')
+ return false; // null host not allowed
+
+ token_end = nextDot(tokenp);
+ if (!parseFromHex(tokenp, token_end, host))
+ return false;
+
+ // skip the RM identifier, but verify it exists
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ token_end = nextDot (tokenp);
+ if ((token_end - tokenp) < 3)
+ return false;
+
+ // ssl: look for T or F
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == 'T')
+ ssl = true;
+ else if (*tokenp == 'F')
+ ssl = false;
+ else
+ return false;
+ if (*++tokenp != '.')
+ return false;
+
+ // sasl mechanism: A = anonymous, P = plain. More to come...
+ ++tokenp;
+ if (tokenp >= endp)
+ return false;
+ if (*(tokenp+1) != '.')
+ return false;
+
+ if (*tokenp == 'A') {
+ saslPlain = false;
+ tokenp += 2;
+ // no auth tokens
+ }
+ else if (*tokenp == 'P') {
+ saslPlain = true;
+ tokenp += 2;
+ if (tokenp >= endp)
return false;
- port = (10 * port) + (c - '0');
+ token_end = nextDot (tokenp);
+ if (!parseFromHex(tokenp, token_end, username))
+ return false;
+ tokenp = token_end + 1;
+
+ if (tokenp >= endp)
+ return false;
+ token_end = nextDot (tokenp);
+ if (!parseFromHex(tokenp, token_end, password))
+ return false;
+ tokenp = token_end + 1;
}
+ else
+ return false;
- host.assign(dsn + firstDot + 1, (doubleDot - firstDot) - 1);
- return true;
+ return (tokenp == endp);
}
+
INT ResourceManager::open() {
INT rv = XAER_RMERR; // placeholder until we successfully connect to resource
active = true;
LeaveCriticalSection(&rmLock);
try {
- qpidConnection.open(host, port);
+ ConnectionSettings settings;
+ settings.host = this->host;
+ settings.port = this->port;
+
+
+ if (ssl)
+ settings.protocol = "ssl";
+
+ if (saslPlain) {
+ settings.username = this->username;
+ settings.password = this->password;
+ settings.mechanism = "PLAIN";
+ }
+
+ qpidConnection.open(settings);
qpidSession = qpidConnection.newSession();
rv = XA_OK;
/*
@@ -359,7 +484,7 @@ INT ResourceManager::recover(XID *xids, long count, long flags) { if (nXids > 0) {
StructHelper decoder;
Xid qpidXid;
- for (int i = 0; i < nXids; i++) {
+ for (size_t i = 0; i < nXids; i++) {
decoder.decode (qpidXid, wireFormatXids[i]);
inDoubtXids.push_back(qpidXid);
}
@@ -369,7 +494,7 @@ INT ResourceManager::recover(XID *xids, long count, long flags) { // make sure none are too big, just in case
- for (int i = 0; i < nXids; i++) {
+ for (size_t i = 0; i < nXids; i++) {
Xid& xid = inDoubtXids[i];
size_t l1 = xid.hasGlobalId() ? xid.getGlobalId().size() : 0;
size_t l2 = xid.hasBranchId() ? xid.getBranchId().size() : 0;
@@ -449,10 +574,15 @@ INT __cdecl xa_open (char *xa_info, int rmid, long flags) { else {
std::string brokerHost;
int brokerPort;
- if (parseDsn(xa_info, brokerHost, brokerPort)) {
+ std::string username;
+ std::string password;
+ bool ssl;
+ bool saslPlain;
+
+ if (parseDsn(xa_info, brokerHost, brokerPort, ssl, saslPlain, username, password)) {
try {
- rmp = new ResourceManager(rmid, brokerHost, brokerPort);
+ rmp = new ResourceManager(rmid, brokerHost, brokerPort, ssl, saslPlain, username, password);
rv = rmp->open();
if (rv != XA_OK) {
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp index c3afdf2280..1bc9a15d92 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp @@ -47,10 +47,9 @@ using namespace qpid::client; using namespace std; -// Note on locks: Use "this" for fast counting and idle/busy +// Note on locks: Use thisLock for fast counting and idle/busy // notifications. Use the "sessions" list to serialize session // creation/reaping and overall tear down. -// TODO: switch "this" lock to separate non-visible Object. AmqpConnection::AmqpConnection(String^ server, int port) : @@ -58,19 +57,65 @@ AmqpConnection::AmqpConnection(String^ server, int port) : busyCount(0), disposed(false) { + initialize (server, port, false, false, nullptr, nullptr); +} + +AmqpConnection::AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) : + connectionp(NULL), + busyCount(0), + disposed(false) +{ + initialize (server, port, ssl, saslPlain, username, password); +} + +void AmqpConnection::initialize(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) +{ + if (server == nullptr) + throw gcnew ArgumentNullException("AMQP server"); + if (saslPlain) { + if (username == nullptr) + throw gcnew ArgumentNullException("username"); + if (username == nullptr) + throw gcnew ArgumentNullException("password"); + } + bool success = false; System::Exception^ openException = nullptr; sessions = gcnew Collections::Generic::List<AmqpSession^>(); + thisLock = gcnew Object(); try { connectionp = new Connection; - connectionp->open (QpidMarshal::ToNative(server), port); + + if (ssl || saslPlain) { + ConnectionSettings proposedSettings; + proposedSettings.host = QpidMarshal::ToNative(server); + proposedSettings.port = port; + if (ssl) + proposedSettings.protocol = "ssl"; + + if (saslPlain) { + proposedSettings.username = QpidMarshal::ToNative(username); + proposedSettings.password = QpidMarshal::ToNative(password); + proposedSettings.mechanism = "PLAIN"; + } + + connectionp->open (proposedSettings); + } + else { + connectionp->open (QpidMarshal::ToNative(server), port); + } + // TODO: registerFailureCallback for failover success = true; const ConnectionSettings& settings = connectionp->getNegotiatedSettings(); this->maxFrameSize = settings.maxFrameSize; this->host = server; this->port = port; + this->ssl = ssl; + this->saslPlain = saslPlain; + this->username = username; + this->password = password; this->isOpen = true; } catch (const qpid::Exception& error) { String^ errmsg = gcnew String(error.what()); @@ -89,7 +134,7 @@ AmqpConnection::AmqpConnection(String^ server, int port) : AmqpConnection^ AmqpConnection::Clone() { if (disposed) throw gcnew ObjectDisposedException("AmqpConnection.Clone"); - return gcnew AmqpConnection (this->host, this->port); + return gcnew AmqpConnection (this->host, this->port, this->ssl, this->saslPlain, this->username, this->password); } void AmqpConnection::Cleanup() @@ -153,7 +198,7 @@ void AmqpConnection::NotifyBusy() { bool changed = false; { - lock l(this); + lock l(thisLock); if (busyCount++ == 0) changed = true; } @@ -166,7 +211,7 @@ void AmqpConnection::NotifyIdle() { bool connectionIdle = false; { - lock l(this); + lock l(thisLock); if (--busyCount == 0) connectionIdle = true; } @@ -175,5 +220,57 @@ void AmqpConnection::NotifyIdle() } } +void HexAppend(StringBuilder^ sb, String^ s) { + if (s->Length > 0) { + array<unsigned char>^ bytes = Encoding::UTF8->GetBytes(s); + for each (unsigned char b in bytes) { + sb->Append(String::Format("{0:x2}", b)); + } + } + sb->Append("."); +} + + +// Note: any change to this format has to be reflected in the DTC plugin's xa_open() +// for now: "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password" +// This extended info is needed so that the DTC can make a separate connection to the broker +// for recovery. + +String^ AmqpConnection::DataSourceName::get() { + if (dataSourceName == nullptr) { + StringBuilder^ sb = gcnew StringBuilder(); + sb->Append("QPIDdsnV2."); + + sb->Append(this->port); + sb->Append("."); + + HexAppend(sb, this->host); + + sb->Append(System::Diagnostics::Process::GetCurrentProcess()->Id); + sb->Append("-"); + sb->Append(AppDomain::CurrentDomain->Id); + sb->Append("."); + + if (this->ssl) + sb->Append("T"); + else + sb->Append("F"); + sb->Append("."); + + if (this->saslPlain) { + sb->Append("P."); + HexAppend(sb, this->username); + HexAppend(sb, this->password); + } + else { + // SASL anonymous + sb->Append("A."); + } + + dataSourceName = sb->ToString(); + } + return dataSourceName; +} + }}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h index 6533185fa1..ef4d0e3f37 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h @@ -36,17 +36,26 @@ public ref class AmqpConnection { private: Connection* connectionp; - String^ host; - int port; bool disposed; Collections::Generic::List<AmqpSession^>^ sessions; bool isOpen; int busyCount; int maxFrameSize; DtxResourceManager^ dtxResourceManager; - void Cleanup(); // unique string used for distributed transactions String^ dataSourceName; + Object ^thisLock; + + // properties needed to allow DTC to do transactions (see DataSourceName + String^ host; + int port; + bool ssl; + bool saslPlain; + String^ username; + String^ password; + + void Cleanup(); + void initialize (System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password); internal: void NotifyBusy(); @@ -63,19 +72,12 @@ private: } property String^ DataSourceName { - // Note: any change to this format has to be reflected in the DTC plugin's xa_open() - String^ get() { - if (dataSourceName == nullptr) { - dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host, - System::Diagnostics::Process::GetCurrentProcess()->Id, - AppDomain::CurrentDomain->Id); - } - return dataSourceName; - } + String^ get(); } public: AmqpConnection(System::String^ server, int port); + AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password); ~AmqpConnection(); !AmqpConnection(); void Close(); |