summaryrefslogtreecommitdiff
path: root/qpid/wcf/src
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2010-04-27 21:38:39 +0000
committerStephen D. Huston <shuston@apache.org>2010-04-27 21:38:39 +0000
commit61cd187331ac9491240689f6de1b46bfb661a48e (patch)
tree0c9594ce2e892d8470db79305030499829d7aefa /qpid/wcf/src
parent8f4749fd3d42d3a4fc78f6b45362d8163fd13fe6 (diff)
downloadqpid-python-61cd187331ac9491240689f6de1b46bfb661a48e.tar.gz
Apply all three patches from QPID-2500. Also included WcfPerfTest into the QpidWcf.sln solution.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@938677 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/wcf/src')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs23
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs17
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs80
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs16
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs37
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs75
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs30
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs108
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj5
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs67
-rw-r--r--qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp200
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp109
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h26
13 files changed, 734 insertions, 59 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
index b0b71c87f3..cfb2e6095c 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
@@ -33,11 +33,11 @@ namespace Apache.Qpid.Channel
{
protected AmqpTransportBindingElement transport;
protected MessageEncodingBindingElement encoding;
+ protected AmqpSecurity security;
public AmqpBinding()
+ : this (new BinaryMessageEncodingBindingElement())
{
- transport = new AmqpTransportBindingElement();
- encoding = new BinaryMessageEncodingBindingElement();
}
protected AmqpBinding(MessageEncodingBindingElement encoding)
@@ -70,6 +70,25 @@ namespace Apache.Qpid.Channel
set { transport.PrefetchLimit = value; }
}
+ public AmqpSecurity Security
+ {
+ get
+ {
+ if (security == null)
+ {
+ if (transport.ChannelProperties.AmqpTransportSecurity == null)
+ {
+ transport.ChannelProperties.AmqpTransportSecurity = new AmqpTransportSecurity();
+ }
+
+ security = new AmqpSecurity(transport.ChannelProperties.AmqpTransportSecurity);
+ transport.BindingSecurity = security;
+ }
+
+ return security;
+ }
+ }
+
public bool Shared
{
get { return transport.Shared; }
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
index 5012c76d7e..9b27b00994 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
@@ -22,6 +22,7 @@ namespace Apache.Qpid.Channel
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
using System.Collections.Generic;
using System.Collections.ObjectModel;
@@ -33,12 +34,14 @@ namespace Apache.Qpid.Channel
long maxBufferPoolSize;
bool shared;
int prefetchLimit;
+ BindingContext bindingContext;
List<AmqpTransportChannel> openChannels;
internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context)
: base(context.Binding)
{
this.bindingElement = bindingElement;
+ this.bindingContext = context;
this.channelProperties = bindingElement.ChannelProperties.Clone();
this.shared = bindingElement.Shared;
this.prefetchLimit = bindingElement.PrefetchLimit;
@@ -81,6 +84,20 @@ namespace Apache.Qpid.Channel
protected override void OnOpen(TimeSpan timeout)
{
+ // check and freeze security properties now
+ AmqpSecurityMode mode = AmqpSecurityMode.None;
+ if (this.bindingElement.BindingSecurity != null)
+ {
+ mode = bindingElement.BindingSecurity.Mode;
+ }
+
+ this.channelProperties.AmqpSecurityMode = mode;
+ if (mode == AmqpSecurityMode.None)
+ {
+ return;
+ }
+
+ AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext);
}
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
index 0853b3d6f3..2a5b9410dc 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
@@ -24,6 +24,7 @@ namespace Apache.Qpid.Channel
using System.Net.Sockets;
using System.ServiceModel;
using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
using System.Globalization;
using Apache.Qpid.AmqpTypes;
@@ -67,7 +68,9 @@ namespace Apache.Qpid.Channel
int brokerPort;
TransferMode transferMode;
AmqpProperties defaultMessageProperties;
-
+ AmqpSecurityMode amqpSecurityMode;
+ AmqpTransportSecurity amqpTransportSecurity;
+ AmqpCredential amqpCredential;
long maxBufferPoolSize;
int maxReceivedMessageSize;
@@ -77,6 +80,9 @@ namespace Apache.Qpid.Channel
this.brokerPort = AmqpDefaults.BrokerPort;
this.transferMode = AmqpDefaults.TransferMode;
this.defaultMessageProperties = null;
+ this.amqpSecurityMode = AmqpSecurityMode.None;
+ this.amqpTransportSecurity = null;
+ this.amqpCredential = null;
this.maxBufferPoolSize = AmqpDefaults.MaxBufferPoolSize;
this.maxReceivedMessageSize = AmqpDefaults.MaxReceivedMessageSize;
}
@@ -89,6 +95,16 @@ namespace Apache.Qpid.Channel
props.defaultMessageProperties = this.defaultMessageProperties.Clone();
}
+ if (this.amqpTransportSecurity != null)
+ {
+ props.amqpTransportSecurity = this.amqpTransportSecurity.Clone();
+ }
+
+ if (this.amqpCredential != null)
+ {
+ this.amqpCredential = this.amqpCredential.Clone();
+ }
+
return props;
}
@@ -116,6 +132,24 @@ namespace Apache.Qpid.Channel
set { this.defaultMessageProperties = value; }
}
+ internal AmqpSecurityMode AmqpSecurityMode
+ {
+ get { return this.amqpSecurityMode; }
+ set { this.amqpSecurityMode = value; }
+ }
+
+ internal AmqpTransportSecurity AmqpTransportSecurity
+ {
+ get { return this.amqpTransportSecurity; }
+ set { this.amqpTransportSecurity = value; }
+ }
+
+ internal AmqpCredential AmqpCredential
+ {
+ get { return this.amqpCredential; }
+ set { this.amqpCredential = value; }
+ }
+
internal long MaxBufferPoolSize
{
get { return this.maxBufferPoolSize; }
@@ -138,5 +172,49 @@ namespace Apache.Qpid.Channel
throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue.");
}
}
+
+ internal static void FindAuthenticationCredentials(AmqpChannelProperties channelProperties,
+ BindingContext bindingContext)
+ {
+ AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity;
+ if (tsec == null)
+ {
+ // no auth
+ return;
+ }
+
+ if (tsec.CredentialType == AmqpCredentialType.Anonymous)
+ {
+ // no auth
+ return;
+ }
+
+ // credentials search order: specific AmqpCredentials, specific
+ // ClientCredentials (if applicable), binding's default credentials
+
+ AmqpCredential amqpCred = bindingContext.BindingParameters.Find<AmqpCredential>();
+ if (amqpCred != null)
+ {
+ channelProperties.AmqpCredential = amqpCred;
+ return;
+ }
+
+ if (!tsec.IgnoreEndpointClientCredentials)
+ {
+ ClientCredentials cliCred = bindingContext.BindingParameters.Find<ClientCredentials>();
+ if (cliCred != null)
+ {
+ channelProperties.AmqpCredential = new AmqpCredential(cliCred.UserName.UserName,
+ cliCred.UserName.Password);
+ return;
+ }
+ }
+
+ if (tsec.DefaultCredential != null)
+ {
+ channelProperties.AmqpCredential = tsec.DefaultCredential.Clone();
+ }
+ }
+
}
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
index 3d7801e7c6..78655f2124 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
@@ -31,6 +31,7 @@ namespace Apache.Qpid.Channel
MessageEncoderFactory messageEncoderFactory;
AmqpTransportBindingElement bindingElement;
AmqpChannelProperties channelProperties;
+ BindingContext bindingContext;
bool shared;
int prefetchLimit;
long maxBufferPoolSize;
@@ -45,6 +46,7 @@ namespace Apache.Qpid.Channel
{
this.bindingElement = bindingElement;
this.channelProperties = bindingElement.ChannelProperties.Clone();
+ this.bindingContext = context;
this.shared = bindingElement.Shared;
this.prefetchLimit = bindingElement.PrefetchLimit;
@@ -100,6 +102,20 @@ namespace Apache.Qpid.Channel
protected override void OnOpen(TimeSpan timeout)
{
+ // check and freeze security properties now
+ AmqpSecurityMode mode = AmqpSecurityMode.None;
+ if (this.bindingElement.BindingSecurity != null)
+ {
+ mode = bindingElement.BindingSecurity.Mode;
+ }
+
+ this.channelProperties.AmqpSecurityMode = mode;
+ if (mode == AmqpSecurityMode.None)
+ {
+ return;
+ }
+
+ AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext);
}
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs
new file mode 100644
index 0000000000..2bafbbb54e
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ /// <summary>
+ /// Enumerates the SASL authentication mechanisms used by the AMQP transport
+ /// </summary>
+ public enum AmqpCredentialType
+ {
+ /// <summary>
+ /// SASL ANONYMOUS mechanism
+ /// </summary>
+ Anonymous,
+
+ /// <summary>
+ /// SASL PLAIN mechanism: username and password
+ /// </summary>
+ Plain
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs
new file mode 100644
index 0000000000..5d88afb88f
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+
+ /// <summary>
+ /// Specifies the types of trasport-level and message-level security used by
+ /// an endpoint configured with an AmqpBinding.
+ /// </summary>
+ public sealed class AmqpSecurity
+ {
+ private AmqpSecurityMode mode;
+ private AmqpTransportSecurity transportSecurity;
+
+ internal AmqpSecurity()
+ {
+ this.mode = AmqpSecurityMode.None;
+ }
+
+ internal AmqpSecurity(AmqpTransportSecurity tsec)
+ {
+ if (tsec == null)
+ {
+ throw new ArgumentNullException("AmqpTransportSecurity");
+ }
+
+ this.mode = AmqpSecurityMode.Transport;
+ this.transportSecurity = tsec;
+ }
+
+ /// <summary>
+ /// gets or sets the security mode.
+ /// </summary>
+ public AmqpSecurityMode Mode
+ {
+ get { return this.mode; }
+ set {this.mode = value; }
+ }
+
+ /// <summary>
+ /// gets the security object that controls encryption
+ /// and authentication parameters for the AMQP transport.
+ /// </summary>
+ public AmqpTransportSecurity Transport
+ {
+ get
+ {
+ if (this.transportSecurity == null)
+ {
+ this.transportSecurity = new AmqpTransportSecurity();
+ }
+
+ return this.transportSecurity;
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
index 7993252309..a98f361d19 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
@@ -29,7 +29,8 @@ namespace Apache.Qpid.Channel
{
AmqpChannelProperties channelProperties;
bool shared;
- int prefetchLimit;
+ int prefetchLimit;
+ AmqpSecurity bindingSecurity;
public AmqpTransportBindingElement()
{
@@ -43,6 +44,13 @@ namespace Apache.Qpid.Channel
this.channelProperties = other.channelProperties.Clone();
this.shared = other.shared;
this.prefetchLimit = other.prefetchLimit;
+ this.bindingSecurity = other.bindingSecurity;
+ }
+
+ internal AmqpSecurity BindingSecurity
+ {
+ get { return this.bindingSecurity; }
+ set { this.bindingSecurity = value; }
}
public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
@@ -88,6 +96,12 @@ namespace Apache.Qpid.Channel
get { return channelProperties; }
}
+ public AmqpCredential AmqpCredential
+ {
+ get { return this.channelProperties.AmqpCredential; }
+ set { this.channelProperties.AmqpCredential = value; }
+ }
+
public string BrokerHost
{
get { return this.channelProperties.BrokerHost; }
@@ -123,6 +137,20 @@ namespace Apache.Qpid.Channel
set { this.channelProperties.TransferMode = value; }
}
+ public AmqpTransportSecurity TransportSecurity
+ {
+ get
+ {
+ if (this.channelProperties.AmqpTransportSecurity == null)
+ {
+ this.channelProperties.AmqpTransportSecurity = new AmqpTransportSecurity();
+ }
+
+ return this.channelProperties.AmqpTransportSecurity;
+ }
+ }
+
+
public AmqpProperties DefaultMessageProperties
{
get { return this.channelProperties.DefaultMessageProperties; }
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs
new file mode 100644
index 0000000000..41c36c7bcd
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs
@@ -0,0 +1,108 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ /// <summary>
+ /// This class is used by the AMQP Transport to set transport-level security settings for a binding
+ /// </summary>
+ public sealed class AmqpTransportSecurity
+ {
+ private AmqpCredentialType credentialType;
+
+ // WCF frowns on unencrypted credentials on the wire, but AMQP is agnostic.
+ // For interoperability, allow SSL to be turned on/off independentaly.
+ private bool useSSL;
+
+ // Allow per channel credentials, but also ease the common case where
+ // credentials are shared and wish to be globally set in a config file.
+ private AmqpCredential defaultCredential;
+
+ // if true, do not look at context for ServiceModel.Description.ClientCredentials.
+ // ClientCredentials will be place of choice for WCF traditionalists
+ // to specify auth tokens to the AMQP server when Windows and SASL tokens
+ // look the same. At other times it makes no sense and sometimes it is
+ // confusing with Message-level credentials.
+ private bool ignoreEndpointClientCredentials;
+
+
+ internal AmqpTransportSecurity()
+ {
+ this.credentialType = AmqpCredentialType.Anonymous;
+ this.useSSL = true;
+ }
+
+ /// <summary>
+ /// gets or sets the SASL mechanism for AMQP authentication between client and server.
+ /// </summary>
+ public AmqpCredentialType CredentialType
+ {
+ get { return this.credentialType; }
+
+ set { this.credentialType = value; }
+ }
+
+ /// <summary>
+ /// gets or sets the flag that controls the use of SSL encryption
+ /// over the network connection.
+ /// </summary>
+ public bool UseSSL
+ {
+ get { return this.useSSL; }
+ set { this.useSSL = value; }
+ }
+
+ /// <summary>
+ /// gets the default credential object for authentication with the AMQP server.
+ /// </summary>
+ public AmqpCredential DefaultCredential
+ {
+ get
+ {
+ if (this.defaultCredential == null)
+ {
+ this.defaultCredential = new AmqpCredential("", "");
+ }
+
+ return this.defaultCredential;
+ }
+ }
+
+ /// <summary>
+ /// gets or sets the endpoint ClientCredentials search parameter. If true,
+ /// only AmqpCredential objects are searched for in the surrounding context.
+ /// </summary>
+ public bool IgnoreEndpointClientCredentials
+ {
+ get { return this.ignoreEndpointClientCredentials; }
+ set { this.ignoreEndpointClientCredentials = value; }
+ }
+
+ internal AmqpTransportSecurity Clone()
+ {
+ AmqpTransportSecurity sec = (AmqpTransportSecurity)this.MemberwiseClone();
+ if (this.defaultCredential != null)
+ {
+ sec.defaultCredential = this.defaultCredential.Clone();
+ }
+
+ return sec;
+ }
+ }
+}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
index ac90fb7d64..dfa41c9417 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
+++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
@@ -64,6 +64,10 @@ under the License.
<Compile Include="AmqpBinaryBinding.cs" />
<Compile Include="AmqpBinaryBindingCollectionElement.cs" />
<Compile Include="AmqpBinaryBindingConfigurationElement.cs" />
+ <Compile Include="AmqpCredential.cs" />
+ <Compile Include="AmqpCredentialType.cs" />
+ <Compile Include="AmqpSecurity.cs" />
+ <Compile Include="AmqpSecurityMode.cs" />
<Compile Include="AmqpChannelFactory.cs" />
<Compile Include="AmqpChannelHelpers.cs" />
<Compile Include="AmqpChannelListener.cs" />
@@ -72,6 +76,7 @@ under the License.
<Compile Include="AmqpBindingConfigurationElement.cs" />
<Compile Include="AmqpTransportBindingElement.cs" />
<Compile Include="AmqpTransportChannel.cs" />
+ <Compile Include="AmqpTransportSecurity.cs" />
<Compile Include="ConnectionManager.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RawMessage.cs" />
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
index a63e5333f4..7238ff2120 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
@@ -22,6 +22,7 @@ namespace Apache.Qpid.Channel
using System;
using System.Collections;
using System.Collections.Generic;
+ using System.Text;
using System.Threading;
using Apache.Qpid.Interop;
@@ -61,7 +62,38 @@ namespace Apache.Qpid.Channel
private static string MakeKey(AmqpChannelProperties props)
{
- return props.BrokerHost + ':' + props.BrokerPort + ':' + props.TransferMode;
+ StringBuilder sb = new StringBuilder();
+ sb.Append(props.BrokerHost);
+ sb.Append(':');
+ sb.Append(props.BrokerPort);
+ sb.Append(':');
+ sb.Append(props.TransferMode);
+
+ AmqpTransportSecurity sec = props.AmqpTransportSecurity;
+ if (sec == null)
+ {
+ return sb.ToString();
+ }
+
+ if (sec.UseSSL)
+ {
+ sb.Append(":SSL");
+ }
+
+ if (sec.CredentialType == AmqpCredentialType.Plain)
+ {
+ sb.Append(":saslP");
+ AmqpCredential cred = props.AmqpCredential;
+ if (cred != null)
+ {
+ sb.Append(":NM:");
+ sb.Append(cred.UserName);
+ sb.Append(":PW:");
+ sb.Append(cred.Password);
+ }
+ }
+
+ return sb.ToString();
}
private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing)
@@ -165,7 +197,38 @@ namespace Apache.Qpid.Channel
if (connection == null)
{
- connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort);
+ if (channelProperties.AmqpSecurityMode != AmqpSecurityMode.None)
+ {
+ string user = null;
+ string passwd = null;
+ bool ssl = false;
+ bool saslPlain = false;
+
+ AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity;
+ if (tsec.UseSSL)
+ {
+ ssl = true;
+ }
+
+ if (tsec.CredentialType == AmqpCredentialType.Plain)
+ {
+ saslPlain = true;
+ AmqpCredential plainCred = channelProperties.AmqpCredential;
+ if (plainCred != null)
+ {
+ user = plainCred.UserName;
+ passwd = plainCred.Password;
+ }
+ }
+
+ connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort,
+ ssl, saslPlain, user, passwd);
+ }
+ else
+ {
+ connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort);
+ }
+
newConnection = true;
if (this.shared)
{
diff --git a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
index f9d8bd8521..33d125e3c6 100644
--- a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
+++ b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
@@ -31,7 +31,7 @@
// registers the Qpid resource manager with DTC, the plugin is loaded and a successful
// connection via xa_open is confirmed before completing registration and saving the DSN
// connection string in the DTC log for possible recovery. On recovery, the DSN is re-used to
-// restablish a new connection with the broker and perform recovery.
+// re-establish a new connection with the broker and perform recovery.
//
// Because this plugin is not involved in coordinating any active transactions it only needs to
// partially implement the XA interface.
@@ -71,12 +71,19 @@ private:
bool active;
std::string host;
int port;
+ std::string username;
+ std::string password;
+ bool ssl;
+ bool saslPlain;
+
int rmid;
std::vector<qpid::framing::Xid> inDoubtXids;
// current scan position, or -1 if no scan
int cursor;
public:
- ResourceManager(int id, std::string h, int p) : rmid(id), host(h), port(p), active(false), cursor(-1) {}
+ ResourceManager(int id, std::string h, int p, bool sslP, bool saslPlainP, std::string uname, std::string pass)
+ : rmid(id), host(h), port(p), ssl(sslP), saslPlain(saslPlainP), username(uname), password(pass),
+ active(false), cursor(-1) {}
~ResourceManager() {}
INT open();
INT close();
@@ -94,6 +101,7 @@ bool memLocked = false;
#define QPIDHMCHARS 512
+
void pinDll() {
if (!memLocked) {
char thisDllName[QPIDHMCHARS];
@@ -141,16 +149,53 @@ void QpidToXa(Xid &qpidXid, XID &winXid) {
}
-/* parse string from AmqpConnection.h
+static char *dsnHeader = "QPIDdsnV2";
- this info will eventually include authentication tokens
+const char* nextDot(const char *p) {
+ while (*p && (*p != '.'))
+ p++;
+ return p;
+}
- dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host,
- System::Diagnostics::Process::GetCurrentProcess()->Id,
- AppDomain::CurrentDomain->Id);
-*/
+int getHexChar (char c) {
+ if ((c >= '0') && (c <= '9'))
+ return c - '0';
+
+ if ((c >= 'a') && (c <= 'f'))
+ return 10 + (c - 'a');
+
+ if ((c >= 'A') && (c <= 'F'))
+ return 10 + (c - 'A');
+
+ return -1;
+}
+
+bool parseFromHex(const char* start, const char* end, std::string& target)
+{
+ const char *p = start;
+
+ while ((p + 1) < end) {
+ int nibble = getHexChar(*p++);
+ if (nibble < 0)
+ return false;
+ int byte = (nibble << 4);
+ nibble = getHexChar(*p++);
+ if (nibble < 0)
+ return false;
+ byte += nibble;
+ target.append (1, (char) byte & 0xFF);
+ }
+ return (p == end);
+}
-bool parseDsn (const char *dsn, std::string& host, int& port) {
+
+// parse string from AmqpConnection::DataSourcename
+// "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password"
+//
+// parse strictly and return false if the dsn is in a bad format
+
+bool parseDsn (const char *dsn, std::string& host, int& port, bool& ssl, bool& saslPlain,
+ std::string& username, std::string& password) {
if (dsn == NULL)
return false;
@@ -158,45 +203,125 @@ bool parseDsn (const char *dsn, std::string& host, int& port) {
if (len > 1024)
return false;
- int firstDot = 0;
- for (int i = 0; i < len; i++)
- if (dsn[i] == '.') {
- firstDot = i;
- break;
- }
- if (!firstDot)
+ if (strncmp(dsn, dsnHeader, strlen(dsnHeader)))
return false;
- // look for 2 dots side by side to indicate end of the host
- int doubleDot = 0;
- for (int i = firstDot + 1; i < (len - 1); i++)
- if ((dsn[i] == '.') && (dsn[i+1] == '.')) {
- doubleDot = i;
- break;
- }
- if (!doubleDot)
+ const char *endp = dsn + len;
+ const char *tokenp = dsn + strlen(dsnHeader);
+ if (*tokenp != '.')
+ return false;
+
+ // port
+ tokenp++;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == '.')
+ return false; // null port not allowed
+
+ const char *token_end = nextDot(tokenp);
+ if ((token_end - tokenp) > 5)
return false;
port = 0;
- for (int i = 0; i < firstDot; i++) {
- char c = dsn[i];
- if ((c < '0') || (c > '9'))
+ for (const char *p = tokenp; p < token_end; p++) {
+ if ((*p < '0') || (*p > '9'))
+ return false;
+ port = (10 * port) + (*p - '0');
+ }
+
+ if (port > 65535)
+ return false;
+
+ // host
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == '.')
+ return false; // null host not allowed
+
+ token_end = nextDot(tokenp);
+ if (!parseFromHex(tokenp, token_end, host))
+ return false;
+
+ // skip the RM identifier, but verify it exists
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ token_end = nextDot (tokenp);
+ if ((token_end - tokenp) < 3)
+ return false;
+
+ // ssl: look for T or F
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == 'T')
+ ssl = true;
+ else if (*tokenp == 'F')
+ ssl = false;
+ else
+ return false;
+ if (*++tokenp != '.')
+ return false;
+
+ // sasl mechanism: A = anonymous, P = plain. More to come...
+ ++tokenp;
+ if (tokenp >= endp)
+ return false;
+ if (*(tokenp+1) != '.')
+ return false;
+
+ if (*tokenp == 'A') {
+ saslPlain = false;
+ tokenp += 2;
+ // no auth tokens
+ }
+ else if (*tokenp == 'P') {
+ saslPlain = true;
+ tokenp += 2;
+ if (tokenp >= endp)
return false;
- port = (10 * port) + (c - '0');
+ token_end = nextDot (tokenp);
+ if (!parseFromHex(tokenp, token_end, username))
+ return false;
+ tokenp = token_end + 1;
+
+ if (tokenp >= endp)
+ return false;
+ token_end = nextDot (tokenp);
+ if (!parseFromHex(tokenp, token_end, password))
+ return false;
+ tokenp = token_end + 1;
}
+ else
+ return false;
- host.assign(dsn + firstDot + 1, (doubleDot - firstDot) - 1);
- return true;
+ return (tokenp == endp);
}
+
INT ResourceManager::open() {
INT rv = XAER_RMERR; // placeholder until we successfully connect to resource
active = true;
LeaveCriticalSection(&rmLock);
try {
- qpidConnection.open(host, port);
+ ConnectionSettings settings;
+ settings.host = this->host;
+ settings.port = this->port;
+
+
+ if (ssl)
+ settings.protocol = "ssl";
+
+ if (saslPlain) {
+ settings.username = this->username;
+ settings.password = this->password;
+ settings.mechanism = "PLAIN";
+ }
+
+ qpidConnection.open(settings);
qpidSession = qpidConnection.newSession();
rv = XA_OK;
/*
@@ -359,7 +484,7 @@ INT ResourceManager::recover(XID *xids, long count, long flags) {
if (nXids > 0) {
StructHelper decoder;
Xid qpidXid;
- for (int i = 0; i < nXids; i++) {
+ for (size_t i = 0; i < nXids; i++) {
decoder.decode (qpidXid, wireFormatXids[i]);
inDoubtXids.push_back(qpidXid);
}
@@ -369,7 +494,7 @@ INT ResourceManager::recover(XID *xids, long count, long flags) {
// make sure none are too big, just in case
- for (int i = 0; i < nXids; i++) {
+ for (size_t i = 0; i < nXids; i++) {
Xid& xid = inDoubtXids[i];
size_t l1 = xid.hasGlobalId() ? xid.getGlobalId().size() : 0;
size_t l2 = xid.hasBranchId() ? xid.getBranchId().size() : 0;
@@ -449,10 +574,15 @@ INT __cdecl xa_open (char *xa_info, int rmid, long flags) {
else {
std::string brokerHost;
int brokerPort;
- if (parseDsn(xa_info, brokerHost, brokerPort)) {
+ std::string username;
+ std::string password;
+ bool ssl;
+ bool saslPlain;
+
+ if (parseDsn(xa_info, brokerHost, brokerPort, ssl, saslPlain, username, password)) {
try {
- rmp = new ResourceManager(rmid, brokerHost, brokerPort);
+ rmp = new ResourceManager(rmid, brokerHost, brokerPort, ssl, saslPlain, username, password);
rv = rmp->open();
if (rv != XA_OK) {
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
index c3afdf2280..1bc9a15d92 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
@@ -47,10 +47,9 @@ using namespace qpid::client;
using namespace std;
-// Note on locks: Use "this" for fast counting and idle/busy
+// Note on locks: Use thisLock for fast counting and idle/busy
// notifications. Use the "sessions" list to serialize session
// creation/reaping and overall tear down.
-// TODO: switch "this" lock to separate non-visible Object.
AmqpConnection::AmqpConnection(String^ server, int port) :
@@ -58,19 +57,65 @@ AmqpConnection::AmqpConnection(String^ server, int port) :
busyCount(0),
disposed(false)
{
+ initialize (server, port, false, false, nullptr, nullptr);
+}
+
+AmqpConnection::AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) :
+ connectionp(NULL),
+ busyCount(0),
+ disposed(false)
+{
+ initialize (server, port, ssl, saslPlain, username, password);
+}
+
+void AmqpConnection::initialize(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password)
+{
+ if (server == nullptr)
+ throw gcnew ArgumentNullException("AMQP server");
+ if (saslPlain) {
+ if (username == nullptr)
+ throw gcnew ArgumentNullException("username");
+ if (username == nullptr)
+ throw gcnew ArgumentNullException("password");
+ }
+
bool success = false;
System::Exception^ openException = nullptr;
sessions = gcnew Collections::Generic::List<AmqpSession^>();
+ thisLock = gcnew Object();
try {
connectionp = new Connection;
- connectionp->open (QpidMarshal::ToNative(server), port);
+
+ if (ssl || saslPlain) {
+ ConnectionSettings proposedSettings;
+ proposedSettings.host = QpidMarshal::ToNative(server);
+ proposedSettings.port = port;
+ if (ssl)
+ proposedSettings.protocol = "ssl";
+
+ if (saslPlain) {
+ proposedSettings.username = QpidMarshal::ToNative(username);
+ proposedSettings.password = QpidMarshal::ToNative(password);
+ proposedSettings.mechanism = "PLAIN";
+ }
+
+ connectionp->open (proposedSettings);
+ }
+ else {
+ connectionp->open (QpidMarshal::ToNative(server), port);
+ }
+
// TODO: registerFailureCallback for failover
success = true;
const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
this->maxFrameSize = settings.maxFrameSize;
this->host = server;
this->port = port;
+ this->ssl = ssl;
+ this->saslPlain = saslPlain;
+ this->username = username;
+ this->password = password;
this->isOpen = true;
} catch (const qpid::Exception& error) {
String^ errmsg = gcnew String(error.what());
@@ -89,7 +134,7 @@ AmqpConnection::AmqpConnection(String^ server, int port) :
AmqpConnection^ AmqpConnection::Clone() {
if (disposed)
throw gcnew ObjectDisposedException("AmqpConnection.Clone");
- return gcnew AmqpConnection (this->host, this->port);
+ return gcnew AmqpConnection (this->host, this->port, this->ssl, this->saslPlain, this->username, this->password);
}
void AmqpConnection::Cleanup()
@@ -153,7 +198,7 @@ void AmqpConnection::NotifyBusy()
{
bool changed = false;
{
- lock l(this);
+ lock l(thisLock);
if (busyCount++ == 0)
changed = true;
}
@@ -166,7 +211,7 @@ void AmqpConnection::NotifyIdle()
{
bool connectionIdle = false;
{
- lock l(this);
+ lock l(thisLock);
if (--busyCount == 0)
connectionIdle = true;
}
@@ -175,5 +220,57 @@ void AmqpConnection::NotifyIdle()
}
}
+void HexAppend(StringBuilder^ sb, String^ s) {
+ if (s->Length > 0) {
+ array<unsigned char>^ bytes = Encoding::UTF8->GetBytes(s);
+ for each (unsigned char b in bytes) {
+ sb->Append(String::Format("{0:x2}", b));
+ }
+ }
+ sb->Append(".");
+}
+
+
+// Note: any change to this format has to be reflected in the DTC plugin's xa_open()
+// for now: "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password"
+// This extended info is needed so that the DTC can make a separate connection to the broker
+// for recovery.
+
+String^ AmqpConnection::DataSourceName::get() {
+ if (dataSourceName == nullptr) {
+ StringBuilder^ sb = gcnew StringBuilder();
+ sb->Append("QPIDdsnV2.");
+
+ sb->Append(this->port);
+ sb->Append(".");
+
+ HexAppend(sb, this->host);
+
+ sb->Append(System::Diagnostics::Process::GetCurrentProcess()->Id);
+ sb->Append("-");
+ sb->Append(AppDomain::CurrentDomain->Id);
+ sb->Append(".");
+
+ if (this->ssl)
+ sb->Append("T");
+ else
+ sb->Append("F");
+ sb->Append(".");
+
+ if (this->saslPlain) {
+ sb->Append("P.");
+ HexAppend(sb, this->username);
+ HexAppend(sb, this->password);
+ }
+ else {
+ // SASL anonymous
+ sb->Append("A.");
+ }
+
+ dataSourceName = sb->ToString();
+ }
+ return dataSourceName;
+}
+
}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
index 6533185fa1..ef4d0e3f37 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
@@ -36,17 +36,26 @@ public ref class AmqpConnection
{
private:
Connection* connectionp;
- String^ host;
- int port;
bool disposed;
Collections::Generic::List<AmqpSession^>^ sessions;
bool isOpen;
int busyCount;
int maxFrameSize;
DtxResourceManager^ dtxResourceManager;
- void Cleanup();
// unique string used for distributed transactions
String^ dataSourceName;
+ Object ^thisLock;
+
+ // properties needed to allow DTC to do transactions (see DataSourceName
+ String^ host;
+ int port;
+ bool ssl;
+ bool saslPlain;
+ String^ username;
+ String^ password;
+
+ void Cleanup();
+ void initialize (System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password);
internal:
void NotifyBusy();
@@ -63,19 +72,12 @@ private:
}
property String^ DataSourceName {
- // Note: any change to this format has to be reflected in the DTC plugin's xa_open()
- String^ get() {
- if (dataSourceName == nullptr) {
- dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host,
- System::Diagnostics::Process::GetCurrentProcess()->Id,
- AppDomain::CurrentDomain->Id);
- }
- return dataSourceName;
- }
+ String^ get();
}
public:
AmqpConnection(System::String^ server, int port);
+ AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password);
~AmqpConnection();
!AmqpConnection();
void Close();