summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs329
1 files changed, 329 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
new file mode 100644
index 0000000000..7238ff2120
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs
@@ -0,0 +1,329 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Channel
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Text;
+ using System.Threading;
+
+ using Apache.Qpid.Interop;
+
+ // The ConnectionManager looks after a shareable pool of AmqpConnection and AmqpSession
+ // objects. If two connection requests could be shared (see MakeKey() properties), and
+ // are designated as shareable, then they will be paired up. Each shared connection is
+ // a separate instance of a ManagedConnection. All unshared connections use a single
+ // instance of ManagedConnection with locking turned off. The ManagedConnection object
+ // registers for notifictation when a connection goes idle (all grandchild InputLink and
+ // OutputLink objects have been closed), and closes the connection.
+
+ // TODO: the session sharing is roughed-in via comments but needs completing.
+
+ internal sealed class ConnectionManager
+ {
+ // A side effect of creating InputLinks and OutputLinks is that counters
+ // in the respective AmqpSession and AmqpConnection are updated, so care
+ // must be taken to hold the lock across acquiring a session and opening
+ // a link on it.
+
+ // one for each shared connection
+ private static Dictionary<string, ManagedConnection> sharedInstances;
+
+ // this one creates and releases connections that are not shared. No locking required.
+ private static ManagedConnection unsharedInstance;
+
+ // lock for finding or creating ManagedConnection instances
+ private static Object connectionLock;
+
+ static ConnectionManager()
+ {
+ unsharedInstance = null;
+ sharedInstances = new Dictionary<string, ManagedConnection>();
+ connectionLock = new Object();
+ }
+
+ private static string MakeKey(AmqpChannelProperties props)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.Append(props.BrokerHost);
+ sb.Append(':');
+ sb.Append(props.BrokerPort);
+ sb.Append(':');
+ sb.Append(props.TransferMode);
+
+ AmqpTransportSecurity sec = props.AmqpTransportSecurity;
+ if (sec == null)
+ {
+ return sb.ToString();
+ }
+
+ if (sec.UseSSL)
+ {
+ sb.Append(":SSL");
+ }
+
+ if (sec.CredentialType == AmqpCredentialType.Plain)
+ {
+ sb.Append(":saslP");
+ AmqpCredential cred = props.AmqpCredential;
+ if (cred != null)
+ {
+ sb.Append(":NM:");
+ sb.Append(cred.UserName);
+ sb.Append(":PW:");
+ sb.Append(cred.Password);
+ }
+ }
+
+ return sb.ToString();
+ }
+
+ private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing)
+ {
+ if (connectionSharing)
+ {
+ string key = MakeKey(channelProperties);
+ lock (connectionLock)
+ {
+ ManagedConnection mc = null;
+ if (!sharedInstances.TryGetValue(key, out mc))
+ {
+ mc = new ManagedConnection(true);
+ sharedInstances.Add(key, mc);
+ }
+ return mc;
+ }
+ }
+ else
+ {
+ lock (connectionLock)
+ {
+ if (unsharedInstance == null)
+ {
+ unsharedInstance = new ManagedConnection(false);
+ }
+ return unsharedInstance;
+ }
+ }
+ }
+
+ public static OutputLink GetOutputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname)
+ {
+ ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing);
+ return (OutputLink)mc.GetLink(channelProperties, sessionSharing, null, qname);
+ }
+
+ public static InputLink GetInputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname)
+ {
+ ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing);
+ return (InputLink)mc.GetLink(channelProperties, sessionSharing, qname, null);
+ }
+
+
+
+ class ManagedConnection
+ {
+ private Boolean shared;
+ private AmqpConnection sharedConnection;
+ //private Dictionary<string, AmqpSession> sharedSessions;
+
+ public ManagedConnection(bool shared)
+ {
+ this.shared = shared;
+ }
+
+
+ public object GetLink(AmqpChannelProperties channelProperties, bool sessionSharing, string inputQueue, string outputQueue)
+ {
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ Object link = null;
+ bool newConnection = false;
+ //bool newSession = false;
+ bool success = false;
+
+ // when called in the non-shared case, only stack variables should be used for holding connections/sessions/links
+
+ if (this.shared)
+ {
+ Monitor.Enter(this); // lock
+ }
+
+ try
+ {
+ if (this.shared)
+ {
+ // TODO: check shared connection not closed (i.e. network drop) and refresh this instance if needed
+ if (sessionSharing)
+ {
+ throw new NotImplementedException("shared session");
+ /* * ... once we have a defined shared session config parameter:
+
+ // lazilly create
+ if (this.sharedSessions == null)
+ {
+ this.sharedSessions = new Dictionary<string, AmqpSession>();
+ }
+
+ alreadydeclaredstring sessionKey = channelProperties.name_of_key_goes_here;
+ this.sharedSessions.TryGetValue(sessionKey, out session);
+
+ * */
+ }
+
+ if (this.sharedConnection != null)
+ {
+ connection = this.sharedConnection;
+ }
+ }
+
+ if (connection == null)
+ {
+ if (channelProperties.AmqpSecurityMode != AmqpSecurityMode.None)
+ {
+ string user = null;
+ string passwd = null;
+ bool ssl = false;
+ bool saslPlain = false;
+
+ AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity;
+ if (tsec.UseSSL)
+ {
+ ssl = true;
+ }
+
+ if (tsec.CredentialType == AmqpCredentialType.Plain)
+ {
+ saslPlain = true;
+ AmqpCredential plainCred = channelProperties.AmqpCredential;
+ if (plainCred != null)
+ {
+ user = plainCred.UserName;
+ passwd = plainCred.Password;
+ }
+ }
+
+ connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort,
+ ssl, saslPlain, user, passwd);
+ }
+ else
+ {
+ connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort);
+ }
+
+ newConnection = true;
+ if (this.shared)
+ {
+ connection.OnConnectionIdle += new ConnectionIdleEventHandler(this.IdleConnectionHandler);
+ }
+ else
+ {
+ connection.OnConnectionIdle += new ConnectionIdleEventHandler(UnsharedIdleConnectionHandler);
+ }
+ }
+
+ if (session == null)
+ {
+ session = connection.CreateSession();
+ //newSession = true;
+ }
+
+ if (inputQueue != null)
+ {
+ link = session.CreateInputLink(inputQueue);
+ }
+ else
+ {
+ link = session.CreateOutputLink(outputQueue);
+ }
+
+ if (this.shared)
+ {
+ if (newConnection)
+ {
+ this.sharedConnection = connection;
+ }
+ /*
+ if (newSession)
+ {
+ sharedSessions.Add(foo, session);
+ }
+ * */
+ }
+
+ success = true;
+ }
+ finally
+ {
+ if (this.shared)
+ {
+ Monitor.Exit(this);
+ }
+ if (!success)
+ {
+ /*
+ if (newSession)
+ {
+ session.Close();
+ }
+ */
+ if (newConnection)
+ {
+ connection.Close();
+ }
+ }
+ }
+
+ return link;
+ }
+
+
+ static void UnsharedIdleConnectionHandler(Object sender, EventArgs empty)
+ {
+ if (sender is AmqpConnection)
+ {
+ AmqpConnection connection = (AmqpConnection)sender;
+ connection.Close();
+ }
+ }
+
+ void IdleConnectionHandler(Object sender, EventArgs empty)
+ {
+ lock (this)
+ {
+ if (sharedConnection != sender || sharedConnection == null)
+ {
+ return;
+ }
+ if (!sharedConnection.IsIdle)
+ {
+ // Another thread made the connection busy again.
+ // That's OK. Another idle event will come along later.
+ return;
+ }
+ sharedConnection.Close(); // also closes all child sessions
+ sharedConnection = null;
+ //sharedSessions = null;
+ }
+ }
+ }
+ }
+}