diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client')
20 files changed, 1622 insertions, 1622 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs b/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs index 6382eaaf39..7bb64e3fff 100644 --- a/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs +++ b/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs @@ -1,39 +1,39 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.Serialization;
-
-namespace Apache.Qpid.Client
-{
- [Serializable]
- public class AMQAuthenticationException : AMQException
- {
- public AMQAuthenticationException(int error, String message)
- : base(error, message)
- {
- }
-
- protected AMQAuthenticationException(SerializationInfo info, StreamingContext ctxt)
- : base(info, ctxt)
- {
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; + +namespace Apache.Qpid.Client +{ + [Serializable] + public class AMQAuthenticationException : AMQException + { + public AMQAuthenticationException(int error, String message) + : base(error, message) + { + } + + protected AMQAuthenticationException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs index 0d93176734..5c9dd86c53 100644 --- a/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs +++ b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs @@ -1,45 +1,45 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.Serialization;
-using Apache.Qpid.Common;
-using Apache.Qpid.Protocol;
-
-namespace Apache.Qpid.Client
-{
- [Serializable]
- public class AMQNoConsumersException : AMQUndeliveredException
- {
- public AMQNoConsumersException(string message)
- : this(message, null)
- {
- }
-
- public AMQNoConsumersException(string message, object bounced)
- : base(AMQConstant.NO_CONSUMERS.Code, message, bounced)
- {
- }
- protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt)
- : base(info, ctxt)
- {
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; +using Apache.Qpid.Common; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client +{ + [Serializable] + public class AMQNoConsumersException : AMQUndeliveredException + { + public AMQNoConsumersException(string message) + : this(message, null) + { + } + + public AMQNoConsumersException(string message, object bounced) + : base(AMQConstant.NO_CONSUMERS.Code, message, bounced) + { + } + protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/AMQNoRouteException.cs b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs index bde3cdd989..5868d78f32 100644 --- a/dotnet/Qpid.Client/Client/AMQNoRouteException.cs +++ b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs @@ -1,46 +1,46 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.Serialization;
-using Apache.Qpid.Common;
-using Apache.Qpid.Protocol;
-
-namespace Apache.Qpid.Client
-{
- [Serializable]
- public class AMQNoRouteException : AMQUndeliveredException
- {
- public AMQNoRouteException(string message)
- : this(message, null)
- {
- }
-
- public AMQNoRouteException(string message, object bounced)
- : base(AMQConstant.NO_ROUTE.Code, message, bounced)
- {
- }
-
- protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt)
- : base(info, ctxt)
- {
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; +using Apache.Qpid.Common; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client +{ + [Serializable] + public class AMQNoRouteException : AMQUndeliveredException + { + public AMQNoRouteException(string message) + : this(message, null) + { + } + + public AMQNoRouteException(string message, object bounced) + : base(AMQConstant.NO_ROUTE.Code, message, bounced) + { + } + + protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs b/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs index ae9225a53a..8d289fa956 100644 --- a/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs +++ b/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs @@ -1,84 +1,84 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Collections;
-using System.Collections.Specialized;
-using System.Configuration;
-using System.Text;
-
-using Apache.Qpid.Client.Security;
-using Apache.Qpid.Sasl.Mechanisms;
-
-namespace Apache.Qpid.Client.Configuration
-{
- public class AuthenticationConfigurationSectionHandler
- : IConfigurationSectionHandler
- {
-
- public object Create(object parent, object configContext, System.Xml.XmlNode section)
- {
- NameValueSectionHandler handler = new NameValueSectionHandler();
- OrderedHashTable schemes = new OrderedHashTable();
-
- NameValueCollection options = (NameValueCollection)
- handler.Create(parent, configContext, section);
-
- if ( options != null )
- {
- foreach ( string key in options.Keys )
- {
- Type type = Type.GetType(options[key]);
- if ( type == null )
- throw new ConfigurationException(string.Format("Type '{0}' not found", key));
- if ( !typeof(IAMQCallbackHandler).IsAssignableFrom(type) )
- throw new ConfigurationException(string.Format("Type '{0}' does not implement IAMQCallbackHandler", key));
-
- schemes.Add(key, type);
- }
- }
-
- return schemes;
- }
-
- } // class AuthenticationConfigurationSectionHandler
-
- public class OrderedHashTable : Hashtable
- {
- private ArrayList _keys = new ArrayList();
-
- public IList OrderedKeys
- {
- get { return _keys; }
- }
-
- public override void Add(object key, object value)
- {
- base.Add(key, value);
- _keys.Add(key);
- }
- public override void Remove(object key)
- {
- base.Remove(key);
- _keys.Remove(key);
- }
- }
-} // namespace Apache.Qpid.Client.Configuration
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections; +using System.Collections.Specialized; +using System.Configuration; +using System.Text; + +using Apache.Qpid.Client.Security; +using Apache.Qpid.Sasl.Mechanisms; + +namespace Apache.Qpid.Client.Configuration +{ + public class AuthenticationConfigurationSectionHandler + : IConfigurationSectionHandler + { + + public object Create(object parent, object configContext, System.Xml.XmlNode section) + { + NameValueSectionHandler handler = new NameValueSectionHandler(); + OrderedHashTable schemes = new OrderedHashTable(); + + NameValueCollection options = (NameValueCollection) + handler.Create(parent, configContext, section); + + if ( options != null ) + { + foreach ( string key in options.Keys ) + { + Type type = Type.GetType(options[key]); + if ( type == null ) + throw new ConfigurationException(string.Format("Type '{0}' not found", key)); + if ( !typeof(IAMQCallbackHandler).IsAssignableFrom(type) ) + throw new ConfigurationException(string.Format("Type '{0}' does not implement IAMQCallbackHandler", key)); + + schemes.Add(key, type); + } + } + + return schemes; + } + + } // class AuthenticationConfigurationSectionHandler + + public class OrderedHashTable : Hashtable + { + private ArrayList _keys = new ArrayList(); + + public IList OrderedKeys + { + get { return _keys; } + } + + public override void Add(object key, object value) + { + base.Add(key, value); + _keys.Add(key); + } + public override void Remove(object key) + { + base.Remove(key); + _keys.Remove(key); + } + } +} // namespace Apache.Qpid.Client.Configuration diff --git a/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs index 7290d758f8..70aa3e1078 100644 --- a/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs @@ -1,44 +1,44 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using log4net;
-using Apache.Qpid.Client.Message;
-using Apache.Qpid.Client.Protocol;
-using Apache.Qpid.Client.State;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Handler
-{
- public class QueueDeleteOkMethodHandler : IStateAwareMethodListener
- {
-
- private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler));
-
- public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
- {
- QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method;
- if (body != null)
- {
- _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount);
- }
- }
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class QueueDeleteOkMethodHandler : IStateAwareMethodListener + { + + private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method; + if (body != null) + { + _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount); + } + } + + } +} diff --git a/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs index 8bde707b00..22db70575d 100644 --- a/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs @@ -1,44 +1,44 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using log4net;
-using Apache.Qpid.Client.Message;
-using Apache.Qpid.Client.Protocol;
-using Apache.Qpid.Client.State;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Handler
-{
- public class QueuePurgeOkMethodHandler : IStateAwareMethodListener
- {
-
- private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler));
-
- public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
- {
- QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method;
- if (body != null)
- {
- _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount);
- }
- }
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class QueuePurgeOkMethodHandler : IStateAwareMethodListener + { + + private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method; + if (body != null) + { + _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount); + } + } + + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs b/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs index 6841b46f54..2f23a1571d 100644 --- a/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs +++ b/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs @@ -1,47 +1,47 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Text;
-
-namespace Apache.Qpid.Client.Protocol
-{
- /// <summary>
- /// Default timeout values for the protocol
- /// </summary>
- sealed class DefaultTimeouts
- {
- /// <summary>
- /// Maximum number of milliseconds to wait for a state change
- /// in the protocol's state machine
- /// </summary>
- public const int MaxWaitForState = 30* 1000;
- /// <summary>
- /// Maximum number of milliseconds to wait for a reply
- /// frame when doing synchronous writer to the broker
- /// </summary>
- public const int MaxWaitForSyncWriter = 30 * 1000;
-
- private DefaultTimeouts()
- {
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Text; + +namespace Apache.Qpid.Client.Protocol +{ + /// <summary> + /// Default timeout values for the protocol + /// </summary> + sealed class DefaultTimeouts + { + /// <summary> + /// Maximum number of milliseconds to wait for a state change + /// in the protocol's state machine + /// </summary> + public const int MaxWaitForState = 30* 1000; + /// <summary> + /// Maximum number of milliseconds to wait for a reply + /// frame when doing synchronous writer to the broker + /// </summary> + public const int MaxWaitForSyncWriter = 30 * 1000; + + private DefaultTimeouts() + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs index 22f1c9d89b..9ac0381850 100644 --- a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs +++ b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs @@ -1,129 +1,129 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Collections;
-using System.Configuration;
-using System.Text;
-using Apache.Qpid.Sasl;
-using Apache.Qpid.Sasl.Mechanisms;
-
-using Apache.Qpid.Client.Configuration;
-
-namespace Apache.Qpid.Client.Security
-{
-
- /// <summary>
- /// Helper class to map SASL mechanisms to our
- /// internal ISaslCallbackHandler implementations.
- /// </summary>
- /// <remarks>
- /// The set of configured callback handlers and their order
- /// controls the selection of the SASL mechanism used for authentication.
- /// <para>
- /// You can either replace the default handler for CRAM-MD5 and PLAIN
- /// authentication (the two default options) using the application
- /// configuration file. Configuration is done by especifying the SASL
- /// mechanism name (e.g PLAIN) and the type implementing the callback handler
- /// used to provide any data required by the mechanism like username and password.
- /// </para>
- /// <para>
- /// Callback handler types should implement the IAMQCallbackHandler interface.
- /// </para>
- /// <para>
- /// New callbacks or authentication mechanisms can be configured like this:
- /// </para>
- /// <example><![CDATA[
- /// <configuration>
- /// <configSections>
- /// <sectionGroup name="qpid.client">
- /// <section name="authentication" type="Apache.Qpid.Client.Configuration.AuthenticationConfigurationSectionHandler, Apache.Qpid.Client"/>
- /// </sectionGroup>
- /// </configSections>
- /// <qpid.client>
- /// <authentication>
- /// <add key="TEST" value="Apache.Qpid.Client.Tests.Security.TestCallbackHandler, Apache.Qpid.Client.Tests"/>
- /// </authentication>
- /// </qpid.client>
- /// </configuration>
- /// ]]></example>
- /// </remarks>
- public sealed class CallbackHandlerRegistry
- {
- private static CallbackHandlerRegistry _instance =
- new CallbackHandlerRegistry();
- private OrderedHashTable _mechanism2HandlerMap;
- private string[] _mechanisms;
-
- public static CallbackHandlerRegistry Instance
- {
- get { return _instance; }
- }
-
- public string[] Mechanisms
- {
- get { return _mechanisms; }
- }
-
- private CallbackHandlerRegistry()
- {
- _mechanism2HandlerMap = (OrderedHashTable)
- ConfigurationSettings.GetConfig("qpid.client/authentication");
-
- // configure default options if not available
- if ( _mechanism2HandlerMap == null )
- _mechanism2HandlerMap = new OrderedHashTable();
-
- if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) )
- _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
- if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) )
- _mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
- if ( !_mechanism2HandlerMap.Contains(CramMD5HexSaslClient.Mechanism) )
- _mechanism2HandlerMap.Add(CramMD5HexSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
- if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) )
- _mechanism2HandlerMap.Add(PlainSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
-
- _mechanisms = new string[_mechanism2HandlerMap.Count];
- _mechanism2HandlerMap.OrderedKeys.CopyTo(_mechanisms, 0);
- }
-
- public bool IsSupportedMechanism(string mechanism)
- {
- return _mechanism2HandlerMap.Contains(mechanism);
- }
-
- public string ChooseMechanism(string mechanisms)
- {
- IList mechs = mechanisms.Split(' ');
- foreach ( string supportedMech in _mechanisms )
- {
- if ( mechs.Contains(supportedMech) )
- return supportedMech;
- }
- return null;
- }
-
- public Type GetCallbackHandler(string mechanism)
- {
- return (Type)_mechanism2HandlerMap[mechanism];
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections; +using System.Configuration; +using System.Text; +using Apache.Qpid.Sasl; +using Apache.Qpid.Sasl.Mechanisms; + +using Apache.Qpid.Client.Configuration; + +namespace Apache.Qpid.Client.Security +{ + + /// <summary> + /// Helper class to map SASL mechanisms to our + /// internal ISaslCallbackHandler implementations. + /// </summary> + /// <remarks> + /// The set of configured callback handlers and their order + /// controls the selection of the SASL mechanism used for authentication. + /// <para> + /// You can either replace the default handler for CRAM-MD5 and PLAIN + /// authentication (the two default options) using the application + /// configuration file. Configuration is done by especifying the SASL + /// mechanism name (e.g PLAIN) and the type implementing the callback handler + /// used to provide any data required by the mechanism like username and password. + /// </para> + /// <para> + /// Callback handler types should implement the IAMQCallbackHandler interface. + /// </para> + /// <para> + /// New callbacks or authentication mechanisms can be configured like this: + /// </para> + /// <example><![CDATA[ + /// <configuration> + /// <configSections> + /// <sectionGroup name="qpid.client"> + /// <section name="authentication" type="Apache.Qpid.Client.Configuration.AuthenticationConfigurationSectionHandler, Apache.Qpid.Client"/> + /// </sectionGroup> + /// </configSections> + /// <qpid.client> + /// <authentication> + /// <add key="TEST" value="Apache.Qpid.Client.Tests.Security.TestCallbackHandler, Apache.Qpid.Client.Tests"/> + /// </authentication> + /// </qpid.client> + /// </configuration> + /// ]]></example> + /// </remarks> + public sealed class CallbackHandlerRegistry + { + private static CallbackHandlerRegistry _instance = + new CallbackHandlerRegistry(); + private OrderedHashTable _mechanism2HandlerMap; + private string[] _mechanisms; + + public static CallbackHandlerRegistry Instance + { + get { return _instance; } + } + + public string[] Mechanisms + { + get { return _mechanisms; } + } + + private CallbackHandlerRegistry() + { + _mechanism2HandlerMap = (OrderedHashTable) + ConfigurationSettings.GetConfig("qpid.client/authentication"); + + // configure default options if not available + if ( _mechanism2HandlerMap == null ) + _mechanism2HandlerMap = new OrderedHashTable(); + + if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + if ( !_mechanism2HandlerMap.Contains(CramMD5HexSaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(CramMD5HexSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(PlainSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + + _mechanisms = new string[_mechanism2HandlerMap.Count]; + _mechanism2HandlerMap.OrderedKeys.CopyTo(_mechanisms, 0); + } + + public bool IsSupportedMechanism(string mechanism) + { + return _mechanism2HandlerMap.Contains(mechanism); + } + + public string ChooseMechanism(string mechanisms) + { + IList mechs = mechanisms.Split(' '); + foreach ( string supportedMech in _mechanisms ) + { + if ( mechs.Contains(supportedMech) ) + return supportedMech; + } + return null; + } + + public Type GetCallbackHandler(string mechanism) + { + return (Type)_mechanism2HandlerMap[mechanism]; + } + } +} diff --git a/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs b/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs index 2560c1d96b..6ff45be04a 100644 --- a/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs +++ b/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs @@ -1,35 +1,35 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Text;
-using Apache.Qpid.Client.Protocol;
-using Apache.Qpid.Sasl;
-
-namespace Apache.Qpid.Client.Security
-{
- public interface IAMQCallbackHandler : ISaslCallbackHandler
- {
- void Initialize(AMQProtocolSession session);
- }
-
-}
-
-
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Sasl; + +namespace Apache.Qpid.Client.Security +{ + public interface IAMQCallbackHandler : ISaslCallbackHandler + { + void Initialize(AMQProtocolSession session); + } + +} + + diff --git a/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs b/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs index 489d4d1665..743ade77c9 100644 --- a/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs +++ b/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs @@ -1,56 +1,56 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Collections;
-using System.Text;
-using Apache.Qpid.Client.Protocol;
-using Apache.Qpid.Sasl;
-
-namespace Apache.Qpid.Client.Security
-{
- internal class UsernamePasswordCallbackHandler : IAMQCallbackHandler
- {
- private AMQProtocolSession _session;
-
- public void Initialize(AMQProtocolSession session)
- {
- if ( session == null )
- throw new ArgumentNullException("session");
-
- _session = session;
- }
-
- public void Handle(ISaslCallback[] callbacks)
- {
- foreach ( ISaslCallback cb in callbacks )
- {
- if ( cb is NameCallback )
- {
- ((NameCallback)cb).Text = _session.Username;
- } else if ( cb is PasswordCallback )
- {
- ((PasswordCallback)cb).Text = _session.Password;
- }
- }
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections; +using System.Text; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Sasl; + +namespace Apache.Qpid.Client.Security +{ + internal class UsernamePasswordCallbackHandler : IAMQCallbackHandler + { + private AMQProtocolSession _session; + + public void Initialize(AMQProtocolSession session) + { + if ( session == null ) + throw new ArgumentNullException("session"); + + _session = session; + } + + public void Handle(ISaslCallback[] callbacks) + { + foreach ( ISaslCallback cb in callbacks ) + { + if ( cb is NameCallback ) + { + ((NameCallback)cb).Text = _session.Username; + } else if ( cb is PasswordCallback ) + { + ((PasswordCallback)cb).Text = _session.Password; + } + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/SslOptions.cs b/dotnet/Qpid.Client/Client/SslOptions.cs index d637101000..4630121828 100644 --- a/dotnet/Qpid.Client/Client/SslOptions.cs +++ b/dotnet/Qpid.Client/Client/SslOptions.cs @@ -1,81 +1,81 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Security.Cryptography.X509Certificates;
-
-namespace Apache.Qpid.Client
-{
- /// <summary>
- /// Configures SSL-related options to connect to an AMQP broker.
- /// </summary>
- /// <remarks>
- /// If the server certificate is not trusted by the client,
- /// connection will fail. However, you can set the
- /// <see cref="IgnoreValidationErrors"/> property to true
- /// to ignore any certificate verification errors for debugging purposes.
- /// </remarks>
- public class SslOptions
- {
- private X509Certificate _clientCertificate;
- private bool _ignoreValidationErrors;
-
- /// <summary>
- /// Certificate to present to the broker to authenticate
- /// this client connection
- /// </summary>
- public X509Certificate ClientCertificate
- {
- get { return _clientCertificate; }
- }
-
- /// <summary>
- /// If true, the validity of the broker certificate
- /// will not be verified on connection
- /// </summary>
- public bool IgnoreValidationErrors
- {
- get { return _ignoreValidationErrors; }
- }
-
- /// <summary>
- /// Initialize a new instance with default values
- /// (No client certificate, don't ignore validation errors)
- /// </summary>
- public SslOptions()
- {
- }
-
- /// <summary>
- /// Initialize a new instance
- /// </summary>
- /// <param name="clientCertificate">
- /// Certificate to use to authenticate the client to the broker
- /// </param>
- /// <param name="ignoreValidationErrors">
- /// If true, ignore any validation errors when validating the server certificate
- /// </param>
- public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors)
- {
- _clientCertificate = clientCertificate;
- _ignoreValidationErrors = ignoreValidationErrors;
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Security.Cryptography.X509Certificates; + +namespace Apache.Qpid.Client +{ + /// <summary> + /// Configures SSL-related options to connect to an AMQP broker. + /// </summary> + /// <remarks> + /// If the server certificate is not trusted by the client, + /// connection will fail. However, you can set the + /// <see cref="IgnoreValidationErrors"/> property to true + /// to ignore any certificate verification errors for debugging purposes. + /// </remarks> + public class SslOptions + { + private X509Certificate _clientCertificate; + private bool _ignoreValidationErrors; + + /// <summary> + /// Certificate to present to the broker to authenticate + /// this client connection + /// </summary> + public X509Certificate ClientCertificate + { + get { return _clientCertificate; } + } + + /// <summary> + /// If true, the validity of the broker certificate + /// will not be verified on connection + /// </summary> + public bool IgnoreValidationErrors + { + get { return _ignoreValidationErrors; } + } + + /// <summary> + /// Initialize a new instance with default values + /// (No client certificate, don't ignore validation errors) + /// </summary> + public SslOptions() + { + } + + /// <summary> + /// Initialize a new instance + /// </summary> + /// <param name="clientCertificate"> + /// Certificate to use to authenticate the client to the broker + /// </param> + /// <param name="ignoreValidationErrors"> + /// If true, ignore any validation errors when validating the server certificate + /// </param> + public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors) + { + _clientCertificate = clientCertificate; + _ignoreValidationErrors = ignoreValidationErrors; + } + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs index 7195b3ab04..e0e890fc5a 100644 --- a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs +++ b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs @@ -1,38 +1,38 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System.IO;
-
-namespace Apache.Qpid.Client.Transport
-{
- /// <summary>
- /// Defines a way to introduce an arbitrary filtering
- /// stream into the stream chain managed by <see cref="IoHandler"/>
- /// </summary>
- public interface IStreamFilter
- {
- /// <summary>
- /// Creates a new filtering stream on top of another
- /// </summary>
- /// <param name="lowerStream">Next stream on the stack</param>
- /// <returns>A new filtering stream</returns>
- Stream CreateFilterStream(Stream lowerStream);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.IO; + +namespace Apache.Qpid.Client.Transport +{ + /// <summary> + /// Defines a way to introduce an arbitrary filtering + /// stream into the stream chain managed by <see cref="IoHandler"/> + /// </summary> + public interface IStreamFilter + { + /// <summary> + /// Creates a new filtering stream on top of another + /// </summary> + /// <param name="lowerStream">Next stream on the stack</param> + /// <returns>A new filtering stream</returns> + Stream CreateFilterStream(Stream lowerStream); + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs index 9ac513069e..0475236d92 100644 --- a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs +++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs @@ -1,322 +1,322 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.IO;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Client.Protocol;
-
-namespace Apache.Qpid.Client.Transport
-{
- /// <summary>
- /// Responsible for reading and writing
- /// ByteBuffers from/to network streams, and handling
- /// the stream filters
- /// </summary>
- public class IoHandler : IByteChannel, IDisposable
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
- private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
-
- private Stream _topStream;
- private IProtocolListener _protocolListener;
- private int _readBufferSize;
-
- public int ReadBufferSize
- {
- get { return _readBufferSize; }
- set { _readBufferSize = value; }
- }
-
- /// <summary>
- /// Initialize a new instance
- /// </summary>
- /// <param name="stream">Underlying network stream</param>
- /// <param name="protocolListener">Protocol listener to report exceptions to</param>
- public IoHandler(Stream stream, IProtocolListener protocolListener)
- {
- if ( stream == null )
- throw new ArgumentNullException("stream");
- if ( protocolListener == null )
- throw new ArgumentNullException("protocolListener");
-
- // initially, the stream at the top of the filter
- // chain is the underlying network stream
- _topStream = stream;
- _protocolListener = protocolListener;
- _readBufferSize = DEFAULT_BUFFER_SIZE;
- }
-
- /// <summary>
- /// Adds a new filter on the top of the chain
- /// </summary>
- /// <param name="filter">Stream filter to put on top of the chain</param>
- /// <remarks>
- /// This should *only* be called during initialization. We don't
- /// support changing the filter change after the first read/write
- /// has been done and it's not thread-safe to boot!
- /// </remarks>
- public void AddFilter(IStreamFilter filter)
- {
- _topStream = filter.CreateFilterStream(_topStream);
- }
-
- #region IByteChannel Implementation
- //
- // IByteChannel Implementation
- //
-
- /// <summary>
- /// Read a <see cref="ByteBuffer"/> from the underlying
- /// network stream and any configured filters
- /// </summary>
- /// <returns>A ByteBuffer, if available</returns>
- public ByteBuffer Read()
- {
- byte[] bytes = AllocateBuffer();
-
- int numOctets = _topStream.Read(bytes, 0, bytes.Length);
-
- return WrapByteArray(bytes, numOctets);
- }
-
- /// <summary>
- /// Begin an asynchronous read operation
- /// </summary>
- /// <param name="callback">Callback method to call when read operation completes</param>
- /// <param name="state">State object</param>
- /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
- public IAsyncResult BeginRead(AsyncCallback callback, object state)
- {
- byte[] bytes = AllocateBuffer();
- ReadData rd = new ReadData(callback, state, bytes);
-
- // only put a callback if the caller wants one.
- AsyncCallback myCallback = null;
- if ( callback != null )
- myCallback = new AsyncCallback(OnAsyncReadDone);
-
- IAsyncResult result = _topStream.BeginRead(
- bytes, 0, bytes.Length, myCallback,rd
- );
- return new WrappedAsyncResult(result, bytes);
- }
-
- /// <summary>
- /// End an asynchronous read operation
- /// </summary>
- /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
- /// <returns>The <see cref="ByteBuffer"/> read</returns>
- public ByteBuffer EndRead(IAsyncResult result)
- {
- WrappedAsyncResult theResult = (WrappedAsyncResult)result;
- int bytesRead = _topStream.EndRead(theResult.InnerResult);
- return WrapByteArray(theResult.Buffer, bytesRead);
- }
-
- /// <summary>
- /// Write a <see cref="ByteBuffer"/> to the underlying network
- /// stream, going through any configured filters
- /// </summary>
- /// <param name="buffer"></param>
- public void Write(ByteBuffer buffer)
- {
- try
- {
- _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
- }
- catch (Exception e)
- {
- _log.Warn("Write caused exception", e);
- _protocolListener.OnException(e);
- }
- }
-
- /// <summary>
- /// Begin an asynchronous write operation
- /// </summary>
- /// <param name="buffer">Buffer to write</param>
- /// <param name="callback">A callback to call when the operation completes</param>
- /// <param name="state">State object</param>
- /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
- public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
- {
- try
- {
- return _topStream.BeginWrite(
- buffer.Array, buffer.Position, buffer.Limit,
- callback, state
- );
- } catch ( Exception e )
- {
- _log.Error("BeginWrite caused exception", e);
- // not clear if an exception here should be propagated? we still
- // need to propagate it upwards anyway!
- _protocolListener.OnException(e);
- throw;
- }
- }
-
- /// <summary>
- /// End an asynchronous write operation
- /// </summary>
- /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
- public void EndWrite(IAsyncResult result)
- {
- try
- {
- _topStream.EndWrite(result);
- } catch ( Exception e )
- {
- _log.Error("EndWrite caused exception", e);
- // not clear if an exception here should be propagated?
- _protocolListener.OnException(e);
- //throw;
- }
- }
- #endregion // IByteChannel Implementation
-
- #region IDisposable Implementation
- //
- // IDisposable Implementation
- //
-
- public void Dispose()
- {
- if ( _topStream != null )
- {
- _topStream.Close();
- }
- }
-
- #endregion // IDisposable Implementation
-
- #region Private and Helper Classes/Methods
- //
- // Private and Helper Classes/Methods
- //
-
- private byte[] AllocateBuffer()
- {
- return new byte[ReadBufferSize];
- }
-
- private static ByteBuffer WrapByteArray(byte[] bytes, int size)
- {
- ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
- byteBuffer.Limit = size;
- byteBuffer.Flip();
-
- return byteBuffer;
- }
-
-
- private static void OnAsyncReadDone(IAsyncResult result)
- {
- ReadData rd = (ReadData) result.AsyncState;
- IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
- rd.Callback(wrapped);
- }
-
- class ReadData
- {
- private object _state;
- private AsyncCallback _callback;
- private byte[] _buffer;
-
- public object State
- {
- get { return _state; }
- }
-
- public AsyncCallback Callback
- {
- get { return _callback; }
- }
-
- public byte[] Buffer
- {
- get { return _buffer; }
- }
-
- public ReadData(AsyncCallback callback, object state, byte[] buffer)
- {
- _callback = callback;
- _state = state;
- _buffer = buffer;
- }
- }
-
- class WrappedAsyncResult : IAsyncResult
- {
- private IAsyncResult _innerResult;
- private byte[] _buffer;
-
- #region IAsyncResult Properties
- //
- // IAsyncResult Properties
- //
- public bool IsCompleted
- {
- get { return _innerResult.IsCompleted; }
- }
-
- public WaitHandle AsyncWaitHandle
- {
- get { return _innerResult.AsyncWaitHandle; }
- }
-
- public object AsyncState
- {
- get { return _innerResult.AsyncState; }
- }
-
- public bool CompletedSynchronously
- {
- get { return _innerResult.CompletedSynchronously; }
- }
- #endregion // IAsyncResult Properties
-
- public IAsyncResult InnerResult
- {
- get { return _innerResult; }
- }
- public byte[] Buffer
- {
- get { return _buffer; }
- }
-
- public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
- {
- if ( result == null )
- throw new ArgumentNullException("result");
- if ( buffer == null )
- throw new ArgumentNullException("buffer");
-
- _innerResult = result;
- _buffer = buffer;
- }
- }
-
- #endregion // Private and Helper Classes/Methods
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Threading; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Protocol; + +namespace Apache.Qpid.Client.Transport +{ + /// <summary> + /// Responsible for reading and writing + /// ByteBuffers from/to network streams, and handling + /// the stream filters + /// </summary> + public class IoHandler : IByteChannel, IDisposable + { + private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler)); + private const int DEFAULT_BUFFER_SIZE = 32 * 1024; + + private Stream _topStream; + private IProtocolListener _protocolListener; + private int _readBufferSize; + + public int ReadBufferSize + { + get { return _readBufferSize; } + set { _readBufferSize = value; } + } + + /// <summary> + /// Initialize a new instance + /// </summary> + /// <param name="stream">Underlying network stream</param> + /// <param name="protocolListener">Protocol listener to report exceptions to</param> + public IoHandler(Stream stream, IProtocolListener protocolListener) + { + if ( stream == null ) + throw new ArgumentNullException("stream"); + if ( protocolListener == null ) + throw new ArgumentNullException("protocolListener"); + + // initially, the stream at the top of the filter + // chain is the underlying network stream + _topStream = stream; + _protocolListener = protocolListener; + _readBufferSize = DEFAULT_BUFFER_SIZE; + } + + /// <summary> + /// Adds a new filter on the top of the chain + /// </summary> + /// <param name="filter">Stream filter to put on top of the chain</param> + /// <remarks> + /// This should *only* be called during initialization. We don't + /// support changing the filter change after the first read/write + /// has been done and it's not thread-safe to boot! + /// </remarks> + public void AddFilter(IStreamFilter filter) + { + _topStream = filter.CreateFilterStream(_topStream); + } + + #region IByteChannel Implementation + // + // IByteChannel Implementation + // + + /// <summary> + /// Read a <see cref="ByteBuffer"/> from the underlying + /// network stream and any configured filters + /// </summary> + /// <returns>A ByteBuffer, if available</returns> + public ByteBuffer Read() + { + byte[] bytes = AllocateBuffer(); + + int numOctets = _topStream.Read(bytes, 0, bytes.Length); + + return WrapByteArray(bytes, numOctets); + } + + /// <summary> + /// Begin an asynchronous read operation + /// </summary> + /// <param name="callback">Callback method to call when read operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + public IAsyncResult BeginRead(AsyncCallback callback, object state) + { + byte[] bytes = AllocateBuffer(); + ReadData rd = new ReadData(callback, state, bytes); + + // only put a callback if the caller wants one. + AsyncCallback myCallback = null; + if ( callback != null ) + myCallback = new AsyncCallback(OnAsyncReadDone); + + IAsyncResult result = _topStream.BeginRead( + bytes, 0, bytes.Length, myCallback,rd + ); + return new WrappedAsyncResult(result, bytes); + } + + /// <summary> + /// End an asynchronous read operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param> + /// <returns>The <see cref="ByteBuffer"/> read</returns> + public ByteBuffer EndRead(IAsyncResult result) + { + WrappedAsyncResult theResult = (WrappedAsyncResult)result; + int bytesRead = _topStream.EndRead(theResult.InnerResult); + return WrapByteArray(theResult.Buffer, bytesRead); + } + + /// <summary> + /// Write a <see cref="ByteBuffer"/> to the underlying network + /// stream, going through any configured filters + /// </summary> + /// <param name="buffer"></param> + public void Write(ByteBuffer buffer) + { + try + { + _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME + } + catch (Exception e) + { + _log.Warn("Write caused exception", e); + _protocolListener.OnException(e); + } + } + + /// <summary> + /// Begin an asynchronous write operation + /// </summary> + /// <param name="buffer">Buffer to write</param> + /// <param name="callback">A callback to call when the operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) + { + try + { + return _topStream.BeginWrite( + buffer.Array, buffer.Position, buffer.Limit, + callback, state + ); + } catch ( Exception e ) + { + _log.Error("BeginWrite caused exception", e); + // not clear if an exception here should be propagated? we still + // need to propagate it upwards anyway! + _protocolListener.OnException(e); + throw; + } + } + + /// <summary> + /// End an asynchronous write operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param> + public void EndWrite(IAsyncResult result) + { + try + { + _topStream.EndWrite(result); + } catch ( Exception e ) + { + _log.Error("EndWrite caused exception", e); + // not clear if an exception here should be propagated? + _protocolListener.OnException(e); + //throw; + } + } + #endregion // IByteChannel Implementation + + #region IDisposable Implementation + // + // IDisposable Implementation + // + + public void Dispose() + { + if ( _topStream != null ) + { + _topStream.Close(); + } + } + + #endregion // IDisposable Implementation + + #region Private and Helper Classes/Methods + // + // Private and Helper Classes/Methods + // + + private byte[] AllocateBuffer() + { + return new byte[ReadBufferSize]; + } + + private static ByteBuffer WrapByteArray(byte[] bytes, int size) + { + ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes); + byteBuffer.Limit = size; + byteBuffer.Flip(); + + return byteBuffer; + } + + + private static void OnAsyncReadDone(IAsyncResult result) + { + ReadData rd = (ReadData) result.AsyncState; + IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer); + rd.Callback(wrapped); + } + + class ReadData + { + private object _state; + private AsyncCallback _callback; + private byte[] _buffer; + + public object State + { + get { return _state; } + } + + public AsyncCallback Callback + { + get { return _callback; } + } + + public byte[] Buffer + { + get { return _buffer; } + } + + public ReadData(AsyncCallback callback, object state, byte[] buffer) + { + _callback = callback; + _state = state; + _buffer = buffer; + } + } + + class WrappedAsyncResult : IAsyncResult + { + private IAsyncResult _innerResult; + private byte[] _buffer; + + #region IAsyncResult Properties + // + // IAsyncResult Properties + // + public bool IsCompleted + { + get { return _innerResult.IsCompleted; } + } + + public WaitHandle AsyncWaitHandle + { + get { return _innerResult.AsyncWaitHandle; } + } + + public object AsyncState + { + get { return _innerResult.AsyncState; } + } + + public bool CompletedSynchronously + { + get { return _innerResult.CompletedSynchronously; } + } + #endregion // IAsyncResult Properties + + public IAsyncResult InnerResult + { + get { return _innerResult; } + } + public byte[] Buffer + { + get { return _buffer; } + } + + public WrappedAsyncResult(IAsyncResult result, byte[] buffer) + { + if ( result == null ) + throw new ArgumentNullException("result"); + if ( buffer == null ) + throw new ArgumentNullException("buffer"); + + _innerResult = result; + _buffer = buffer; + } + } + + #endregion // Private and Helper Classes/Methods + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs index 3841c158e4..9fa313152f 100644 --- a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs +++ b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs @@ -1,60 +1,60 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using Apache.Qpid.Client.Protocol;
-using Apache.Qpid.Codec;
-using Apache.Qpid.Framing;
-using log4net;
-
-namespace Apache.Qpid.Client.Transport
-{
- /// <summary>
- /// <see cref="IProtocolDecoderOutput"/> implementation that forwards
- /// each <see cref="IDataBlock"/> as it is decoded to the
- /// protocol listener
- /// </summary>
- internal class ProtocolDecoderOutput : IProtocolDecoderOutput
- {
- private IProtocolListener _protocolListener;
- static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel");
-
- public ProtocolDecoderOutput(IProtocolListener protocolListener)
- {
- if ( protocolListener == null )
- throw new ArgumentNullException("protocolListener");
-
- _protocolListener = protocolListener;
- }
-
- public void Write(object message)
- {
- IDataBlock block = message as IDataBlock;
- if ( block != null )
- {
- _protocolTraceLog.Debug(String.Format("READ {0}", block));
- _protocolListener.OnMessage(block);
- }
- }
- }
-} // namespace Apache.Qpid.Client.Transport
-
-
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Codec; +using Apache.Qpid.Framing; +using log4net; + +namespace Apache.Qpid.Client.Transport +{ + /// <summary> + /// <see cref="IProtocolDecoderOutput"/> implementation that forwards + /// each <see cref="IDataBlock"/> as it is decoded to the + /// protocol listener + /// </summary> + internal class ProtocolDecoderOutput : IProtocolDecoderOutput + { + private IProtocolListener _protocolListener; + static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel"); + + public ProtocolDecoderOutput(IProtocolListener protocolListener) + { + if ( protocolListener == null ) + throw new ArgumentNullException("protocolListener"); + + _protocolListener = protocolListener; + } + + public void Write(object message) + { + IDataBlock block = message as IDataBlock; + if ( block != null ) + { + _protocolTraceLog.Debug(String.Format("READ {0}", block)); + _protocolListener.OnMessage(block); + } + } + } +} // namespace Apache.Qpid.Client.Transport + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs index 8a16f9a675..f336d8a80a 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs @@ -1,150 +1,150 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.IO;
-using System.Threading;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client.Protocol;
-using Apache.Qpid.Codec;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Transport.Socket.Blocking
-{
- /// <summary>
- /// TCP Socket transport supporting both
- /// SSL and non-SSL connections.
- /// </summary>
- public class BlockingSocketTransport : ITransport
- {
- // Configuration variables.
- IProtocolListener _protocolListener;
-
- // Runtime variables.
- private ISocketConnector _connector;
- private IoHandler _ioHandler;
- private AmqpChannel _amqpChannel;
- private ManualResetEvent _stopEvent;
-
- public IProtocolWriter ProtocolWriter
- {
- get { return _amqpChannel; }
- }
- public string LocalEndpoint
- {
- get { return _connector.LocalEndpoint; }
- }
-
-
- /// <summary>
- /// Connect to the specified broker
- /// </summary>
- /// <param name="broker">The broker to connect to</param>
- /// <param name="connection">The AMQ connection</param>
- public void Connect(IBrokerInfo broker, AMQConnection connection)
- {
- _stopEvent = new ManualResetEvent(false);
- _protocolListener = connection.ProtocolListener;
-
- _ioHandler = MakeBrokerConnection(broker, connection);
- // todo: get default read size from config!
-
- IProtocolDecoderOutput decoderOutput =
- new ProtocolDecoderOutput(_protocolListener);
- _amqpChannel =
- new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
-
- // post an initial async read
- _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
- }
-
- /// <summary>
- /// Close the broker connection
- /// </summary>
- public void Close()
- {
- StopReading();
- CloseBrokerConnection();
- }
-
- private void StopReading()
- {
- _stopEvent.Set();
- }
-
- private void CloseBrokerConnection()
- {
- if ( _ioHandler != null )
- {
- _ioHandler.Dispose();
- _ioHandler = null;
- }
- if ( _connector != null )
- {
- _connector.Dispose();
- _connector = null;
- }
- }
-
- private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection)
- {
- if ( broker.UseSSL )
- {
- _connector = new SslSocketConnector();
- } else
- {
- _connector = new SocketConnector();
- }
-
- Stream stream = _connector.Connect(broker);
- return new IoHandler(stream, connection.ProtocolListener);
- }
-
- private void OnAsyncReadDone(IAsyncResult result)
- {
- try
- {
- _amqpChannel.EndRead(result);
-
- bool stopping = _stopEvent.WaitOne(0, false);
- if ( !stopping )
- _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
- } catch ( Exception e )
- {
- // ignore any errors during closing
- bool stopping = _stopEvent.WaitOne(0, false);
- if ( !stopping )
- _protocolListener.OnException(e);
- }
- }
-
- #region IProtocolDecoderOutput Members
-
- public void Write(object message)
- {
- _protocolListener.OnMessage((IDataBlock)message);
- }
-
- #endregion
- }
-}
-
-
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.IO; +using System.Threading; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Codec; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + /// <summary> + /// TCP Socket transport supporting both + /// SSL and non-SSL connections. + /// </summary> + public class BlockingSocketTransport : ITransport + { + // Configuration variables. + IProtocolListener _protocolListener; + + // Runtime variables. + private ISocketConnector _connector; + private IoHandler _ioHandler; + private AmqpChannel _amqpChannel; + private ManualResetEvent _stopEvent; + + public IProtocolWriter ProtocolWriter + { + get { return _amqpChannel; } + } + public string LocalEndpoint + { + get { return _connector.LocalEndpoint; } + } + + + /// <summary> + /// Connect to the specified broker + /// </summary> + /// <param name="broker">The broker to connect to</param> + /// <param name="connection">The AMQ connection</param> + public void Connect(IBrokerInfo broker, AMQConnection connection) + { + _stopEvent = new ManualResetEvent(false); + _protocolListener = connection.ProtocolListener; + + _ioHandler = MakeBrokerConnection(broker, connection); + // todo: get default read size from config! + + IProtocolDecoderOutput decoderOutput = + new ProtocolDecoderOutput(_protocolListener); + _amqpChannel = + new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput); + + // post an initial async read + _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this); + } + + /// <summary> + /// Close the broker connection + /// </summary> + public void Close() + { + StopReading(); + CloseBrokerConnection(); + } + + private void StopReading() + { + _stopEvent.Set(); + } + + private void CloseBrokerConnection() + { + if ( _ioHandler != null ) + { + _ioHandler.Dispose(); + _ioHandler = null; + } + if ( _connector != null ) + { + _connector.Dispose(); + _connector = null; + } + } + + private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection) + { + if ( broker.UseSSL ) + { + _connector = new SslSocketConnector(); + } else + { + _connector = new SocketConnector(); + } + + Stream stream = _connector.Connect(broker); + return new IoHandler(stream, connection.ProtocolListener); + } + + private void OnAsyncReadDone(IAsyncResult result) + { + try + { + _amqpChannel.EndRead(result); + + bool stopping = _stopEvent.WaitOne(0, false); + if ( !stopping ) + _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null); + } catch ( Exception e ) + { + // ignore any errors during closing + bool stopping = _stopEvent.WaitOne(0, false); + if ( !stopping ) + _protocolListener.OnException(e); + } + } + + #region IProtocolDecoderOutput Members + + public void Write(object message) + { + _protocolListener.OnMessage((IDataBlock)message); + } + + #endregion + } +} + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs index 73575c7086..4540f01f4e 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs @@ -1,92 +1,92 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using log4net;
-using Apache.Qpid.Buffer;
-
-namespace Apache.Qpid.Client.Transport.Socket.Blocking
-{
- class ByteChannel : IByteChannel
- {
- // Warning: don't use this log for regular logging.
- private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel");
-
- private IByteChannel _lowerChannel;
-
- public ByteChannel(IByteChannel lowerChannel)
- {
- _lowerChannel = lowerChannel;
- }
-
- public ByteBuffer Read()
- {
- ByteBuffer result = _lowerChannel.Read();
-
- // TODO: Move into decorator.
- if (_ioTraceLog.IsDebugEnabled)
- {
- _ioTraceLog.Debug(String.Format("READ {0}", result));
- }
-
- return result;
- }
-
- public IAsyncResult BeginRead(AsyncCallback callback, object state)
- {
- return _lowerChannel.BeginRead(callback, state);
- }
-
- public ByteBuffer EndRead(IAsyncResult result)
- {
- ByteBuffer buffer = _lowerChannel.EndRead(result);
- if ( _ioTraceLog.IsDebugEnabled )
- {
- _ioTraceLog.Debug(String.Format("READ {0}", buffer));
- }
- return buffer;
- }
-
- public void Write(ByteBuffer buffer)
- {
- // TODO: Move into decorator.
- if (_ioTraceLog.IsDebugEnabled)
- {
- _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
- }
-
- _lowerChannel.Write(buffer);
- }
-
- public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
- {
- if ( _ioTraceLog.IsDebugEnabled )
- {
- _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
- }
- return _lowerChannel.BeginWrite(buffer, callback, state);
- }
-
- public void EndWrite(IAsyncResult result)
- {
- _lowerChannel.EndWrite(result);
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + class ByteChannel : IByteChannel + { + // Warning: don't use this log for regular logging. + private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel"); + + private IByteChannel _lowerChannel; + + public ByteChannel(IByteChannel lowerChannel) + { + _lowerChannel = lowerChannel; + } + + public ByteBuffer Read() + { + ByteBuffer result = _lowerChannel.Read(); + + // TODO: Move into decorator. + if (_ioTraceLog.IsDebugEnabled) + { + _ioTraceLog.Debug(String.Format("READ {0}", result)); + } + + return result; + } + + public IAsyncResult BeginRead(AsyncCallback callback, object state) + { + return _lowerChannel.BeginRead(callback, state); + } + + public ByteBuffer EndRead(IAsyncResult result) + { + ByteBuffer buffer = _lowerChannel.EndRead(result); + if ( _ioTraceLog.IsDebugEnabled ) + { + _ioTraceLog.Debug(String.Format("READ {0}", buffer)); + } + return buffer; + } + + public void Write(ByteBuffer buffer) + { + // TODO: Move into decorator. + if (_ioTraceLog.IsDebugEnabled) + { + _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); + } + + _lowerChannel.Write(buffer); + } + + public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) + { + if ( _ioTraceLog.IsDebugEnabled ) + { + _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); + } + return _lowerChannel.BeginWrite(buffer, callback, state); + } + + public void EndWrite(IAsyncResult result) + { + _lowerChannel.EndWrite(result); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs index 3d5d2898cf..137fa19c0d 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs @@ -1,34 +1,34 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.IO;
-using Apache.Qpid.Client.Qms;
-
-namespace Apache.Qpid.Client.Transport.Socket.Blocking
-{
- interface ISocketConnector : IDisposable
- {
- string LocalEndpoint { get; }
- Stream Connect(IBrokerInfo broker);
- }
-}
-
-
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + interface ISocketConnector : IDisposable + { + string LocalEndpoint { get; } + Stream Connect(IBrokerInfo broker); + } +} + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs index 83f7287e9b..b6dd8c3be1 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs @@ -1,71 +1,71 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using Apache.Qpid.Client.Qms;
-
-namespace Apache.Qpid.Client.Transport.Socket.Blocking
-{
- /// <summary>
- /// Implements a TCP connection over regular sockets.
- /// </summary>
- class SocketConnector : ISocketConnector
- {
- private MyTcpClient _tcpClient;
-
- public string LocalEndpoint
- {
- get { return _tcpClient.LocalEndpoint.ToString(); }
- }
-
- public Stream Connect(IBrokerInfo broker)
- {
- _tcpClient = new MyTcpClient(broker.Host, broker.Port);
- return _tcpClient.GetStream();
- }
-
- public void Dispose()
- {
- if ( _tcpClient != null )
- {
- _tcpClient.Close();
- _tcpClient = null;
- }
- }
-
- class MyTcpClient : TcpClient
- {
- public MyTcpClient(string host, int port)
- : base(host, port)
- {
- }
-
- public EndPoint LocalEndpoint
- {
- get { return Client.LocalEndPoint; }
- }
- }
-
- }
-}
-
-
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.IO; +using System.Net; +using System.Net.Sockets; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + /// <summary> + /// Implements a TCP connection over regular sockets. + /// </summary> + class SocketConnector : ISocketConnector + { + private MyTcpClient _tcpClient; + + public string LocalEndpoint + { + get { return _tcpClient.LocalEndpoint.ToString(); } + } + + public Stream Connect(IBrokerInfo broker) + { + _tcpClient = new MyTcpClient(broker.Host, broker.Port); + return _tcpClient.GetStream(); + } + + public void Dispose() + { + if ( _tcpClient != null ) + { + _tcpClient.Close(); + _tcpClient = null; + } + } + + class MyTcpClient : TcpClient + { + public MyTcpClient(string host, int port) + : base(host, port) + { + } + + public EndPoint LocalEndpoint + { + get { return Client.LocalEndPoint; } + } + } + + } +} + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs index 708edde48c..8436e6fc4f 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs @@ -1,107 +1,107 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System.IO;
-using System.Net;
-using log4net;
-using Apache.Qpid.Client.Qms;
-using Org.Mentalis.Security.Ssl;
-using MCertificate = Org.Mentalis.Security.Certificates.Certificate;
-using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain;
-
-namespace Apache.Qpid.Client.Transport.Socket.Blocking
-{
- /// <summary>
- /// Implements a TLS v1.0 connection using the Mentalis.org library
- /// </summary>
- /// <remarks>
- /// It would've been easier to implement this at the StreamFilter
- /// level, but unfortunately the Mentalis library doesn't support
- /// a passthrough SSL stream class and is tied directly
- /// to socket-like classes.
- /// </remarks>
- class SslSocketConnector : ISocketConnector
- {
- private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector));
- private MyTcpClient _tcpClient;
-
- public string LocalEndpoint
- {
- get { return _tcpClient.LocalEndpoint.ToString(); }
- }
-
- public Stream Connect(IBrokerInfo broker)
- {
- MCertificate cert = GetClientCert(broker);
- SecurityOptions options = new SecurityOptions(
- SecureProtocol.Tls1, cert, ConnectionEnd.Client
- );
- if ( broker.SslOptions != null
- && broker.SslOptions.IgnoreValidationErrors )
- {
- _logger.Warn("Ignoring any certificate validation errors during SSL handshake...");
- options.VerificationType = CredentialVerification.None;
- }
-
- _tcpClient = new MyTcpClient(broker.Host, broker.Port, options);
- return _tcpClient.GetStream();
- }
-
- public void Dispose()
- {
- if ( _tcpClient != null )
- {
- _tcpClient.Close();
- _tcpClient = null;
- }
- }
-
- private static MCertificate GetClientCert(IBrokerInfo broker)
- {
- // if a client certificate is configured,
- // use that to enable mutual authentication
- MCertificate cert = null;
- if ( broker.SslOptions != null
- && broker.SslOptions.ClientCertificate != null )
- {
- cert = MCertificate.CreateFromX509Certificate(
- broker.SslOptions.ClientCertificate
- );
- _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true));
- }
- return cert;
- }
-
- class MyTcpClient : SecureTcpClient
- {
- public MyTcpClient(string host, int port, SecurityOptions options)
- : base(host, port, options)
- {
- }
-
- public EndPoint LocalEndpoint
- {
- get { return Client.LocalEndPoint; }
- }
-
- }
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.IO; +using System.Net; +using log4net; +using Apache.Qpid.Client.Qms; +using Org.Mentalis.Security.Ssl; +using MCertificate = Org.Mentalis.Security.Certificates.Certificate; +using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + /// <summary> + /// Implements a TLS v1.0 connection using the Mentalis.org library + /// </summary> + /// <remarks> + /// It would've been easier to implement this at the StreamFilter + /// level, but unfortunately the Mentalis library doesn't support + /// a passthrough SSL stream class and is tied directly + /// to socket-like classes. + /// </remarks> + class SslSocketConnector : ISocketConnector + { + private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector)); + private MyTcpClient _tcpClient; + + public string LocalEndpoint + { + get { return _tcpClient.LocalEndpoint.ToString(); } + } + + public Stream Connect(IBrokerInfo broker) + { + MCertificate cert = GetClientCert(broker); + SecurityOptions options = new SecurityOptions( + SecureProtocol.Tls1, cert, ConnectionEnd.Client + ); + if ( broker.SslOptions != null + && broker.SslOptions.IgnoreValidationErrors ) + { + _logger.Warn("Ignoring any certificate validation errors during SSL handshake..."); + options.VerificationType = CredentialVerification.None; + } + + _tcpClient = new MyTcpClient(broker.Host, broker.Port, options); + return _tcpClient.GetStream(); + } + + public void Dispose() + { + if ( _tcpClient != null ) + { + _tcpClient.Close(); + _tcpClient = null; + } + } + + private static MCertificate GetClientCert(IBrokerInfo broker) + { + // if a client certificate is configured, + // use that to enable mutual authentication + MCertificate cert = null; + if ( broker.SslOptions != null + && broker.SslOptions.ClientCertificate != null ) + { + cert = MCertificate.CreateFromX509Certificate( + broker.SslOptions.ClientCertificate + ); + _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true)); + } + return cert; + } + + class MyTcpClient : SecureTcpClient + { + public MyTcpClient(string host, int port, SecurityOptions options) + : base(host, port, options) + { + } + + public EndPoint LocalEndpoint + { + get { return Client.LocalEndPoint; } + } + + } + + } +} diff --git a/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs b/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs index 87bb2a2859..a06de9eac8 100644 --- a/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs +++ b/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs @@ -1,98 +1,98 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using System.Threading;
-using Apache.Qpid.Collections;
-using Apache.Qpid.Common;
-
-namespace Apache.Qpid.Client.Util
-{
- internal delegate void ThresholdMethod(int currentCount);
-
- /// <summary>
- /// Basic bounded queue used to implement prefetching.
- /// Notice we do the callbacks here asynchronously to
- /// avoid adding more complexity to the channel impl.
- /// </summary>
- internal class FlowControlQueue
- {
- private BlockingQueue _queue = new LinkedBlockingQueue();
- private int _itemCount;
- private int _lowerBound;
- private int _upperBound;
- private ThresholdMethod _underThreshold;
- private ThresholdMethod _overThreshold;
-
- public FlowControlQueue(
- int lowerBound,
- int upperBound,
- ThresholdMethod underThreshold,
- ThresholdMethod overThreshold
- )
- {
- _lowerBound = lowerBound;
- _upperBound = upperBound;
- _underThreshold = underThreshold;
- _overThreshold = overThreshold;
- }
-
- public void Enqueue(object item)
- {
- _queue.EnqueueBlocking(item);
- int count = Interlocked.Increment(ref _itemCount);
- if ( _overThreshold != null )
- {
- if ( count == _upperBound )
- {
- _overThreshold.BeginInvoke(
- count, new AsyncCallback(OnAsyncCallEnd),
- _overThreshold
- );
- }
- }
- }
-
- public object Dequeue()
- {
- object item = _queue.DequeueBlocking();
- int count = Interlocked.Decrement(ref _itemCount);
- if ( _underThreshold != null )
- {
- if ( count == _lowerBound )
- {
- _underThreshold.BeginInvoke(
- count, new AsyncCallback(OnAsyncCallEnd),
- _underThreshold
- );
- }
- }
- return item;
- }
-
- private void OnAsyncCallEnd(IAsyncResult res)
- {
- ThresholdMethod method = (ThresholdMethod)res.AsyncState;
- method.EndInvoke(res);
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using System.Threading; +using Apache.Qpid.Collections; +using Apache.Qpid.Common; + +namespace Apache.Qpid.Client.Util +{ + internal delegate void ThresholdMethod(int currentCount); + + /// <summary> + /// Basic bounded queue used to implement prefetching. + /// Notice we do the callbacks here asynchronously to + /// avoid adding more complexity to the channel impl. + /// </summary> + internal class FlowControlQueue + { + private BlockingQueue _queue = new LinkedBlockingQueue(); + private int _itemCount; + private int _lowerBound; + private int _upperBound; + private ThresholdMethod _underThreshold; + private ThresholdMethod _overThreshold; + + public FlowControlQueue( + int lowerBound, + int upperBound, + ThresholdMethod underThreshold, + ThresholdMethod overThreshold + ) + { + _lowerBound = lowerBound; + _upperBound = upperBound; + _underThreshold = underThreshold; + _overThreshold = overThreshold; + } + + public void Enqueue(object item) + { + _queue.EnqueueBlocking(item); + int count = Interlocked.Increment(ref _itemCount); + if ( _overThreshold != null ) + { + if ( count == _upperBound ) + { + _overThreshold.BeginInvoke( + count, new AsyncCallback(OnAsyncCallEnd), + _overThreshold + ); + } + } + } + + public object Dequeue() + { + object item = _queue.DequeueBlocking(); + int count = Interlocked.Decrement(ref _itemCount); + if ( _underThreshold != null ) + { + if ( count == _lowerBound ) + { + _underThreshold.BeginInvoke( + count, new AsyncCallback(OnAsyncCallEnd), + _underThreshold + ); + } + } + return item; + } + + private void OnAsyncCallEnd(IAsyncResult res) + { + ThresholdMethod method = (ThresholdMethod)res.AsyncState; + method.EndInvoke(res); + } + } +} |