diff options
author | Aidan Skinner <aidan@apache.org> | 2008-04-23 23:56:38 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-04-23 23:56:38 +0000 |
commit | 5518fd899d97459bcd8c45b850da447697a60fe8 (patch) | |
tree | 30e500327125a70939e2ff4c9ac79f366ad7d0b1 /qpid/dotnet/Qpid.Client/Client | |
parent | 65971bf662ccc0df167b23ecb831f1ccb3d5e475 (diff) | |
download | qpid-python-5518fd899d97459bcd8c45b850da447697a60fe8.tar.gz |
QPID-832 sync from M2.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client')
80 files changed, 10716 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs b/qpid/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs new file mode 100644 index 0000000000..6382eaaf39 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs @@ -0,0 +1,39 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+
+namespace Apache.Qpid.Client
+{
+ [Serializable]
+ public class AMQAuthenticationException : AMQException
+ {
+ public AMQAuthenticationException(int error, String message)
+ : base(error, message)
+ {
+ }
+
+ protected AMQAuthenticationException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs new file mode 100644 index 0000000000..d0bebf1170 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -0,0 +1,886 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.IO; +using System.Reflection; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Failover; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client.State; +using Apache.Qpid.Client.Transport; +using Apache.Qpid.Client.Transport.Socket.Blocking; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client +{ + public class AMQConnection : Closeable, IConnection + { + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection)); + + IConnectionInfo _connectionInfo; + private int _nextChannelId = 0; + + // _Connected should be refactored with a suitable wait object. + private bool _connected; + + Thread _heartBeatThread; + HeartBeatThread _heartBeatRunner; + + // The last error code that occured on the connection. Used to return the correct exception to the client + private AMQException _lastAMQException = null; + + /** + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. + * This must be held by any child objects of this connection such as the session, producers and consumers. + */ + private readonly Object _failoverMutex = new Object(); + public object FailoverMutex + { + get { return _failoverMutex; } + } + + /** + * Policy dictating how to failover + */ + private FailoverPolicy _failoverPolicy; + + internal bool IsFailoverAllowed + { + get { return _failoverPolicy.FailoverAllowed(); } + } + + /// <summary> + /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels + /// per session and we must prevent the client from opening too many. Zero means unlimited. + /// </summary> + private ushort _maximumChannelCount; + + /// <summary> + /// The maximum size of frame supported by the server + /// </summary> + private uint _maximumFrameSize; + + private AMQStateManager _stateManager; + + private AMQProtocolSession _protocolSession; + public AMQProtocolSession ProtocolSession { get { return _protocolSession; } } + + /// <summary> + /// Maps from session id (Integer) to AmqChannel instance + /// </summary> + private readonly IDictionary _sessions = new LinkedHashtable(); + + private ExceptionListenerDelegate _exceptionListener; + + private IConnectionListener _connectionListener; + + private ITransport _transport; + public ITransport Transport { get { return _transport; } } + + /// <summary> + /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for + /// message publication. + /// </summary> + private bool _started; + + private AMQProtocolListener _protocolListener; + public AMQProtocolListener ProtocolListener { get { return _protocolListener; } } + + public IProtocolWriter ProtocolWriter + { + get { return _transport.ProtocolWriter; } + } + + ProtocolWriter _protocolWriter; + + public ProtocolWriter ConvenientProtocolWriter + { + get { return _protocolWriter; } + } + + public AMQConnection(IConnectionInfo connectionInfo) + { + if (connectionInfo == null) + { + throw new ArgumentException("ConnectionInfo must be specified"); + } + _log.Debug("ConnectionInfo: " + connectionInfo); + _connectionInfo = connectionInfo; + _log.Debug("password = " + _connectionInfo.Password); + _failoverPolicy = new FailoverPolicy(connectionInfo); + + // We are not currently connected. + _connected = false; + + Exception lastException = null; + do + { + try + { + IBrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo(); + _log.Debug("Connecting to " + brokerInfo); + MakeBrokerConnection(brokerInfo); + break; + } + catch (Exception e) + { + lastException = e; + _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); + // XXX: Should perhaps break out of the do/while here if not a SocketException... + } + } while (_failoverPolicy.FailoverAllowed()); + + _log.Debug("Are we connected:" + _connected); + + if (!_failoverPolicy.FailoverAllowed()) + { + if ( lastException is AMQException ) + throw lastException; + else + throw new AMQConnectionException("Unable to connect", lastException); + } + + // TODO: this needs to be redone so that we are not spinning. + // A suitable object should be set that is then waited on + // and only notified when a connection is made or when + // the AMQConnection gets closed. + while (!_connected && !Closed) + { + _log.Debug("Sleeping."); + Thread.Sleep(100); + } + if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null) + { + if (_lastAMQException != null) + { + throw _lastAMQException; + } + } + } + + /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) + { + //Assembly assembly = Assembly.LoadFrom(assemblyName); + Assembly assembly = Assembly.Load(assemblyName); + + foreach (Type type in assembly.GetTypes()) + { + _log.Debug(String.Format("type = {0}", type)); + } + + Type transport = assembly.GetType(transportType); + + if (transport == null) + { + throw new ArgumentException( + String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName)); + + } + + _log.Debug("transport = " + transport); + _log.Debug("ctors = " + transport.GetConstructors()); + + ConstructorInfo info = transport.GetConstructors()[0]; + ITransport result = (ITransport)info.Invoke(new object[] { host, port, this }); + + _log.Debug("transport = " + result); + + return result; + }*/ + + public void Disconnect() + { + _transport.Close(); + } + + #region IConnection Members + + public string ClientID + { + get + { + CheckNotClosed(); + return _connectionInfo.ClientName; + } + set + { + CheckNotClosed(); + _connectionInfo.ClientName = value; + } + } + + public override void Close() + { + lock (FailoverMutex) + { + // atomically set to closed and check the _previous value was NOT CLOSED + if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED) + { + try + { + CloseAllSessions(null); + CloseConnection(); + } + catch (AMQException e) + { + throw new QpidException("Error closing connection: " + e); + } + } + } + } + + private void CloseConnection() + { + _stateManager.ChangeState(AMQState.CONNECTION_CLOSING); + + AMQFrame frame = ConnectionCloseBody.CreateAMQFrame( + 0, 200, "Qpid.NET client is closing the connection.", 0, 0); + + ProtocolWriter.Write(frame); + + _log.Debug("Blocking for connection close ok frame"); + + _stateManager.AttainState(AMQState.CONNECTION_CLOSED); + Disconnect(); + } + + class CreateChannelFailoverSupport : FailoverSupport + { + private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport)); + + private bool _transacted; + private AcknowledgeMode _acknowledgeMode; + int _prefetchHigh; + int _prefetchLow; + AMQConnection _connection; + + public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) + { + _connection = connection; + _transacted = transacted; + _acknowledgeMode = acknowledgeMode; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; + } + + protected override object operation() + { + ushort channelId = _connection.NextChannelId(); + + if (_log.IsDebugEnabled) + { + _log.Debug("Write channel open frame for channel id " + channelId); + } + + // We must create the channel and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AmqChannel channel = new AmqChannel(_connection, + channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow); + _connection.ProtocolSession.AddSessionByChannel(channelId, channel); + _connection.RegisterSession(channelId, channel); + + bool success = false; + try + { + _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted); + success = true; + } + catch (AMQException e) + { + throw new QpidException("Error creating channel: " + e, e); + } + finally + { + if (!success) { + _connection.ProtocolSession.RemoveSessionByChannel(channelId); + _connection.DeregisterSession(channelId); + } + } + + if (_connection._started) + { + channel.Start(); + } + return channel; + } + } + + internal ushort NextChannelId() + { + return (ushort) Interlocked.Increment(ref _nextChannelId); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode) + { + return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) + { + return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) + { + CheckNotClosed(); + if (ChannelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } + else + { + CreateChannelFailoverSupport operation = + new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow); + return (IChannel)operation.execute(this); + } + } + + public void CloseSession(AmqChannel channel) + { + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). + _protocolSession.CloseSession(channel); + + AMQFrame frame = ChannelCloseBody.CreateAMQFrame( + channel.ChannelId, 200, "JMS client closing channel", 0, 0); + + _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId); + _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody)); + _log.Debug("Received channel close frame"); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully + } + + public ExceptionListenerDelegate ExceptionListener + { + get + { + CheckNotClosed(); + return _exceptionListener; + } + set + { + CheckNotClosed(); + _exceptionListener = value; + } + } + + /// <summary> + /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread + /// and is not thread safe (which is legal according to the JMS specification). + /// @throws JMSException + /// </summary> + public void Start() + { + CheckNotClosed(); + + if (!_started) + { + foreach (DictionaryEntry lde in _sessions) + { + AmqChannel s = (AmqChannel)lde.Value; + s.Start(); + } + _started = true; + } + } + + public void Stop() + { + CheckNotClosed(); + + if (_started) + { + foreach (DictionaryEntry lde in _sessions) + { + AmqChannel s = (AmqChannel) lde.Value; + s.Stop(); + } + _started = false; + } + } + + public IConnectionListener ConnectionListener + { + get { return _connectionListener; } + set { _connectionListener = value; } + } + + #endregion + + #region IDisposable Members + + public void Dispose() + { + Close(); + } + + #endregion + + private bool ChannelLimitReached() + { + return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount; + } + + /// <summary> + /// Close all the sessions, either due to normal connection closure or due to an error occurring. + /// @param cause if not null, the error that is causing this shutdown + /// </summary> + private void CloseAllSessions(Exception cause) + { + _log.Debug("Closing all session in connection " + this); + ICollection sessions = new ArrayList(_sessions.Values); + foreach (AmqChannel channel in sessions) + { + _log.Debug("Closing channel " + channel); + if (cause != null) + { + channel.ClosedWithException(cause); + } + else + { + try + { + channel.Close(); + } + catch (QpidException e) + { + _log.Error("Error closing channel: " + e); + } + } + } + _log.Debug("Done closing all sessions in connection " + this); + } + + public int MaximumChannelCount + { + get + { + CheckNotClosed(); + return _maximumChannelCount; + } + } + + internal void SetMaximumChannelCount(ushort maximumChannelCount) + { + CheckNotClosed(); + _maximumChannelCount = maximumChannelCount; + } + + public uint MaximumFrameSize + { + get + { + return _maximumFrameSize; + } + + set + { + _maximumFrameSize = value; + } + } + + public IDictionary Sessions + { + get + { + return _sessions; + } + } + + public string Host + { + get + { + return _failoverPolicy.GetCurrentBrokerInfo().Host; + } + } + + public int Port + { + get + { + return _failoverPolicy.GetCurrentBrokerInfo().Port; + } + } + + public string Username + { + get + { + return _connectionInfo.Username; + } + } + + public string Password + { + get + { + return _connectionInfo.Password; + } + } + + public string VirtualHost + { + get + { + return _connectionInfo.VirtualHost; + } + } + + /// <summary> + /// Invoked by the AMQProtocolSession when a protocol session exception has occurred. + /// This method sends the exception to a JMS exception listener, if configured, and + /// propagates the exception to sessions, which in turn will propagate to consumers. + /// This allows synchronous consumers to have exceptions thrown to them. + /// </summary> + /// <param name="cause">the exception</param> + public void ExceptionReceived(Exception cause) + { + if (_exceptionListener != null) + { + // Listener expects one of these... + QpidException xe; + + if (cause is QpidException) + { + xe = (QpidException) cause; + } + else + { + xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause); + } + // in the case of an IOException, MINA has closed the protocol session so we set _closed to true + // so that any generic client code that tries to close the connection will not mess up this error + // handling sequence + if (cause is IOException) + { + Interlocked.Exchange(ref _closed, CLOSED); + } +#if __MonoCS__ + _exceptionListener(xe); +#else + _exceptionListener.Invoke(xe); +#endif + } + else + { + _log.Error("Connection exception: " + cause); + } + + // An undelivered is not fatal to the connections usability. + if (!(cause is AMQUndeliveredException)) + { + Interlocked.Exchange(ref _closed, CLOSED); + CloseAllSessions(cause); + } + else + { + ; + } + } + + internal void RegisterSession(int channelId, AmqChannel channel) + { + _sessions[channelId] = channel; + } + + internal void DeregisterSession(int channelId) + { + _sessions.Remove(channelId); + } + + /** + * Fire the preFailover event to the registered connection listener (if any) + * + * @param redirect true if this is the result of a redirect request rather than a connection error + * @return true if no listener or listener does not veto change + */ + public bool FirePreFailover(bool redirect) + { + bool proceed = true; + if (_connectionListener != null) + { + proceed = _connectionListener.PreFailover(redirect); + } + return proceed; + } + + /** + * Fire the preResubscribe event to the registered connection listener (if any). If the listener + * vetoes resubscription then all the sessions are closed. + * + * @return true if no listener or listener does not veto resubscription. + * @throws JMSException + */ + public bool FirePreResubscribe() + { + if (_connectionListener != null) + { + bool resubscribe = _connectionListener.PreResubscribe(); + if (!resubscribe) + { + MarkAllSessionsClosed(); + } + return resubscribe; + } + else + { + return true; + } + } + + /** + * Marks all sessions and their children as closed without sending any protocol messages. Useful when + * you need to mark objects "visible" in userland as closed after failover or other significant event that + * impacts the connection. + * <p/> + * The caller must hold the failover mutex before calling this method. + */ + private void MarkAllSessionsClosed() + { + //LinkedList sessionCopy = new LinkedList(_sessions.values()); + ArrayList sessionCopy = new ArrayList(_sessions.Values); + foreach (AmqChannel session in sessionCopy) + { + session.MarkClosed(); + } + _sessions.Clear(); + } + + /** + * Fires a failover complete event to the registered connection listener (if any). + */ + public void FireFailoverComplete() + { + if (_connectionListener != null) + { + _connectionListener.FailoverComplete(); + } + } + + public bool AttemptReconnection(String host, int port, SslOptions sslConfig) + { + IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig); + + _failoverPolicy.setBroker(bd); + + try + { + MakeBrokerConnection(bd); + return true; + } + catch (Exception e) + { + _log.Debug("Unable to connect to broker at " + bd, e); + AttemptReconnection(); + } + return false; + } + + private void MakeBrokerConnection(IBrokerInfo brokerDetail) + { + try + { + _stateManager = new AMQStateManager(); + _protocolListener = new AMQProtocolListener(this, _stateManager); + _protocolListener.AddFrameListener(_stateManager); + + /* + // Currently there is only one transport option - BlockingSocket. + String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll"; + String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport"; + + // Load the transport assembly dynamically. + _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); + */ + + _transport = new BlockingSocketTransport(); + + // Connect. + _transport.Connect(brokerDetail, this); + _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener); + _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this); + _protocolListener.ProtocolSession = _protocolSession; + + // Now start the connection "handshake". + _transport.ProtocolWriter.Write(new ProtocolInitiation()); + + // Blocks until the connection has been opened. + _stateManager.AttainState(AMQState.CONNECTION_OPEN); + + _failoverPolicy.attainedConnection(); + + // XXX: Again this should be changed to a suitable notify. + _connected = true; + } + catch (AMQException e) + { + _lastAMQException = e; + throw; // rethrow + } + } + + public bool AttemptReconnection() + { + while (_failoverPolicy.FailoverAllowed()) + { + try + { + MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo()); + return true; + } + catch (Exception e) + { + if (!(e is AMQException)) + { + _log.Debug("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e); + } + else + { + _log.Debug(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo()); + } + } + } + + // Connection unsuccessful. + return false; + } + + /** + * For all channels, and for all consumers in those sessions, resubscribe. This is called during failover handling. + * The caller must hold the failover mutex before calling this method. + */ + public void ResubscribeChannels() + { + ArrayList channels = new ArrayList(_sessions.Values); + _log.Debug(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count)); + foreach (AmqChannel channel in channels) + { + _protocolSession.AddSessionByChannel(channel.ChannelId, channel); + ReopenChannel( + channel.ChannelId, + channel.DefaultPrefetchHigh, + channel.DefaultPrefetchLow, + channel.Transacted + ); + channel.ReplayOnFailOver(); + } + } + + private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) + { + _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}", + channelId, prefetchHigh, prefetchLow, transacted)); + try + { + CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + } + catch (AMQException e) + { + _protocolSession.RemoveSessionByChannel(channelId); + DeregisterSession(channelId); + throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); + } + } + + void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) + { + _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); + + // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We + // know this when we connection using AMQP 0.7 + if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7) + { + // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d. + _protocolWriter.SyncWrite(BasicQosBody.CreateAMQFrame(channelId, 0, (ushort)prefetchHigh, false), typeof (BasicQosOkBody)); + } + + if (transacted) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Issuing TxSelect for " + channelId); + } + _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody)); + } + } + + public String toURL() + { + return _connectionInfo.AsUrl(); + } + + class HeartBeatThread + { + int _heartbeatMillis; + IProtocolWriter _protocolWriter; + bool _run = true; + + public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis) + { + _protocolWriter = protocolWriter; + _heartbeatMillis = heartbeatMillis; + } + + public void Run() + { + while (_run) + { + Thread.Sleep(_heartbeatMillis); + if (!_run) break; + _log.Debug("Sending heartbeat"); + // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker? + _protocolWriter.Write(HeartbeatBody.FRAME); + } + _log.Debug("Heatbeat thread stopped"); + } + + public void Stop() + { + _run = false; + } + } + + public void StartHeartBeatThread(int heartbeatSeconds) + { + _log.Debug("Starting new heartbeat thread"); + _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000); + _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run)); + _heartBeatThread.Name = "HeartBeat"; + _heartBeatThread.Start(); + } + + public void StopHeartBeatThread() + { + if (_heartBeatRunner != null) + { + _log.Debug("Stopping old heartbeat thread"); + _heartBeatRunner.Stop(); + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnectionException.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnectionException.cs new file mode 100644 index 0000000000..c8a48814bb --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AMQConnectionException.cs @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; + +namespace Apache.Qpid.Client +{ + [Serializable] + public class AMQConnectionException : AMQException + { + public AMQConnectionException(String message, Exception e) : base(message, e) + { + } + + protected AMQConnectionException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/AMQDestination.cs b/qpid/dotnet/Qpid.Client/Client/AMQDestination.cs new file mode 100644 index 0000000000..07ce3c2354 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AMQDestination.cs @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Apache.Qpid.Client +{ + public abstract class AMQDestination + { + protected readonly string _exchangeName; + protected readonly string _exchangeClass; + protected readonly string _destinationName; + protected readonly bool _isExclusive; + protected readonly bool _isAutoDelete; + protected bool _isDurable; + + public bool IsDurable + { + + get { return _isDurable; } + } + + protected string _queueName; + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, bool isExclusive, + bool isAutoDelete, String queueName) + { + // XXX: This is ugly - OnlyRequired because of ReplyToDestination. +// if (destinationName == null) +// { +// throw new ArgumentNullException("destinationName"); +// } + + // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter. +// if (exchangeName == null) +// { +// throw new ArgumentNullException("exchangeName"); +// } + + // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter. +// if (exchangeClass == null) +// { +// throw new ArgumentNullException("exchangeClass"); +// } + + _exchangeName = exchangeName; + _exchangeClass = exchangeClass; + _destinationName = destinationName; + _isExclusive = isExclusive; + _isAutoDelete = isAutoDelete; + _queueName = queueName; + } + + public string Name + { + get + { + return _destinationName; + } + } + + public abstract string RoutingKey + { + get; + } + + public abstract string EncodedName + { + get; + } + + public bool AutoDelete + { + get + { + return _isAutoDelete; + } + } + + public string QueueName + { + get + { + return _queueName; + } + set + { + _queueName = value; + } + } + + public string ExchangeName + { + get + { + return _exchangeName; + } + } + + public string ExchangeClass + { + get + { + return _exchangeClass; + } + } + + public bool IsExclusive + { + get + { + return _isExclusive; + } + } + + public bool IsAutoDelete + { + get + { + return _isAutoDelete; + } + } + + public override string ToString() + { + return "Destination: " + _destinationName + ", " + + "Queue Name: " + _queueName + ", Exchange: " + _exchangeName + + ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive + + ", AutoDelete: " + _isAutoDelete; // +", Routing Key: " + RoutingKey; + } + + public override bool Equals(object o) + { + if (this == o) + { + return true; + } + if (o == null || GetType() != o.GetType()) + { + return false; + } + + AMQDestination that = (AMQDestination) o; + + if (!StringsNotEqualNullSafe(_destinationName, that._destinationName)) + { + return false; + } + if (!StringsNotEqualNullSafe(_exchangeClass, that._exchangeClass)) + { + return false; + } + if (!StringsNotEqualNullSafe(_exchangeName, that._exchangeName)) + { + return false; + } + if (!StringsNotEqualNullSafe(_queueName, that._queueName)) + { + return false; + } + if (_isExclusive != that._isExclusive) + { + return false; + } + if (_isAutoDelete != that._isAutoDelete) + { + return false; + } + return true; + } + + private bool StringsNotEqualNullSafe(string one, string two) + { + if ((one == null && two != null) || + (one != null && !one.Equals(two))) + { + return false; + } + else + { + return true; + } + } + + public override int GetHashCode() + { + int result; + if (_exchangeName == null) + { + result = "".GetHashCode(); + } + else + { + result = _exchangeName.GetHashCode(); + } + if (_exchangeClass != null) + { + result = 29 * result + _exchangeClass.GetHashCode(); + } + if (_destinationName != null) + { + result = 29 * result + _destinationName.GetHashCode(); + } + if (_queueName != null) + { + result = 29 * result + _queueName.GetHashCode(); + } + result = result * (_isExclusive ? 13 : 7); + result = result * (_isAutoDelete ? 13 : 7); + + Console.WriteLine("FIXME HashCode for " + this + " = " + result); + return result; + } + + public abstract bool IsNameRequired { get; } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs b/qpid/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs new file mode 100644 index 0000000000..0d93176734 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs @@ -0,0 +1,45 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Apache.Qpid.Common;
+using Apache.Qpid.Protocol;
+
+namespace Apache.Qpid.Client
+{
+ [Serializable]
+ public class AMQNoConsumersException : AMQUndeliveredException
+ {
+ public AMQNoConsumersException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoConsumersException(string message, object bounced)
+ : base(AMQConstant.NO_CONSUMERS.Code, message, bounced)
+ {
+ }
+ protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQNoRouteException.cs b/qpid/dotnet/Qpid.Client/Client/AMQNoRouteException.cs new file mode 100644 index 0000000000..bde3cdd989 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AMQNoRouteException.cs @@ -0,0 +1,46 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Apache.Qpid.Common;
+using Apache.Qpid.Protocol;
+
+namespace Apache.Qpid.Client
+{
+ [Serializable]
+ public class AMQNoRouteException : AMQUndeliveredException
+ {
+ public AMQNoRouteException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoRouteException(string message, object bounced)
+ : base(AMQConstant.NO_ROUTE.Code, message, bounced)
+ {
+ }
+
+ protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs new file mode 100644 index 0000000000..591c5b941f --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs @@ -0,0 +1,322 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client +{ + public class AmqBrokerInfo : IBrokerInfo + { + public readonly string URL_FORMAT_EXAMPLE = + "<transport>://<hostname>[:<port Default=\""+BrokerInfoConstants.DEFAULT_PORT+"\">][?<option>='<value>'[,<option>='<value>']]"; + + public const long DEFAULT_CONNECT_TIMEOUT = 30000L; + + private string _host = "localhost"; + private int _port = 5672; + private string _transport = "amqp"; + private Hashtable _options = new Hashtable(); + private SslOptions _sslOptions; + + public AmqBrokerInfo() + { + } + + public AmqBrokerInfo(string url) + { + // URL should be of format tcp://host:port?option='value',option='value' + try + { + Uri connection = new Uri(url); + + String transport = connection.Scheme; + + // Handles some defaults to minimise changes to existing broker URLS e.g. localhost + if (transport != null) + { + transport = transport.ToLower(); + //todo this list of valid transports should be enumerated somewhere + if ((!(transport.Equals("vm") || transport.Equals("tcp")))) + { + if (transport.Equals("localhost")) + { + connection = new Uri(BrokerInfoConstants.DEFAULT_TRANSPORT + "://" + url); + transport = connection.Scheme; + } + else + { + if (url[transport.Length] == ':' && url[transport.Length + 1] != '/') + { + //Then most likely we have a host:port value + connection = new Uri(BrokerInfoConstants.DEFAULT_TRANSPORT + "://" + url); + transport = connection.Scheme; + } + else + { + URLHelper.parseError(0, transport.Length, "Unknown transport", url); + } + } + } + } + else + { + //Default the transport + connection = new Uri(BrokerInfoConstants.DEFAULT_TRANSPORT + "://" + url); + transport = connection.Scheme; + } + + if (transport == null) + { + URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + + " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, ""); + } + + Transport = transport; + + String host = connection.Host; + if (!host.Equals("default")) Host = host; + + int port = connection.Port; + + if (port == -1) + { + // Fix for when there is port data but it is not automatically parseable by getPort(). + String auth = connection.Authority; + + if (auth != null && auth.IndexOf(':') != -1) + { + int start = auth.IndexOf(":") + 1; + int end = start; + bool looking = true; + bool found = false; + //Walk the authority looking for a port value. + while (looking) + { + try + { + end++; + int.Parse(auth.Substring(start, end-start+1)); + + if (end >= auth.Length) + { + looking = false; + found = true; + } + } + catch (FormatException) + { + looking = false; + } + + } + if (found) + { + Port = int.Parse(auth.Substring(start, end-start+1)); + } + else + { + URLHelper.parseError(connection.ToString().IndexOf(connection.Authority) + end - 1, + "Illegal character in port number", connection.ToString()); + } + } + else + { + Port = BrokerInfoConstants.DEFAULT_PORT; + } + } + else + { + Port = port; + } + + String queryString = connection.Query; + if (queryString.Length > 0 && queryString[0] == '?') + { + queryString = queryString.Substring(1); + } + + URLHelper.parseOptions(_options, queryString); + + //Fragment is #string (not used) + } + catch (UriFormatException uris) + { + throw uris; +// if (uris is UrlSyntaxException) +// { +// throw uris; +// } +// +// URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + } + + public AmqBrokerInfo(string transport, string host, int port, bool useSSL) : this() + { + _transport = transport; + _host = host; + _port = port; + + if (useSSL) + { + SetOption(BrokerInfoConstants.OPTIONS_SSL, "true"); + } + } + + public AmqBrokerInfo(string transport, string host, int port, SslOptions sslConfig) + : this() + { + _transport = transport; + _host = host; + _port = port; + + if ( sslConfig != null ) + { + SetOption(BrokerInfoConstants.OPTIONS_SSL, "true"); + _sslOptions = sslConfig; + } + } + + + public string Host + { + get { return _host; } + set { _host = value; } + } + + public int Port + { + get { return _port; } + set { _port = value; } + } + + public string Transport + { + get { return _transport; } + set { _transport = value; } + } + + public SslOptions SslOptions + { + get { return _sslOptions; } + } + + public string GetOption(string key) + { + return (string)_options[key]; + } + + public void SetOption(string key, string value) + { + _options[key] = value; + } + + public long Timeout + { + get + { + if ( _options.ContainsKey(BrokerInfoConstants.OPTIONS_CONNECT_TIMEOUT) ) + { + try + { + return long.Parse(GetOption(BrokerInfoConstants.OPTIONS_CONNECT_TIMEOUT)); + } catch ( FormatException ) + { + //Do nothing as we will use the default below. + } + } + return BrokerInfoConstants.DEFAULT_CONNECT_TIMEOUT; + } + set + { + SetOption(BrokerInfoConstants.OPTIONS_CONNECT_TIMEOUT, value.ToString()); + } + } + + public override string ToString() + { + StringBuilder sb = new StringBuilder(); + + sb.Append(_transport); + sb.Append("://"); + + if (!(StringEqualsIgnoreCase(_transport, "vm"))) + { + sb.Append(_host); + } + + sb.Append(':'); + sb.Append(_port); + + sb.Append(URLHelper.printOptions(_options)); + + return sb.ToString(); + } + + public override bool Equals(object obj) + { + if (!(obj is IBrokerInfo)) + { + return false; + } + + IBrokerInfo bd = (IBrokerInfo) obj; + return StringEqualsIgnoreCase(_host, bd.Host) && + _port == bd.Port && + StringEqualsIgnoreCase(_transport, bd.Transport) && + UseSSL == bd.UseSSL; + } + + public override int GetHashCode() + { + return _host.ToLower().GetHashCode() ^ _port.GetHashCode(); + } + + // TODO: move to util class. + private bool StringEqualsIgnoreCase(string one, string two) + { + return one.ToLower().Equals(two.ToLower()); + } + + public bool UseSSL + { + get + { + // To be friendly to users we should be case insensitive. + // or simply force users to conform to OPTIONS_SSL + // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL + + if ( _options.ContainsKey(BrokerInfoConstants.OPTIONS_SSL) ) + { + return StringEqualsIgnoreCase(GetOption(BrokerInfoConstants.OPTIONS_SSL), "true"); + } + + return false; + } + set + { + SetOption(BrokerInfoConstants.OPTIONS_SSL, value.ToString()); + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs new file mode 100644 index 0000000000..ce8e2ca2fe --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -0,0 +1,1190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text.RegularExpressions; +using System.Threading; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Util; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client +{ + /// <summary> + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Declare queues. + /// <tr><td> Declare exchanges. + /// <tr><td> Bind queues to exchanges. + /// <tr><td> Create messages. + /// <tr><td> Set up message consumers on the channel. + /// <tr><td> Set up message producers on the channel. + /// <tr><td> Commit the current transaction. + /// <tr><td> Roll-back the current transaction. + /// <tr><td> Close the channel. + /// </table> + /// </summary> + public class AmqChannel : Closeable, IChannel + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); + + internal const int BASIC_CONTENT_TYPE = 60; + + public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; + + public const int DEFAULT_PREFETCH_LOW_MARK = 2500; + + private static int _nextSessionNumber = 0; + + private AMQConnection _connection; + + private int _sessionNumber; + + private bool _suspended; + + private object _suspensionLock = new object(); + + // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. + private int _nextConsumerNumber = 1; + + private bool _transacted; + + private AcknowledgeMode _acknowledgeMode; + + private ushort _channelId; + + private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + + private FlowControlQueue _queue; + + private Dispatcher _dispatcher; + + private MessageFactoryRegistry _messageFactoryRegistry; + + /// <summary> Holds all of the producers created by this channel. </summary> + private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); + + /// <summary> Holds all of the consumers created by this channel. </summary> + private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); + + private ArrayList _replayFrames = new ArrayList(); + + /// <summary> + /// The counter of the _next producer id. This id is generated by the session and used only to allow the + /// producer to identify itself to the session when deregistering itself. + /// + /// Access to this id does not require to be synchronized since according to the JMS specification only one + /// thread of control is allowed to create producers for any given session instance. + /// </summary> + private long _nextProducerId; + + /// <summary> + /// Initializes a new instance of the <see cref="AmqChannel"/> class. + /// </summary> + /// <param name="con">The connection.</param> + /// <param name="channelId">The channel id.</param> + /// <param name="transacted">if set to <c>true</c> [transacted].</param> + /// <param name="acknowledgeMode">The acknowledge mode.</param> + /// <param name="defaultPrefetchHigh">Default prefetch high value</param> + /// <param name="defaultPrefetchLow">Default prefetch low value</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, + int defaultPrefetchHigh, int defaultPrefetchLow) + : this() + { + _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); + _connection = con; + _transacted = transacted; + + if ( transacted ) + { + _acknowledgeMode = AcknowledgeMode.SessionTransacted; + } + else + { + _acknowledgeMode = acknowledgeMode; + } + + _channelId = channelId; + _defaultPrefetchHighMark = defaultPrefetchHigh; + _defaultPrefetchLowMark = defaultPrefetchLow; + + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark, + new ThresholdMethod(OnPrefetchLowMark), + new ThresholdMethod(OnPrefetchHighMark)); + } + else + { + // low and upper are the same + _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark, + null, null); + } + } + + private AmqChannel() + { + _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + } + + /// <summary> + /// Acknowledge mode for messages received. + /// </summary> + public AcknowledgeMode AcknowledgeMode + { + get + { + CheckNotClosed(); + return _acknowledgeMode; + } + } + + /// <summary> + /// True if the channel should use transactions. + /// </summary> + public bool Transacted + { + get + { + CheckNotClosed(); + return _transacted; + } + } + + /// <summary> + /// Prefetch value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetch + { + get { return DefaultPrefetchHigh; } + } + + /// <summary> + /// Prefetch low value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetchLow + { + get { return _defaultPrefetchLowMark; } + } + + /// <summary> + /// Prefetch high value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetchHigh + { + get { return _defaultPrefetchHighMark; } + } + + /// <summary> Indicates whether or not this channel is currently suspended. </summary> + public bool IsSuspended + { + get { return _suspended; } + } + + /// <summary> Provides the channels number within the the connection. </summary> + public ushort ChannelId + { + get { return _channelId; } + } + + /// <summary> Provides the connection that this channel runs over. </summary> + public AMQConnection Connection + { + get { return _connection; } + } + + /// <summary> + /// Declare a new exchange. + /// </summary> + /// <param name="exchangeName">Name of the exchange</param> + /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param> + public void DeclareExchange(String exchangeName, String exchangeClass) + { + _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); + + DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); + } + + /// <summary> + /// Declare a new exchange using the default exchange class. + /// </summary> + /// <param name="exchangeName">Name of the exchange</param> + public void DeleteExchange(string exchangeName) + { + throw new NotImplementedException(); + } + + /// <summary> + /// Declare a new queue with the specified set of arguments. + /// </summary> + /// <param name="queueName">Name of the queue</param> + /// <param name="isDurable">True if the queue should be durable</param> + /// <param name="isExclusive">True if the queue should be exclusive to this channel</param> + /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); + } + + /// <summary> + /// Delete a queue with the specifies arguments. + /// </summary> + /// <param name="queueName">Name of the queue to delete</param> + /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param> + /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param> + /// <param name="noWait">If true, the server will not respond to the method</param> + public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) + { + DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); + } + + /// <summary> + /// Generate a new Unique name to use for a queue. + /// </summary> + /// <returns>A unique name to this channel</returns> + public string GenerateUniqueName() + { + string result = _connection.ProtocolSession.GenerateQueueName(); + return Regex.Replace(result, "[^a-z0-9_]", "_"); + } + + /// <summary> + /// Removes all messages from a queue. + /// </summary> + /// <param name="queueName">Name of the queue to delete</param> + /// <param name="noWait">If true, the server will not respond to the method</param> + public void PurgeQueue(string queueName, bool noWait) + { + DoPurgeQueue(queueName, noWait); + } + + /// <summary> + /// Bind a queue to the specified exchange. + /// </summary> + /// <param name="queueName">Name of queue to bind</param> + /// <param name="exchangeName">Name of exchange to bind to</param> + /// <param name="routingKey">Routing key</param> + public void Bind(string queueName, string exchangeName, string routingKey) + { + DoBind(queueName, exchangeName, routingKey, new FieldTable()); + } + + /// <summary> + /// Bind a queue to the specified exchange. + /// </summary> + /// <param name="queueName">Name of queue to bind</param> + /// <param name="exchangeName">Name of exchange to bind to</param> + /// <param name="routingKey">Routing key</param> + /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param> + public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) + { + DoBind(queueName, exchangeName, routingKey, (FieldTable)args); + } + + /// <summary> + /// Create a new empty message with no body. + /// </summary> + /// <returns>The new message</returns> + public IMessage CreateMessage() + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + /// <summary> + /// Create a new message of the specified MIME type. + /// </summary> + /// <param name="mimeType">The mime type to create</param> + /// <returns>The new message</returns> + public IMessage CreateMessage(string mimeType) + { + return _messageFactoryRegistry.CreateMessage(mimeType); + } + + /// <summary> + /// Creates a new message for bytes (application/octet-stream). + /// </summary> + /// <returns>The new message</returns> + public IBytesMessage CreateBytesMessage() + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + /// <summary> + /// Creates a new text message (text/plain) with empty content. + /// </summary> + /// <returns>The new message</returns> + public ITextMessage CreateTextMessage() + { + return CreateTextMessage(String.Empty); + } + + /// <summary> + /// Creates a new text message (text/plain) with a body. + /// </summary> + /// <param name="text">Initial body of the message</param> + /// <returns>The new message</returns> + public ITextMessage CreateTextMessage(string text) + { + ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); + msg.Text = text; + return msg; + } + + /// <summary> + /// Creates a new Consumer using the builder pattern. + /// </summary> + /// <param name="queueName">Name of queue to receive messages from</param> + /// <returns>The builder object</returns> + public MessageConsumerBuilder CreateConsumerBuilder(string queueName) + { + return new MessageConsumerBuilder(this, queueName); + } + + /// <summary> + /// Creates a new consumer. + /// </summary> + /// <param name="queueName">Name of queue to receive messages from</param> + /// <param name="prefetchLow">Low prefetch value</param> + /// <param name="prefetchHigh">High prefetch value</param> + /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param> + /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param> + /// <returns>The new consumer</returns> + public IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive) + { + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); + + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive); + } + + /// <summary> + /// Unsubscribe from a queue. + /// </summary> + /// <param name="subscriptionName">Subscription name</param> + public void Unsubscribe(String name) + { + throw new NotImplementedException(); + } + + /// <summary> + /// Create a new message publisher using the builder pattern. + /// </summary> + /// <returns>The builder object</returns> + public MessagePublisherBuilder CreatePublisherBuilder() + { + return new MessagePublisherBuilder(this); + } + + /// <summary> + /// Create a new message publisher. + /// </summary> + /// <param name="exchangeName">Name of exchange to publish to</param> + /// <param name="routingKey">Routing key</param> + /// <param name="deliveryMode">Default delivery mode</param> + /// <param name="timeToLive">Default TTL time of messages</param> + /// <param name="immediate">If true, sent immediately</param> + /// <param name="mandatory">If true, the broker will return an error + /// (as a connection exception) if the message cannot be delivered</param> + /// <param name="priority">Default message priority</param> + /// <returns>The new message publisher</returns> + public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", + exchangeName, "none", routingKey)); + + return CreateProducerImpl(exchangeName, routingKey, deliveryMode, + timeToLive, immediate, mandatory, priority); + } + + /// <summary> + /// Recover after transaction failure. + /// </summary> + /// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks> + public void Recover() + { + CheckNotClosed(); + CheckNotTransacted(); + + throw new NotImplementedException(); + } + + /// <summary> + /// Commit the transaction. + /// </summary> + public void Commit() + { + // FIXME: Fail over safety. Needs FailoverSupport? + CheckNotClosed(); + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + // Acknowledge up to message last delivered (if any) for each consumer. + // Need to send ack for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.AcknowledgeDelivered(); + } + + // Commits outstanding messages sent and outstanding acknowledgements. + _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); + } + catch (AMQException e) + { + throw new QpidException("Failed to commit", e); + } + } + + /// <summary> + /// Rollback the transaction. + /// </summary> + public void Rollback() + { + lock (_suspensionLock) + { + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + bool suspended = IsSuspended; + if (!suspended) + { + Suspend(true); + } + + // Reject up to message last delivered (if any) for each consumer. + // Need to send reject for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.RejectUnacked(); + } + + _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + + if ( !suspended ) + { + Suspend(false); + } + } + catch (AMQException e) + { + throw new QpidException("Failed to rollback", e); + } + } + } + + /// <summary> + /// Create a disconnected channel that will fault + /// for most things, but is useful for testing + /// </summary> + /// <returns>A new disconnected channel</returns> + public static IChannel CreateDisconnectedChannel() + { + return new AmqChannel(); + } + + public override void Close() + { + lock (_connection.FailoverMutex) + { + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session + + lock (_closingLock) + { + SetClosed(); + + // we pass null since this is not an error case + CloseProducersAndConsumers(null); + + try + { + _connection.CloseSession(this); + } + catch (AMQException e) + { + throw new QpidException("Error closing session: " + e); + } + finally + { + _connection.DeregisterSession(_channelId); + } + } + } + } + + /** + * Called when the server initiates the closure of the session + * unilaterally. + * @param e the exception that caused this session to be closed. Null causes the + */ + public void ClosedWithException(Exception e) + { + lock (_connection.FailoverMutex) + { + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + SetClosed(); + AMQException amqe; + + if (e is AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + + _connection.DeregisterSession(_channelId); + CloseProducersAndConsumers(amqe); + } + } + + public void MessageReceived(UnprocessedMessage message) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Message received in session with channel id " + _channelId); + } + + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } + else + { + _queue.Enqueue(message); + } + } + + public void Dispose() + { + Close(); + } + + private void SetClosed() + { + Interlocked.Exchange(ref _closed, CLOSED); + } + + /// <summary> + /// Close all producers or consumers. This is called either in the error case or when closing the session normally. + /// <param name="amqe">the exception, may be null to indicate no error has occurred</param> + /// + private void CloseProducersAndConsumers(AMQException amqe) + { + try + { + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + CloseConsumers(amqe); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + /// <summary> + /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is + /// currently no way of propagating errors to message producers (this is a JMS limitation). + /// </summary> + private void CloseProducers() + { + _logger.Debug("Closing producers on session " + this); + + // we need to clone the list of producers since the close() method updates the _producers collection + // which would result in a concurrent modification exception + ArrayList clonedProducers = new ArrayList(_producers.Values); + + foreach (BasicMessageProducer prod in clonedProducers) + { + _logger.Debug("Closing producer " + prod); + prod.Close(); + } + + // at this point the _producers map is empty + } + + /// <summary> + /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. + /// <param name="error">not null if this is a result of an error occurring at the connection level</param> + private void CloseConsumers(Exception error) + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer con in clonedConsumers) + { + if (error != null) + { + con.NotifyError(error); + } + else + { + con.Close(); + } + } + + // at this point the _consumers map will be empty + } + + private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, + DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + lock (_closingLock) + { + CheckNotClosed(); + + try + { + return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId, + this, GetNextProducerId(), + deliveryMode, timeToLive, immediate, mandatory, priority); + } + catch (AMQException e) + { + _logger.Error("Error creating message producer: " + e, e); + throw new QpidException("Error creating message producer", e); + } + } + } + + /// <summary> Creates a message consumer on this channel.</summary> + /// + /// <param name="queueName">The name of the queue to attach the consumer to.</param> + /// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param> + /// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param> + /// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param> + /// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param> + /// + /// <return>The message consumer.</return> + private IMessageConsumer CreateConsumerImpl(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive) + { + lock (_closingLock) + { + CheckNotClosed(); + + BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, + _messageFactoryRegistry, this, + prefetchHigh, prefetchLow, exclusive); + try + { + RegisterConsumer(consumer); + } + catch (AMQException e) + { + throw new QpidException("Error registering consumer: " + e, e); + } + + return consumer; + } + } + + private void CheckTransacted() + { + if (!Transacted) + { + throw new InvalidOperationException("Channel is not transacted"); + } + } + + private void CheckNotTransacted() + { + if (Transacted) + { + throw new InvalidOperationException("Channel is transacted"); + } + } + + internal void Start() + { + _dispatcher = new Dispatcher(this); + Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher)); + dispatcherThread.IsBackground = true; + dispatcherThread.Start(); + } + + internal void Stop() + { + Suspend(true); + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + } + + internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer) + { + _consumers[consumerTag] = consumer; + } + + /// <summary> + /// Called by the MessageConsumer when closing, to deregister the consumer from the + /// map from consumerTag to consumer instance. + /// </summary> + /// <param name="consumerTag">the consumer tag, that was broker-generated</param> + internal void DeregisterConsumer(string consumerTag) + { + _consumers.Remove(consumerTag); + } + + internal void RegisterProducer(long producerId, IMessagePublisher publisher) + { + _producers[producerId] = publisher; + } + + internal void DeregisterProducer(long producerId) + { + _producers.Remove(producerId); + } + + private long GetNextProducerId() + { + return ++_nextProducerId; + } + + /** + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after + * failover when the client has veoted resubscription. + * + * The caller of this method must already hold the failover mutex. + */ + internal void MarkClosed() + { + SetClosed(); + _connection.DeregisterSession(_channelId); + MarkClosedProducersAndConsumers(); + } + + private void MarkClosedProducersAndConsumers() + { + try + { + // no need for a markClosed* method in this case since there is no protocol traffic closing a producer + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + MarkClosedConsumers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + private void MarkClosedConsumers() + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer consumer in clonedConsumers) + { + consumer.MarkClosed(); + } + // at this point the _consumers map will be empty + } + + private void DoPurgeQueue(string queueName, bool noWait) + { + try + { + _logger.DebugFormat("PurgeQueue {0}", queueName); + + AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait); + + if (noWait) + _connection.ProtocolWriter.Write(purgeQueue); + else + _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody)); + } + catch (AMQException) + { + throw; + } + } + + /** + * Replays frame on fail over. + * + * @throws AMQException + */ + internal void ReplayOnFailOver() + { + _logger.Debug(string.Format("Replaying frames for channel {0}", _channelId)); + foreach (AMQFrame frame in _replayFrames) + { + _logger.Debug(string.Format("Replaying frame=[{0}]", frame)); + _connection.ProtocolWriter.Write(frame); + } + } + + /// <summary> + /// Callers must hold the failover mutex before calling this method. + /// </summary> + /// <param name="consumer"></param> + private void RegisterConsumer(BasicMessageConsumer consumer) + { + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, + consumer.Exclusive, consumer.AcknowledgeMode); + consumer.ConsumerTag = consumerTag; + _consumers.Add(consumerTag, consumer); + } + + internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) + { + + _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", + queueName, exchangeName, routingKey, args)); + + AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, + queueName, exchangeName, + routingKey, true, args); + _replayFrames.Add(queueBind); + + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueBind); + } + } + + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + + AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, + queueName, tag, noLocal, + acknowledgeMode == AcknowledgeMode.NoAcknowledge, + exclusive, true, new FieldTable()); + + _replayFrames.Add(basicConsume); + + _connection.ProtocolWriter.Write(basicConsume); + return tag; + } + + private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) + { + try + { + _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); + + AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); + + _replayFrames.Add(queueDelete); + + if (noWait) + { + _connection.ProtocolWriter.Write(queueDelete); + } + else + { + _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); + } + } + catch (AMQException) + { + throw; + } + } + + private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", + queueName, isDurable, isExclusive, isAutoDelete)); + + AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, + isAutoDelete, true, null); + + _replayFrames.Add(queueDeclare); + + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueDeclare); + } + } + + // AMQP-level method. + private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, + string exchangeClass, bool passive, bool durable, + bool autoDelete, bool xinternal, bool noWait, FieldTable args) + { + _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", + _channelId, exchangeName, exchangeClass)); + + AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, + durable, autoDelete, xinternal, noWait, args); + + _replayFrames.Add(declareExchange); + + if (noWait) + { + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(declareExchange); + } + } + else + { + throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); + //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); + } + } + + /** + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from + * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is + * AUTO_ACK or similar. + * + * @param deliveryTag the tag of the last message to be acknowledged + * @param multiple if true will acknowledge all messages up to and including the one specified by the + * delivery tag + */ + internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) + { + AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + } + // FIXME: lock FailoverMutex here? + _connection.ProtocolWriter.Write(ackFrame); + } + + public void RejectMessage(ulong deliveryTag, bool requeue) + { + if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted)) + { + AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue); + _connection.ProtocolWriter.Write(rejectFrame); + } + } + + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + + if (errorCode == AMQConstant.NO_CONSUMERS.Code) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } + else if (errorCode == AMQConstant.NO_ROUTE.Code) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } + else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + + _connection.ExceptionReceived(exception); + } + catch (Exception ex) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + } + + private void OnPrefetchLowMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); + Suspend(false); + } + } + + private void OnPrefetchHighMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); + Suspend(true); + } + } + + private void Suspend(bool suspend) + { + lock (_suspensionLock) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } + + _suspended = suspend; + AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); + + Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); + } + } + + /// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers. + /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message + /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new + /// message. + /// + /// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be + /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a + /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition. + /// + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/> + /// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/> + /// </table> + /// </summary> + /// + /// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message. + /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on + /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks> + /// + /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should + /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks> + private class Dispatcher + { + /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary> + private int _stopped = 0; + + /// <summary> The channel for which this is a dispatcher. </summary> + private AmqChannel _containingChannel; + + /// <summary> Creates a dispatcher on the specified channel. </summary> + /// + /// <param name="containingChannel"> The channel on which this is a dispatcher. </param> + public Dispatcher(AmqChannel containingChannel) + { + _containingChannel = containingChannel; + } + + /// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and + /// the connection of bounced messages.</summary> + public void RunDispatcher() + { + UnprocessedMessage message; + + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) + { + if (message.DeliverBody != null) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + + if (consumer == null) + { + _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); + } + else + { + consumer.NotifyMessage(message, _containingChannel.ChannelId); + } + } + else + { + try + { + // Bounced message is processed here, away from the transport thread + AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. + CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + catch (Exception e) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + } + + _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + "."); + } + + /// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary> + public void StopDispatcher() + { + Interlocked.Exchange(ref _stopped, 1); + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs new file mode 100644 index 0000000000..e88cf8f04c --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -0,0 +1,473 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using System.Collections; +using System.Collections.Generic; +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client +{ + public class BasicMessageConsumer : Closeable, IMessageConsumer + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageConsumer)); + + private bool _noLocal; + + /** Holds the exclusive status flag for the consumers access to its queue. */ + private bool _exclusive; + + public bool Exclusive + { + get { return _exclusive; } + } + + public bool NoLocal + { + get { return _noLocal; } + set { _noLocal = value; } + } + + private AcknowledgeMode _acknowledgeMode; + + public AcknowledgeMode AcknowledgeMode + { + get { return _acknowledgeMode; } + } + + private MessageReceivedDelegate _messageListener; + + private bool IsMessageListenerSet + { + get { return _messageListener != null; } + } + + /// <summary> + /// The consumer tag allows us to close the consumer by sending a jmsCancel method to the + /// broker + /// </summary> + private string _consumerTag; + + /// <summary> + /// We need to know the channel id when constructing frames + /// </summary> + private ushort _channelId; + + private readonly string _queueName; + + /// <summary> + /// Protects the setting of a messageListener + /// </summary> + private readonly object _syncLock = new object(); + + /// <summary> + /// We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchHigh; + + /// <summary> + /// We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchLow; + + /// <summary> + /// When true indicates that either a message listener is set or that + /// a blocking receive call is in progress + /// </summary> + private bool _receiving; + + /// <summary> + /// Used in the blocking receive methods to receive a message from + /// the Channel thread. + /// </summary> + private readonly ConsumerProducerQueue _messageQueue = new ConsumerProducerQueue(); + + private MessageFactoryRegistry _messageFactory; + + private AmqChannel _channel; + + // <summary> + // Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. + // </summary> + //private long _lastDeliveryTag; + + /// <summary> + /// Explicit list of all received but un-acked messages in a transaction. Used to ensure acking is completed when transaction is committed. + /// </summary> + private LinkedList<long> _receivedDeliveryTags; + + /// <summary> + /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode + /// </summary> + private int _outstanding; + + /// <summary> + /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. + /// Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + /// </summary> + private bool _dups_ok_acknowledge_send; + + internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, + MessageFactoryRegistry messageFactory, AmqChannel channel, + int prefetchHigh, int prefetchLow, bool exclusive) + { + _channelId = channelId; + _queueName = queueName; + _noLocal = noLocal; + _messageFactory = messageFactory; + _channel = channel; + _acknowledgeMode = _channel.AcknowledgeMode; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; + _exclusive = exclusive; + + if (_acknowledgeMode == AcknowledgeMode.SessionTransacted) + { + _receivedDeliveryTags = new LinkedList<long>(); + } + } + + #region IMessageConsumer Members + + public MessageReceivedDelegate OnMessage + { + get + { + return _messageListener; + } + set + { + CheckNotClosed(); + + lock (_syncLock) + { + // If someone is already receiving + if (_messageListener != null && _receiving) + { + throw new InvalidOperationException("Another thread is already receiving..."); + } + + _messageListener = value; + + _receiving = (_messageListener != null); + + if (_receiving) + { + _logger.Debug("Message listener set for queue with name " + _queueName); + } + } + } + } + + public IMessage Receive(long delay) + { + CheckNotClosed(); + + lock (_syncLock) + { + // If someone is already receiving + if (_receiving) + { + throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)..."); + } + + _receiving = true; + } + + try + { + object o = _messageQueue.Dequeue(delay); + + return ReturnMessageOrThrowAndPostDeliver(o); + } + finally + { + lock (_syncLock) + { + _receiving = false; + } + } + } + + private IMessage ReturnMessageOrThrowAndPostDeliver(object o) + { + IMessage m = ReturnMessageOrThrow(o); + if (m != null) + { + PostDeliver(m); + } + return m; + } + + public IMessage Receive() + { + return Receive(Timeout.Infinite); + } + + public IMessage ReceiveNoWait() + { + return Receive(0); + } + + #endregion + + /// <summary> + /// We can get back either a Message or an exception from the queue. This method examines the argument and deals + /// with it by throwing it (if an exception) or returning it (in any other case). + /// </summary> + /// <param name="o">the object off the queue</param> + /// <returns> a message only if o is a Message</returns> + /// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not + /// a QpidMessagingException is created with the linked exception set appropriately</exception> + private IMessage ReturnMessageOrThrow(object o) + { + // errors are passed via the queue too since there is no way of interrupting the poll() via the API. + if (o is Exception) + { + Exception e = (Exception) o; + throw new QpidException("Message consumer forcibly closed due to error: " + e, e); + } + else + { + return (IMessage) o; + } + } + + #region IDisposable Members + + public void Dispose() + { + Close(); + } + + #endregion + + public override void Close() + { + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex + lock (_channel.Connection.FailoverMutex) + { + lock (_closingLock) + { + Interlocked.Exchange(ref _closed, CLOSED); + + AMQFrame cancelFrame = BasicCancelBody.CreateAMQFrame(_channelId, _consumerTag, false); + + try + { + _channel.Connection.ConvenientProtocolWriter.SyncWrite( + cancelFrame, typeof(BasicCancelOkBody)); + } + catch (AMQException e) + { + _logger.Error("Error closing consumer: " + e, e); + throw new QpidException("Error closing consumer: " + e); + } + finally + { + DeregisterConsumer(); + } + } + } + } + + /** + * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case + * of a message listener or a synchronous receive() caller. + * + * @param messageFrame the raw unprocessed mesage + * @param channelId channel on which this message was sent + */ + internal void NotifyMessage(UnprocessedMessage messageFrame, int channelId) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); + } + try + { + AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, + messageFrame.DeliverBody.Redelivered, + messageFrame.ContentHeader, + messageFrame.Bodies); + + _logger.Debug("Message is of type: " + jmsMessage.GetType().Name); + + PreDeliver(jmsMessage); + + if (IsMessageListenerSet) + { + // We do not need a lock around the test above, and the dispatch below as it is invalid + // for an application to alter an installed listener while the session is started. +#if __MonoCS__ + _messageListener(jmsMessage); +#else + _messageListener.Invoke(jmsMessage); +#endif + PostDeliver(jmsMessage); + } + else + { + _messageQueue.Enqueue(jmsMessage); + } + } + catch (Exception e) + { + _logger.Error("Caught exception (dump follows) - ignoring...", e); // FIXME + } + } + + + internal void NotifyError(Exception cause) + { + lock (_syncLock) + { + SetClosed(); + + // we have no way of propagating the exception to a message listener - a JMS limitation - so we + // deal with the case where we have a synchronous receive() waiting for a message to arrive + if (_messageListener == null) + { + // offer only succeeds if there is a thread waiting for an item from the queue + _messageQueue.Enqueue(cause); + _logger.Debug("Passed exception to synchronous queue for propagation to receive()"); + } + DeregisterConsumer(); + } + } + + private void SetClosed() + { + Interlocked.Exchange(ref _closed, CLOSED); + } + + /// <summary> + /// Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean + /// case and in the case of an error occurring. + /// </summary> + internal void DeregisterConsumer() + { + _channel.DeregisterConsumer(_consumerTag); + } + + public string ConsumerTag + { + get + { + return _consumerTag; + } + set + { + _consumerTag = value; + } + } + + /** + * Called when you need to invalidate a consumer. Used for example when failover has occurred and the + * client has vetoed automatic resubscription. + * The caller must hold the failover mutex. + */ + internal void MarkClosed() + { + SetClosed(); + DeregisterConsumer(); + } + + public string QueueName + { + get { return _queueName; } + } + + /// <summary> + /// Acknowledge up to last message delivered (if any). Used when commiting. + /// </summary> + internal void AcknowledgeDelivered() + { + foreach (long tag in _receivedDeliveryTags) + { + _channel.AcknowledgeMessage((ulong)tag, false); + } + + _receivedDeliveryTags.Clear(); + } + + internal void RejectUnacked() + { + foreach (long tag in _receivedDeliveryTags) + { + _channel.RejectMessage((ulong)tag, true); + } + + _receivedDeliveryTags.Clear(); + } + + private void PreDeliver(AbstractQmsMessage msg) + { + switch (AcknowledgeMode) + { + case AcknowledgeMode.PreAcknowledge: + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false); + break; + + case AcknowledgeMode.ClientAcknowledge: + // We set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame. + //msg.setAMQSession(_session); + msg.Channel = _channel; + break; + } + } + + private void PostDeliver(IMessage m) + { + AbstractQmsMessage msg = (AbstractQmsMessage) m; + switch (AcknowledgeMode) + { + case AcknowledgeMode.DupsOkAcknowledge: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + if (_dups_ok_acknowledge_send) + { + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); + } + break; + + case AcknowledgeMode.AutoAcknowledge: + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); + break; + + case AcknowledgeMode.SessionTransacted: + _receivedDeliveryTags.AddLast(msg.DeliveryTag); + break; + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs new file mode 100644 index 0000000000..ca6d2abee5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs @@ -0,0 +1,402 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Messaging; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client +{ + public class BasicMessageProducer : Closeable, IMessagePublisher + { + protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); + + /// <summary> + /// If true, messages will not get a timestamp. + /// </summary> + private bool _disableTimestamps; + + /// <summary> + /// Priority of messages created by this producer. + /// </summary> + private int _messagePriority; + + /// <summary> + /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + /// </summary> + private long _timeToLive; + + /// <summary> + /// Delivery mode used for this producer. + /// </summary> + private DeliveryMode _deliveryMode; + + private bool _immediate; + private bool _mandatory; + + string _exchangeName; + string _routingKey; + + /// <summary> + /// Default encoding used for messages produced by this producer. + /// </summary> + private string _encoding; + + /// <summary> + /// Default encoding used for message produced by this producer. + /// </summary> + private string _mimeType; + + /// <summary> + /// True if this producer was created from a transacted session + /// </summary> + private bool _transacted; + + private ushort _channelId; + + /// <summary> + /// This is an id generated by the session and is used to tie individual producers to the session. This means we + /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers + /// to the session so that when an error is propagated to the session it can close the producer (meaning that + /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). + /// </summary> + private long _producerId; + + /// <summary> + /// The session used to create this producer + /// </summary> + private AmqChannel _channel; + + public BasicMessageProducer(string exchangeName, string routingKey, + bool transacted, + ushort channelId, + AmqChannel channel, + long producerId, + DeliveryMode deliveryMode, + long timeToLive, + bool immediate, + bool mandatory, + int priority) + { + _exchangeName = exchangeName; + _routingKey = routingKey; + _transacted = transacted; + _channelId = channelId; + _channel = channel; + _producerId = producerId; + _deliveryMode = deliveryMode; + _timeToLive = timeToLive; + _immediate = immediate; + _mandatory = mandatory; + _messagePriority = priority; + + _channel.RegisterProducer(producerId, this); + } + + + #region IMessagePublisher Members + + public DeliveryMode DeliveryMode + { + get + { + CheckNotClosed(); + return _deliveryMode; + } + set + { + CheckNotClosed(); + _deliveryMode = value; + } + } + + public string ExchangeName + { + get { return _exchangeName; } + } + + public string RoutingKey + { + get { return _routingKey; } + } + + public bool DisableMessageID + { + get + { + throw new Exception("The method or operation is not implemented."); + } + set + { + throw new Exception("The method or operation is not implemented."); + } + } + + public bool DisableMessageTimestamp + { + get + { + CheckNotClosed(); + return _disableTimestamps; + } + set + { + CheckNotClosed(); + _disableTimestamps = value; + } + } + + public int Priority + { + get + { + CheckNotClosed(); + return _messagePriority; + } + set + { + CheckNotClosed(); + if ( value < 0 || value > 9 ) + { + throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = value; + } + } + + public override void Close() + { + _logger.Debug("Closing producer " + this); + Interlocked.Exchange(ref _closed, CLOSED); + _channel.DeregisterProducer(_producerId); + } + + public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) + { + CheckNotClosed(); + SendImpl( + _exchangeName, + _routingKey, + (AbstractQmsMessage)msg, + deliveryMode, + priority, + (uint)timeToLive, + _mandatory, + _immediate + ); + } + + public void Send(IMessage msg) + { + CheckNotClosed(); + SendImpl( + _exchangeName, + _routingKey, + (AbstractQmsMessage)msg, + _deliveryMode, + _messagePriority, + (uint)_timeToLive, + _mandatory, + _immediate + ); + } + + // This is a short-term hack (knowing that this code will be re-vamped sometime soon) + // to facilitate publishing messages to potentially non-existent recipients. + public void Send(IMessage msg, bool mandatory) + { + CheckNotClosed(); + SendImpl( + _exchangeName, + _routingKey, + (AbstractQmsMessage)msg, + _deliveryMode, + _messagePriority, + (uint)_timeToLive, + mandatory, + _immediate + ); + } + + public long TimeToLive + { + get + { + CheckNotClosed(); + return _timeToLive; + } + set + { + CheckNotClosed(); + if ( value < 0 ) + { + throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); + } + _timeToLive = value; + } + } + + #endregion + + public string MimeType + { + get + { + CheckNotClosed(); + return _mimeType; + } + set + { + CheckNotClosed(); + _mimeType = value; + } + } + + public string Encoding + { + get + { + CheckNotClosed(); + return _encoding; + } + set + { + CheckNotClosed(); + _encoding = value; + } + } + + public void Dispose() + { + Close(); + } + + #region Message Publishing + + private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) + { + // todo: handle session access ticket + AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame( + _channel.ChannelId, 0, exchangeName, + routingKey, mandatory, immediate + ); + + // fix message properties + if ( !_disableTimestamps ) + { + message.Timestamp = DateTime.UtcNow.Ticks; + message.Expiration = message.Timestamp + timeToLive; + } else + { + message.Expiration = 0; + } + message.DeliveryMode = deliveryMode; + message.Priority = (byte)priority; + + ByteBuffer payload = message.Data; + int payloadLength = payload.Limit; + + ContentBody[] contentBodies = CreateContentBodies(payload); + AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; + for ( int i = 0; i < contentBodies.Length; i++ ) + { + frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); + } + if ( contentBodies.Length > 0 && _logger.IsDebugEnabled ) + { + _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + // weight argument of zero indicates no child content headers, just bodies + AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame( + _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, + message.ContentHeaderProperties, (uint)payloadLength + ); + if ( _logger.IsDebugEnabled ) + { + _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + frames[0] = publishFrame; + frames[1] = contentHeaderFrame; + CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + + lock ( _channel.Connection.FailoverMutex ) + { + _channel.Connection.ProtocolWriter.Write(compositeFrame); + } + } + + + /// <summary> + /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated + /// maximum frame size. + /// </summary> + /// <param name="payload"></param> + /// <returns>return the array of content bodies</returns> + private ContentBody[] CreateContentBodies(ByteBuffer payload) + { + if ( payload == null ) + { + return null; + } else if ( payload.Remaining == 0 ) + { + return new ContentBody[0]; + } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1); + int frameCount = CalculateContentBodyFrames(payload); + ContentBody[] bodies = new ContentBody[frameCount]; + for ( int i = 0; i < frameCount; i++ ) + { + int length = (payload.Remaining >= framePayloadMax) + ? framePayloadMax : payload.Remaining; + bodies[i] = new ContentBody(payload, (uint)length); + } + return bodies; + } + + private int CalculateContentBodyFrames(ByteBuffer payload) + { + // we substract one from the total frame maximum size to account + // for the end of frame marker in a body frame + // (0xCE byte). + int frameCount; + if ( (payload == null) || (payload.Remaining == 0) ) + { + frameCount = 0; + } else + { + int dataLength = payload.Remaining; + int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1; + int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; + frameCount = (int)(dataLength / framePayloadMax) + lastFrame; + } + + return frameCount; + } + #endregion // Message Publishing + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Closeable.cs b/qpid/dotnet/Qpid.Client/Client/Closeable.cs new file mode 100644 index 0000000000..b9664ccea3 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Closeable.cs @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client +{ + /// <summary>Closeable provides monitoring of the state of a closeable resource; whether it is open or closed. It also provides a lock on which + /// attempts to close the resource from multiple threads can be coordinated. + /// + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Close (and clean-up) a resource. + /// <tr><td> Monitor the state of a closeable resource. + /// <tr><td> Synchronous attempts to close resource from concurrent threads. + /// </table> + /// </summary> + /// + /// <remarks>Poor encapsulation of the close lock. Better to completely hide the implementation, such that there is a method, e.g., DoSingleClose, + /// that sub-classes implement. Guaranteed to only be called by one thread at once, and iff the object is not already closed. That is, multiple + /// simultaneous closes will result in a single call to the real close method. Put the wait and condition checking loop in this base class. + /// </remarks> + public abstract class Closeable : ICloseable + { + /// <summary> Constant representing the closed state. </summary> + protected const int CLOSED = 1; + + /// <summary> Constant representing the open state. </summary> + protected const int NOT_CLOSED = 2; + + /// <summary> Used to ensure orderly closing of the object. </summary> + protected readonly object _closingLock = new object(); + + /// <summary> Indicates the state of this resource; open or closed. </summary> + protected int _closed = NOT_CLOSED; + + /// <summary> + /// Checks the not closed. + /// </summary> + /// + /// <remarks>Don't like check methods that throw exceptions. a) it can come as a surprise without checked exceptions, b) it limits the + /// callers choice, if the caller would prefer a boolean, c) it is not side-effect free programming, where such could be used. Get rid + /// of this and replace with boolean.</remarks> + protected void CheckNotClosed() + { + if (_closed == CLOSED) + { + throw new InvalidOperationException("Object " + ToString() + " has been closed"); + } + } + + /// <summary>Indicates whether this resource is closed.</summary> + /// <value><c>true</c> if closed; otherwise, <c>false</c>.</value> + public bool Closed + { + get + { + return _closed == CLOSED; + } + } + + /// <summary> Close this resource. </summary> + public abstract void Close(); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs b/qpid/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs new file mode 100644 index 0000000000..ae9225a53a --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs @@ -0,0 +1,84 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Specialized;
+using System.Configuration;
+using System.Text;
+
+using Apache.Qpid.Client.Security;
+using Apache.Qpid.Sasl.Mechanisms;
+
+namespace Apache.Qpid.Client.Configuration
+{
+ public class AuthenticationConfigurationSectionHandler
+ : IConfigurationSectionHandler
+ {
+
+ public object Create(object parent, object configContext, System.Xml.XmlNode section)
+ {
+ NameValueSectionHandler handler = new NameValueSectionHandler();
+ OrderedHashTable schemes = new OrderedHashTable();
+
+ NameValueCollection options = (NameValueCollection)
+ handler.Create(parent, configContext, section);
+
+ if ( options != null )
+ {
+ foreach ( string key in options.Keys )
+ {
+ Type type = Type.GetType(options[key]);
+ if ( type == null )
+ throw new ConfigurationException(string.Format("Type '{0}' not found", key));
+ if ( !typeof(IAMQCallbackHandler).IsAssignableFrom(type) )
+ throw new ConfigurationException(string.Format("Type '{0}' does not implement IAMQCallbackHandler", key));
+
+ schemes.Add(key, type);
+ }
+ }
+
+ return schemes;
+ }
+
+ } // class AuthenticationConfigurationSectionHandler
+
+ public class OrderedHashTable : Hashtable
+ {
+ private ArrayList _keys = new ArrayList();
+
+ public IList OrderedKeys
+ {
+ get { return _keys; }
+ }
+
+ public override void Add(object key, object value)
+ {
+ base.Add(key, value);
+ _keys.Add(key);
+ }
+ public override void Remove(object key)
+ {
+ base.Remove(key);
+ _keys.Remove(key);
+ }
+ }
+} // namespace Apache.Qpid.Client.Configuration
diff --git a/qpid/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs b/qpid/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs new file mode 100644 index 0000000000..b21486bfa8 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client +{ + public class ConnectionTuneParameters + { + private uint _frameMax; + + private ushort _channelMax; + + private uint _hearbeat; + + private uint _txnLimit; + + public uint FrameMax + { + get + { + return _frameMax; + } + set + { + _frameMax = value; + } + } + + public ushort ChannelMax + { + get + { + return _channelMax; + } + set + { + _channelMax = value; + } + } + + public uint Heartbeat + { + get + { + return _hearbeat; + } + set + { + _hearbeat = value; + } + } + + public uint TxnLimit + { + get + { + return _txnLimit; + } + set + { + _txnLimit = value; + } + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverException.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverException.cs new file mode 100644 index 0000000000..7013746414 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverException.cs @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; + +namespace Apache.Qpid.Client.Failover +{ + /// <summary> + /// This exception is thrown when failover is taking place and we need to let other + /// parts of the client know about this. + /// </summary> + [Serializable] + class FailoverException : Exception + { + public FailoverException(String message) : base(message) + { + } + + protected FailoverException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs new file mode 100644 index 0000000000..83c69b7d25 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; + +namespace Apache.Qpid.Client.Failover +{ + public class FailoverHandler + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverHandler)); + + private AMQConnection _connection; + + /** + * Used where forcing the failover host + */ + private String _host; + + /** + * Used where forcing the failover port + */ + private int _port; + + public FailoverHandler(AMQConnection connection) + { + _connection = connection; + } + + public void Run() + { + if (Thread.CurrentThread.IsBackground) + { + throw new InvalidOperationException("FailoverHandler must Run on a non-background thread."); + } + + AMQProtocolListener pl = _connection.ProtocolListener; + pl.FailoverLatch = new ManualResetEvent(false); + + // We wake up listeners. If they can handle failover, they will extend the + // FailoverSupport class and will in turn block on the latch until failover + // has completed before retrying the operation + _connection.ProtocolListener.PropagateExceptionToWaiters(new FailoverException("Failing over about to start")); + + // Since failover impacts several structures we protect them all with a single mutex. These structures + // are also in child objects of the connection. This allows us to manipulate them without affecting + // client code which runs in a separate thread. + lock (_connection.FailoverMutex) + { + _log.Info("Starting failover process"); + + // We switch in a new state manager temporarily so that the interaction to get to the "connection open" + // state works, without us having to terminate any existing "state waiters". We could theoretically + // have a state waiter waiting until the connection is closed for some reason. Or in future we may have + // a slightly more complex state model therefore I felt it was worthwhile doing this. + AMQStateManager existingStateManager = _connection.ProtocolListener.StateManager; + _connection.ProtocolListener.StateManager = new AMQStateManager(); + if (!_connection.FirePreFailover(_host != null)) + { + _connection.ProtocolListener.StateManager = existingStateManager; + if (_host != null) + { + _connection.ExceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + } + else + { + _connection.ExceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + } + pl.FailoverLatch.Set(); + pl.FailoverLatch = null; + return; + } + bool failoverSucceeded; + // when host is non null we have a specified failover host otherwise we all the client to cycle through + // all specified hosts + + // if _host has value then we are performing a redirect. + if (_host != null) + { + // todo: fix SSL support! + failoverSucceeded = _connection.AttemptReconnection(_host, _port, null); + } + else + { + failoverSucceeded = _connection.AttemptReconnection(); + } + + // XXX: at this point it appears that we are going to set StateManager to existingStateManager in + // XXX: both paths of control. + if (!failoverSucceeded) + { + _connection.ProtocolListener.StateManager = existingStateManager; + _connection.ExceptionReceived( + new AMQDisconnectedException("Server closed connection and no failover " + + "was successful")); + } + else + { + _connection.ProtocolListener.StateManager = existingStateManager; + try + { + if (_connection.FirePreResubscribe()) + { + _log.Info("Resubscribing on new connection"); + _connection.ResubscribeChannels(); + } + else + { + _log.Info("Client vetoed automatic resubscription"); + } + _connection.FireFailoverComplete(); + _connection.ProtocolListener.FailoverState = FailoverState.NOT_STARTED; + _log.Info("Connection failover completed successfully"); + } + catch (Exception e) + { + _log.Info("Failover process failed - exception being propagated by protocol handler"); + _connection.ProtocolListener.FailoverState = FailoverState.FAILED; + try + { + _connection.ProtocolListener.OnException(e); + } + catch (Exception ex) + { + _log.Error("Error notifying protocol session of error: " + ex, ex); + } + } + } + } + pl.FailoverLatch.Set(); + } + + public String getHost() + { + return _host; + } + + public void setHost(String host) + { + _host = host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int port) + { + _port = port; + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverState.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverState.cs new file mode 100644 index 0000000000..3058cdcd69 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverState.cs @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.Failover +{ + /// <summary> + /// Enumeration of failover states. Used to handle failover from within AMQProtocolHandler where MINA events need to be + /// dealt with and can happen during failover. + /// </summary> + enum FailoverState + { + NOT_STARTED, IN_PROGRESS, FAILED + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs new file mode 100644 index 0000000000..afa5301f39 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +namespace Apache.Qpid.Client.Failover +{ + public abstract class FailoverSupport + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverSupport)); + + public object execute(AMQConnection con) + { + // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding. + // Any method that can potentially block for any reason should use this class so that deadlock will not + // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners) + // that might be causing a block. When that happens, the exception is caught here and the mutex is released + // before waiting for the failover to complete (either successfully or unsuccessfully). + while (true) + { + con.ProtocolListener.BlockUntilNotFailingOver(); + lock (con.FailoverMutex) + { + try + { + return operation(); + } + catch (FailoverException e) + { + _log.Info("Failover exception caught during operation", e); + } + } + } + } + + protected abstract object operation(); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs new file mode 100644 index 0000000000..def1e78e8c --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class BasicDeliverMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicDeliverMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + UnprocessedMessage msg = new UnprocessedMessage(); + msg.DeliverBody = (BasicDeliverBody) evt.Method; + msg.ChannelId = evt.ChannelId; + _logger.Debug("New JmsDeliver method received"); + evt.ProtocolSession.UnprocessedMessageReceived(msg); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs new file mode 100644 index 0000000000..f413dfc9c6 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class BasicReturnMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicReturnMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("New Basic.Return method received"); + UnprocessedMessage msg = new UnprocessedMessage(); + msg.DeliverBody = null; + msg.BounceBody = (BasicReturnBody) evt.Method; + msg.ChannelId = evt.ChannelId; + + evt.ProtocolSession.UnprocessedMessageReceived(msg); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs new file mode 100644 index 0000000000..9ed09a0d01 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Protocol; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class ChannelCloseMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ChannelCloseMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ChannelClose method received"); + ChannelCloseBody method = (ChannelCloseBody) evt.Method; + + int errorCode = method.ReplyCode; + string reason = method.ReplyText; + if (_logger.IsDebugEnabled) + { + _logger.Debug("Channel close reply code: " + errorCode + ", reason: " + reason); + } + + AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId); + evt.ProtocolSession.WriteFrame(frame); + + if ( errorCode != AMQConstant.REPLY_SUCCESS.Code ) + { + _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + throw new AMQNoConsumersException(reason); + if ( errorCode == AMQConstant.NO_ROUTE.Code ) + throw new AMQNoRouteException(reason); + if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code ) + throw new AMQInvalidArgumentException(reason); + if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code ) + throw new AMQInvalidRoutingKeyException(reason); + // any other + throw new AMQChannelClosedException(errorCode, "Error: " + reason); + } + evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason); + } + } +} + + + diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs new file mode 100644 index 0000000000..66cff3bc65 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client.Handler +{ + public class ConnectionCloseMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ConnectionClose frame received"); + ConnectionCloseBody method = (ConnectionCloseBody) evt.Method; + + int errorCode = method.ReplyCode; + String reason = method.ReplyText; + + // send CloseOK + evt.ProtocolSession.WriteFrame(ConnectionCloseOkBody.CreateAMQFrame(evt.ChannelId)); + + if ( errorCode != AMQConstant.REPLY_SUCCESS.Code ) + { + if ( errorCode == AMQConstant.NOT_ALLOWED.Code ) + { + _logger.Info("Authentication Error: " + Thread.CurrentThread.Name); + evt.ProtocolSession.CloseProtocolSession(); + + //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. + stateManager.ChangeState(AMQState.CONNECTION_NOT_STARTED); + + throw new AMQAuthenticationException(errorCode, reason); + } else + { + _logger.Info("Connection close received with error code " + errorCode); + throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + } + } + // this actually closes the connection in the case where it is not an error. + evt.ProtocolSession.CloseProtocolSession(); + stateManager.ChangeState(AMQState.CONNECTION_CLOSED); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs new file mode 100644 index 0000000000..038da15731 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class ConnectionCloseOkHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseOkHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ConnectionCloseOk frame received"); +// ConnectionCloseOkBody method = (ConnectionCloseOkBody)evt.Method; + stateManager.ChangeState(AMQState.CONNECTION_CLOSED); + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs new file mode 100644 index 0000000000..a12e4ead60 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; + +namespace Apache.Qpid.Client.Handler +{ + public class ConnectionOpenOkMethodHandler : IStateAwareMethodListener + { + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + stateManager.ChangeState(AMQState.CONNECTION_OPEN); + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs new file mode 100644 index 0000000000..08cc580b17 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; + +namespace Apache.Qpid.Client.Handler +{ + public class ConnectionRedirectMethodHandler : IStateAwareMethodListener + { +// private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionRedirectMethodHandler)); + + private const int DEFAULT_REDIRECT_PORT = 5672; + + private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler(); + + public static ConnectionRedirectMethodHandler GetInstance() + { + return _handler; + } + + private ConnectionRedirectMethodHandler() + { + } + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + /*_logger.Info("ConnectionRedirect frame received"); + ConnectionRedirectBody method = (ConnectionRedirectBody) evt.Method; + + // the host is in the form hostname:port with the port being optional + int portIndex = method.Host.IndexOf(':'); + String host; + int port; + if (portIndex == -1) + { + host = method.Host; + port = DEFAULT_REDIRECT_PORT; + } + else + { + host = method.Host.Substring(0, portIndex); + port = Int32.Parse(method.Host.Substring(portIndex + 1)); + } + evt.ProtocolSession.Failover(host, port);*/ + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs new file mode 100644 index 0000000000..9333d4d0a6 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; +using Apache.Qpid.Sasl; + +namespace Apache.Qpid.Client.Handler +{ + public class ConnectionSecureMethodHandler : IStateAwareMethodListener + { + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + ISaslClient saslClient = evt.ProtocolSession.SaslClient; + if ( saslClient == null ) + { + throw new AMQException("No SASL client set up - cannot proceed with authentication"); + } + + + ConnectionSecureBody body = (ConnectionSecureBody)evt.Method; + + try + { + // Evaluate server challenge + byte[] response = saslClient.EvaluateChallenge(body.Challenge); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame responseFrame = ConnectionSecureOkBody.CreateAMQFrame( + evt.ChannelId, response); + evt.ProtocolSession.WriteFrame(responseFrame); + } catch ( SaslException e ) + { + throw new AMQException("Error processing SASL challenge: " + e, e); + } + } + } +} + + + diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs new file mode 100644 index 0000000000..c54662286b --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.Security; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; +using Apache.Qpid.Sasl; + + +namespace Apache.Qpid.Client.Handler +{ + public class ConnectionStartMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(ConnectionStartMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + ConnectionStartBody body = (ConnectionStartBody) evt.Method; + AMQProtocolSession ps = evt.ProtocolSession; + + try + { + if ( body.Mechanisms == null ) + { + throw new AMQException("mechanism not specified in ConnectionStart method frame"); + } + string mechanisms = Encoding.UTF8.GetString(body.Mechanisms); + string selectedMechanism = ChooseMechanism(mechanisms); + if ( selectedMechanism == null ) + { + throw new AMQException("No supported security mechanism found, passed: " + mechanisms); + } + + byte[] saslResponse = DoAuthentication(selectedMechanism, ps); + + if (body.Locales == null) + { + throw new AMQException("Locales is not defined in Connection Start method"); + } + string allLocales = Encoding.ASCII.GetString(body.Locales); + string[] locales = allLocales.Split(' '); + string selectedLocale; + if (locales != null && locales.Length > 0) + { + selectedLocale = locales[0]; + } + else + { + throw new AMQException("No locales sent from server, passed: " + locales); + } + + stateManager.ChangeState(AMQState.CONNECTION_NOT_TUNED); + FieldTable clientProperties = new FieldTable(); + clientProperties["product"] = "Apache.Qpid.NET"; + clientProperties["version"] = "1.0"; + clientProperties["platform"] = GetFullSystemInfo(); + clientProperties["instance"] = ps.ClientID; + AMQFrame frame = ConnectionStartOkBody.CreateAMQFrame( + evt.ChannelId, clientProperties, selectedMechanism, + saslResponse, selectedLocale); + ps.WriteFrame(frame); + } + catch (Exception e) + { + throw new AMQException(_log, "Unable to decode data: " + e, e); + } + } + + private string GetFullSystemInfo() + { + StringBuilder sysInfo = new StringBuilder(); + // check if we're running on mono or .net + Type monoRuntime = Type.GetType("Mono.Runtime"); + if ( monoRuntime != null ) + sysInfo.Append("Mono"); + else + sysInfo.Append(".NET"); + sysInfo.Append(" ").Append(Environment.Version); + sysInfo.Append(", ").Append(Environment.OSVersion); + return sysInfo.ToString(); + } + + private string ChooseMechanism(string mechanisms) + { + return CallbackHandlerRegistry.Instance.ChooseMechanism(mechanisms); + } + + private byte[] DoAuthentication(string selectedMechanism, AMQProtocolSession ps) + { + ISaslClient saslClient = Sasl.Sasl.CreateClient( + new string[] { selectedMechanism }, null, "AMQP", "localhost", + new Hashtable(), CreateCallbackHandler(selectedMechanism, ps) + ); + if ( saslClient == null ) + { + throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " + + selectedMechanism); + } + ps.SaslClient = saslClient; + try + { + return saslClient.HasInitialResponse ? + saslClient.EvaluateChallenge(new byte[0]) : null; + } catch ( Exception ex ) + { + ps.SaslClient = null; + throw new AMQException("Unable to create SASL client", ex); + } + } + + private IAMQCallbackHandler CreateCallbackHandler(string mechanism, AMQProtocolSession session) + { + Type type = CallbackHandlerRegistry.Instance.GetCallbackHandler(mechanism); + IAMQCallbackHandler handler = + (IAMQCallbackHandler)Activator.CreateInstance(type); + if ( handler == null ) + throw new AMQException("Unable to create callback handler: " + mechanism); + handler.Initialize(session); + return handler; + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs new file mode 100644 index 0000000000..15a1d908b7 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class ConnectionTuneMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionTuneMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ConnectionTune frame received"); + ConnectionTuneBody frame = (ConnectionTuneBody) evt.Method; + AMQProtocolSession session = evt.ProtocolSession; + + ConnectionTuneParameters parameters = session.ConnectionTuneParameters; + if (parameters == null) + { + parameters = new ConnectionTuneParameters(); + } + + _logger.Debug(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat)); + + parameters.FrameMax = frame.FrameMax; + parameters.Heartbeat = frame.Heartbeat; + session.ConnectionTuneParameters = parameters; + + stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED); + session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame( + evt.ChannelId, frame.ChannelMax, frame.FrameMax, frame.Heartbeat)); + session.WriteFrame(ConnectionOpenBody.CreateAMQFrame( + evt.ChannelId, session.AMQConnection.VirtualHost, null, true)); + + if (frame.Heartbeat > 0) + { + evt.ProtocolSession.AMQConnection.StartHeartBeatThread(frame.Heartbeat); + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs new file mode 100644 index 0000000000..7290d758f8 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs @@ -0,0 +1,44 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Client.Handler
+{
+ public class QueueDeleteOkMethodHandler : IStateAwareMethodListener
+ {
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs new file mode 100644 index 0000000000..8bde707b00 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs @@ -0,0 +1,44 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Client.Handler
+{
+ public class QueuePurgeOkMethodHandler : IStateAwareMethodListener
+ {
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs new file mode 100644 index 0000000000..e58de2ab96 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public class AMQMessage + { + protected IContentHeaderProperties _contentHeaderProperties; + + /// <summary> + /// If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required + /// </summary> + protected AmqChannel _channel; + + private long _deliveryTag; + + public AMQMessage(IContentHeaderProperties properties, long deliveryTag) + { + _contentHeaderProperties = properties; + _deliveryTag = deliveryTag; + } + + public AMQMessage(IContentHeaderProperties properties) + : this(properties, -1) + { + } + + public long DeliveryTag + { + get { return _deliveryTag; } + } + + public AmqChannel Channel + { + get { return _channel; } + set { _channel = value; } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs new file mode 100644 index 0000000000..f352d62c11 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Apache.Qpid.Framing; +using log4net; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public abstract class AbstractQmsMessageFactory : IMessageFactory + { + public abstract AbstractQmsMessage CreateMessage(string mimeType); + + private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory)); + + protected abstract AbstractQmsMessage CreateMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader); + + protected AbstractQmsMessage CreateMessageWithBody(long messageNbr, + ContentHeaderBody contentHeader, + IList bodies) + { + ByteBuffer data; + + // we optimise the non-fragmented case to avoid copying + if (bodies != null && bodies.Count == 1) + { + _logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")"); + data = ((ContentBody)bodies[0]).Payload; + } + else + { + _logger.Debug("Fragmented message body (" + bodies.Count + " frames, bodySize=" + contentHeader.BodySize + ")"); + data = ByteBuffer.Allocate((int)contentHeader.BodySize); // XXX: Is cast a problem? + foreach (ContentBody body in bodies) { + data.Put(body.Payload); + //body.Payload.Release(); + } + + data.Flip(); + } + _logger.Debug("Creating message from buffer with position=" + data.Position + " and remaining=" + data.Remaining); + + return CreateMessage(messageNbr, data, contentHeader); + } + + public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies) + { + AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies); + msg.Redelivered = redelivered; + return msg; + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs new file mode 100644 index 0000000000..34b47137e5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -0,0 +1,694 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using log4net; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public abstract class AbstractQmsMessage : AMQMessage, IMessage + { + private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage)); + + protected bool _redelivered; + + protected ByteBuffer _data; + protected bool _readableMessage = false; + private QpidHeaders _headers; + + protected AbstractQmsMessage(ByteBuffer data) + : base(new BasicContentHeaderProperties()) + { + Init(data); + } + + protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + : this(contentHeader, deliveryTag) + { + Init(data); + } + + protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag) + { + Init(null); + } + + private void Init(ByteBuffer data) + { + _data = data; + if ( _data != null ) + { + _data.Acquire(); + } + _readableMessage = (data != null); + if ( ContentHeaderProperties.Headers == null ) + ContentHeaderProperties.Headers = new FieldTable(); + _headers = new QpidHeaders(ContentHeaderProperties.Headers); + } + + // + // Properties + // + + /// <summary> + /// The application message identifier + /// </summary> + public string MessageId + { + get + { + if (ContentHeaderProperties.MessageId == null) + { + ContentHeaderProperties.MessageId = "ID:" + DeliveryTag; + } + return ContentHeaderProperties.MessageId; + } + set { ContentHeaderProperties.MessageId = value; } + } + + /// <summary> + /// The message timestamp + /// </summary> + public long Timestamp + { + get + { + // TODO: look at ulong/long choice + return (long) ContentHeaderProperties.Timestamp; + } + set + { + ContentHeaderProperties.Timestamp = (ulong) value; + } + } + + /// <summary> + /// The <see cref="CorrelationId"/> as a byte array. + /// </summary> + public byte[] CorrelationIdAsBytes + { + get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); } + set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); } + } + + /// <summary> + /// The application correlation identifier + /// </summary> + public string CorrelationId + { + get { return ContentHeaderProperties.CorrelationId; } + set { ContentHeaderProperties.CorrelationId = value; } + } + + struct Dest + { + public string ExchangeName; + public string RoutingKey; + + public Dest(string exchangeName, string routingKey) + { + ExchangeName = exchangeName; + RoutingKey = routingKey; + } + } + + /// <summary> + /// Exchange name of the reply-to address + /// </summary> + public string ReplyToExchangeName + { + get + { + return ReadReplyToHeader().ExchangeName; + } + set + { + BindingURL dest = ReadReplyToHeader(); + dest.ExchangeName = value; + WriteReplyToHeader(dest); + } + } + + /// <summary> + /// Routing key of the reply-to address + /// </summary> + public string ReplyToRoutingKey + { + get + { + return ReadReplyToHeader().RoutingKey; + } + set + { + BindingURL dest = ReadReplyToHeader(); + dest.RoutingKey = value; + WriteReplyToHeader(dest); + } + } + + /// <summary> + /// Non-persistent (1) or persistent (2) + /// </summary> + public DeliveryMode DeliveryMode + { + get + { + byte b = ContentHeaderProperties.DeliveryMode; + switch (b) + { + case 1: + return DeliveryMode.NonPersistent; + case 2: + return DeliveryMode.Persistent; + default: + throw new QpidException("Illegal value for delivery mode in content header properties"); + } + } + set + { + ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2); + } + } + + /// <summary> + /// True, if this is a redelivered message + /// </summary> + public bool Redelivered + { + get { return _redelivered; } + set { _redelivered = value; } + } + + /// <summary> + /// The message type name + /// </summary> + public string Type + { + get { return ContentHeaderProperties.Type; } + set { ContentHeaderProperties.Type = value; } + } + + /// <summary> + /// Message expiration specification + /// </summary> + public long Expiration + { + get { return ContentHeaderProperties.Expiration; } + set { ContentHeaderProperties.Expiration = value; } + } + + /// <summary> + /// The message priority, 0 to 9 + /// </summary> + public byte Priority + { + get { return ContentHeaderProperties.Priority; } + set { ContentHeaderProperties.Priority = (byte) value; } + } + + /// <summary> + /// The MIME Content Type + /// </summary> + public string ContentType + { + get { return ContentHeaderProperties.ContentType; } + set { ContentHeaderProperties.ContentType = value; } + } + + /// <summary> + /// The MIME Content Encoding + /// </summary> + public string ContentEncoding + { + get { return ContentHeaderProperties.Encoding; } + set { ContentHeaderProperties.Encoding = value; } + } + + /// <summary> + /// Headers of this message + /// </summary> + public IHeaders Headers + { + get { return _headers; } + } + + /// <summary> + /// The creating user id + /// </summary> + public string UserId + { + get { return ContentHeaderProperties.UserId; } + set { ContentHeaderProperties.UserId = value; } + } + + /// <summary> + /// The creating application id + /// </summary> + public string AppId + { + get { return ContentHeaderProperties.AppId; } + set { ContentHeaderProperties.AppId = value; } + } + + /// <summary> + /// Intra-cluster routing identifier + /// </summary> + public string ClusterId + { + get { return ContentHeaderProperties.ClusterId; } + set { ContentHeaderProperties.ClusterId = value; } + } + + /// <summary> + /// Return the raw byte array that is used to populate the frame when sending + /// the message. + /// </summary> + /// <value>a byte array of message data</value> + public ByteBuffer Data + { + get + { + if (_data != null) + { + if (!_readableMessage) + { + _data.Flip(); + } + else + { + // Make sure we rewind the data just in case any method has moved the + // position beyond the start. + _data.Rewind(); + } + } + return _data; + } + + set + { + _data = value; + } + } + + public void Acknowledge() + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_channel != null) + { + // we set multiple to true here since acknowledgement implies acknowledge of all count messages + // received on the session + _channel.AcknowledgeMessage((ulong)DeliveryTag, true); + } + + } + + public abstract void ClearBodyImpl(); + + public void ClearBody() + { + ClearBodyImpl(); + _readableMessage = false; + } + + /// <summary> + /// Get a String representation of the body of the message. Used in the + /// toString() method which outputs this before message properties. + /// </summary> + /// <exception cref="QpidException"></exception> + public abstract string ToBodyString(); + + public override string ToString() + { + try + { + StringBuilder buf = new StringBuilder("Body:\n"); + buf.Append(ToBodyString()); + buf.Append("\nQmsTimestamp: ").Append(Timestamp); + buf.Append("\nQmsExpiration: ").Append(Expiration); + buf.Append("\nQmsPriority: ").Append(Priority); + buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode); + buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName); + buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey); + buf.Append("\nAMQ message number: ").Append(DeliveryTag); + buf.Append("\nProperties:"); + if (ContentHeaderProperties.Headers == null) + { + buf.Append("<NONE>"); + } + else + { + buf.Append(Headers.ToString()); + } + return buf.ToString(); + } + catch (Exception e) + { + return e.ToString(); + } + } + + public FieldTable PopulateHeadersFromMessageProperties() + { + if (ContentHeaderProperties.Headers == null) + { + return null; + } + else + { + // + // We need to convert every property into a String representation + // Note that type information is preserved in the property name + // + FieldTable table = new FieldTable(); + foreach (DictionaryEntry entry in ContentHeaderProperties.Headers) + { + string propertyName = (string) entry.Key; + if (propertyName == null) + { + continue; + } + else + { + table[propertyName] = entry.Value.ToString(); + } + } + return table; + } + } + + public BasicContentHeaderProperties ContentHeaderProperties + { + get + { + return (BasicContentHeaderProperties) _contentHeaderProperties; + } + } + + protected virtual void Reset() + { + _readableMessage = true; + } + + public bool IsReadable + { + get { return _readableMessage; } + } + + public bool isWritable + { + get { return !_readableMessage; } + } + + protected void CheckReadable() + { + if ( !_readableMessage ) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + /// <summary> + /// Decodes the replyto field if one is set. + /// + /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and + /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is + /// empty the replyto field is expected to being with ':'. + /// + /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception. + /// </summary> + /// + /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns> + private BindingURL ReadReplyToHeader() + { + string replyToEncoding = ContentHeaderProperties.ReplyTo; + //log.Debug("replyToEncoding = " + replyToEncoding); + + BindingURL bindingUrl = new BindingURL(replyToEncoding); + //log.Debug("bindingUrl = " + bindingUrl.ToString()); + + return bindingUrl; + + //log.Info("replyToEncoding = " + replyToEncoding); + +// if ( replyToEncoding == null ) +// { +// return new Dest(); +// } else +// { +// // Split the replyto field on a ':' +// string[] split = replyToEncoding.Split(':'); + +// // Ensure that the replyto field argument only consisted of two parts. +// if ( split.Length != 2 ) +// { +// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding); +// } + +// // Extract the exchange name and routing key from the split replyto field. +// string exchangeName = split[0]; + +// string[] split2 = split[1].Split('/'); +// string routingKey = split2[3]; + +// return new Dest(exchangeName, routingKey); +// } + } + + private void WriteReplyToHeader(BindingURL dest) + { + string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey); + ContentHeaderProperties.ReplyTo = encodedDestination; + } + } + + public class BindingURL + { + public readonly static string OPTION_EXCLUSIVE = "exclusive"; + public readonly static string OPTION_AUTODELETE = "autodelete"; + public readonly static string OPTION_DURABLE = "durable"; + public readonly static string OPTION_CLIENTID = "clientid"; + public readonly static string OPTION_SUBSCRIPTION = "subscription"; + public readonly static string OPTION_ROUTING_KEY = "routingkey"; + + /// <summary> Holds the undecoded URL </summary> + string url; + + /// <summary> Holds the decoded options. </summary> + IDictionary options = new Hashtable(); + + /// <summary> Holds the decoded exchange class. </summary> + string exchangeClass; + + /// <summary> Holds the decoded exchange name. </summary> + string exchangeName; + + /// <summary> Holds the destination name. </summary> + string destination; + + /// <summary> Holds the decoded queue name. </summary> + string queueName; + + /// <summary> + /// The binding URL has the format: + /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + /// </summary> + public BindingURL(string url) + { + this.url = url; + Parse(); + } + + public string Url { get { return url; } } + + public string ExchangeClass + { + get { return exchangeClass; } + set { exchangeClass = value; } + } + + public string ExchangeName + { + get { return exchangeName; } + set { exchangeName = value; } + } + + public string QueueName + { + get { return queueName; } + set { queueName = value; } + } + + public string DestinationName + { + get { return destination; } + set { destination = value; } + } + + public string RoutingKey { + get { return (string)options[OPTION_ROUTING_KEY]; } + set { options[OPTION_ROUTING_KEY] = value; } + } + + public bool ContainsOption(string key) { return options.Contains(key); } + + public string ToString() + { + return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName + + ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] "; + } + + private void Parse() + { + Uri binding = new Uri(url); + + // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified. + string exchangeClass = binding.Scheme; + + if (exchangeClass == null) + { + url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url; + Parse(); + + return; + } + else + { + this.exchangeClass = exchangeClass; + } + + // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified. + string exchangeName = binding.Host; + + if (exchangeName == null) + { + if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS)) + { + this.exchangeName = ""; + } + } + else + { + this.exchangeName = exchangeName; + } + + // Extract the destination and queue name. + if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals("")) + { + throw new UriFormatException("Destination or Queue required"); + } + else + { + int slashOffset = binding.AbsolutePath.IndexOf("/", 1); + if (slashOffset == -1) + { + throw new UriFormatException("Destination required"); + } + else + { + String path = binding.AbsolutePath; + + this.destination = path.Substring(1, slashOffset - 1); + this.queueName = path.Substring(slashOffset + 1); + } + } + + ParseOptions(options, binding.Query); + + // If the routing key is not set as an option, set it to the destination name. + if (!ContainsOption(OPTION_ROUTING_KEY)) + { + options[OPTION_ROUTING_KEY] = destination; + } + } + + /// <summary> + /// options looks like this + /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value' + /// </summary> + public static void ParseOptions(IDictionary optionMap, string options) + { + // Check that there really are some options to parse. + if ((options == null) || (options.IndexOf('=') == -1)) + { + return; + } + + int optionIndex = options.IndexOf('='); + string option = options.Substring(0, optionIndex); + int length = options.Length; + int nestedQuotes = 0; + + // Holds the index of the final "'". + int valueIndex = optionIndex; + + // Loop over all the options.Dest + while ((nestedQuotes > 0) || (valueIndex < length)) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options[valueIndex] == '\'') + { + if ((valueIndex + 1) < options.Length) + { + if ((options[valueIndex + 1] == '&') || + (options[valueIndex + 1] == ',') || + (options[valueIndex + 1] == ';') || + (options[valueIndex + 1] == '\'')) + { + nestedQuotes--; + + if (nestedQuotes == 0) + { + // We've found the value of an option + break; + } + } + else + { + nestedQuotes++; + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options[valueIndex] == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs new file mode 100644 index 0000000000..bed379290f --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public interface IMessageFactory + { + /// <summary> + /// Create a message + /// </summary> + /// <param name="deliverTag">Delivery Tag</param> + /// <param name="messageNbr">Message Sequence Number</param> + /// <param name="redelivered">True if this is a redelivered message</param> + /// <param name="contentHeader">Content headers</param> + /// <param name="bodies">Message bodies</param> + /// <returns>The new message</returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies); + + /// <summary> + /// Creates the message. + /// </summary> + /// <param name="mimeType">Mime type to associate the new message with</param> + /// <returns>The new message</returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(string mimeType); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs new file mode 100644 index 0000000000..fdb5e14aa6 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client.Message +{ + public class MessageFactoryRegistry + { + private readonly Hashtable _mimeToFactoryMap = new Hashtable(); + private IMessageFactory _defaultFactory; + + /// <summary> + /// Default factory to use for unknown message types + /// </summary> + public IMessageFactory DefaultFactory + { + get { return _defaultFactory; } + set { _defaultFactory = value; } + } + + /// <summary> + /// Register a new message factory for a MIME type + /// </summary> + /// <param name="mimeType">Mime type to register</param> + /// <param name="mf"></param> + public void RegisterFactory(string mimeType, IMessageFactory mf) + { + if ( mf == null ) + throw new ArgumentNullException("mf"); + if ( mimeType == null || mimeType.Length == 0 ) + throw new ArgumentNullException("mimeType"); + + _mimeToFactoryMap[mimeType] = mf; + } + + /// <summary> + /// Remove a message factory + /// </summary> + /// <param name="mimeType">MIME type to unregister</param> + public void DeregisterFactory(string mimeType) + { + _mimeToFactoryMap.Remove(mimeType); + } + + /// <summary> + /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate + /// concrete message type. + /// </summary> + /// <param name="messageNbr">the AMQ message id</param> + /// <param name="redelivered">true if redelivered</param> + /// <param name="contentHeader">the content header that was received</param> + /// <param name="bodies">a list of ContentBody instances</param> + /// <returns>the message.</returns> + /// <exception cref="AMQException"/> + /// <exception cref="QpidException"/> + public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies) + { + BasicContentHeaderProperties properties = (BasicContentHeaderProperties)contentHeader.Properties; + + if ( properties.ContentType == null ) + { + properties.ContentType = ""; + } + + IMessageFactory mf = GetFactory(properties.ContentType); + return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies); + } + + /// <summary> + /// Create a new message of the specified type + /// </summary> + /// <param name="mimeType">The Mime type</param> + /// <returns>The new message</returns> + public AbstractQmsMessage CreateMessage(string mimeType) + { + if ( mimeType == null || mimeType.Length == 0 ) + throw new ArgumentNullException("mimeType"); + + IMessageFactory mf = GetFactory(mimeType); + return mf.CreateMessage(mimeType); + } + + /// <summary> + /// Construct a new registry with the default message factories registered + /// </summary> + /// <returns>a message factory registry</returns> + public static MessageFactoryRegistry NewDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.RegisterFactory("text/plain", new QpidTextMessageFactory()); + mf.RegisterFactory("text/xml", new QpidTextMessageFactory()); + mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory()); + + mf.DefaultFactory = new QpidBytesMessageFactory(); + return mf; + } + + private IMessageFactory GetFactory(string mimeType) + { + IMessageFactory mf = (IMessageFactory)_mimeToFactoryMap[mimeType]; + return mf != null ? mf : _defaultFactory; + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs new file mode 100644 index 0000000000..fb3efb1b0f --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs @@ -0,0 +1,353 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Runtime.Serialization; +using System.Text; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + [Serializable] + class MessageEOFException : QpidException + { + public MessageEOFException(string message) : base(message) + { + } + + protected MessageEOFException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } + + public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage + { + private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + + public QpidBytesMessage() : this(null) + { + } + + /// <summary> + /// Construct a bytes message with existing data. + /// </summary> + /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in + /// write-only mode</param> + QpidBytesMessage(ByteBuffer data) : base(data) + { + // superclass constructor has instantiated a content header at this point + if (data == null) + { + _data = ByteBuffer.Allocate(DEFAULT_BUFFER_INITIAL_SIZE); + _data.IsAutoExpand = true; + } + } + + internal QpidBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data) + { + } + + public override void ClearBodyImpl() + { + _data.Clear(); + } + + public override string ToBodyString() + { + CheckReadable(); + try + { + return GetText(); + } + catch (IOException e) + { + throw new QpidException(e.ToString()); + } + } + + private String GetText() + { + // this will use the default platform encoding + if (_data == null) + { + return null; + } + int pos = _data.Position; + _data.Rewind(); + // one byte left is for the end of frame marker + if (_data.Remaining == 0) + { + // this is really redundant since pos must be zero + _data.Position = pos; + return null; + } + else + { + byte[] data = new byte[_data.Remaining]; + _data.GetBytes(data); + return Encoding.UTF8.GetString(data); + } + } + + public long BodyLength + { + get + { + CheckReadable(); + return _data.Limit; + } + } + + private void CheckWritable() + { + if (_readableMessage) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + public bool ReadBoolean() + { + CheckReadable(); + CheckAvailable(1); + return _data.GetByte() != 0; + } + + public byte ReadByte() + { + CheckReadable(); + CheckAvailable(1); + return _data.GetByte(); + } + + public short ReadSignedByte() + { + CheckReadable(); + CheckAvailable(1); + return _data.GetSByte(); + } + + public short ReadShort() + { + CheckReadable(); + CheckAvailable(2); + return _data.GetInt16(); + } + + public char ReadChar() + { + CheckReadable(); + CheckAvailable(2); + return _data.GetChar(); + } + + public int ReadInt() + { + CheckReadable(); + CheckAvailable(4); + return _data.GetInt32(); + } + + public long ReadLong() + { + CheckReadable(); + CheckAvailable(8); + return _data.GetInt64(); + } + + public float ReadFloat() + { + CheckReadable(); + CheckAvailable(4); + return _data.GetFloat(); + } + + public double ReadDouble() + { + CheckReadable(); + CheckAvailable(8); + return _data.GetDouble(); + } + + public string ReadUTF() + { + CheckReadable(); + // we check only for one byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + CheckAvailable(1); + try + { + byte[] data = new byte[_data.Remaining]; + _data.GetBytes(data); + return Encoding.UTF8.GetString(data); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadBytes(byte[] bytes) + { + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + CheckReadable(); + int count = (_data.Remaining >= bytes.Length ? bytes.Length : _data.Remaining); + if (count == 0) + { + return -1; + } + else + { + _data.GetBytes(bytes, 0, count); + return count; + } + } + + public int ReadBytes(byte[] bytes, int maxLength) + { + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + if (maxLength > bytes.Length) + { + throw new ArgumentOutOfRangeException("maxLength must be >= 0"); + } + CheckReadable(); + int count = (_data.Remaining >= maxLength ? maxLength : _data.Remaining); + if (count == 0) + { + return -1; + } + else + { + _data.GetBytes(bytes, 0, count); + return count; + } + } + + public void WriteBoolean(bool b) + { + CheckWritable(); + _data.Put(b ? (byte)1 : (byte)0); + } + + public void WriteByte(byte b) + { + CheckWritable(); + _data.Put(b); + } + + public void WriteShort(short i) + { + CheckWritable(); + _data.Put(i); + } + + public void WriteChar(char c) + { + CheckWritable(); + _data.Put(c); + } + + public void WriteSignedByte(short value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteDouble(double value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteFloat(float value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteInt(int value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteLong(long value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteUTF(string value) + { + CheckWritable(); + byte[] encodedData = Encoding.UTF8.GetBytes(value); + _data.Put(encodedData); + } + + public void WriteBytes(byte[] bytes) + { + CheckWritable(); + _data.Put(bytes); + } + + public void WriteBytes(byte[] bytes, int offset, int length) + { + CheckWritable(); + _data.Put(bytes, offset, length); + } + + protected override void Reset() + { + base.Reset(); + _data.Flip(); + } + + void IBytesMessage.Reset() + { + Reset(); + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws MessageEOFException if there are less than len bytes available to read + */ + private void CheckAvailable(int len) + { + if (_data.Remaining < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs new file mode 100644 index 0000000000..3cc96cbddc --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public class QpidBytesMessageFactory : AbstractQmsMessageFactory + { + //protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr, + // ContentHeaderBody contentHeader, + // IList bodies) + //{ + // byte[] data; + + // // we optimise the non-fragmented case to avoid copying + // if (bodies != null && bodies.Count == 1) + // { + // data = ((ContentBody)bodies[0]).Payload; + // } + // else + // { + // data = new byte[(long)contentHeader.BodySize]; + // int currentPosition = 0; + // foreach (ContentBody cb in bodies) + // { + // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + // currentPosition += cb.Payload.Length; + // } + // } + + // return new QpidBytesMessage(messageNbr, data, contentHeader); + //} + + //public override AbstractQmsMessage CreateMessage() + //{ + // return new QpidBytesMessage(); + //} + + protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) + { + return new QpidBytesMessage(deliveryTag, contentHeader, data); + } + + public override AbstractQmsMessage CreateMessage(string mimeType) + { + QpidBytesMessage msg = new QpidBytesMessage(); + msg.ContentType = mimeType; + return msg; + } + + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs new file mode 100644 index 0000000000..d27c1df853 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs @@ -0,0 +1,213 @@ +using System; +using System.Collections; +using System.Text; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client.Message +{ + internal class QpidHeaders : IHeaders + { + private FieldTable _headers; + + public QpidHeaders(FieldTable headers) + { + if ( headers == null ) + throw new ArgumentNullException("headers"); + _headers = headers; + } + + public bool Contains(string name) + { + CheckPropertyName(name); + return _headers.Contains(name); + } + + public void Clear() + { + _headers.Clear(); + } + + public object this[string name] + { + get { return GetObject(name); } + set { SetObject(name, value); } + } + + public bool GetBoolean(string name) + { + CheckPropertyName(name); + if ( Contains(name) ) + return _headers.GetBoolean(name); + return false; + } + + public void SetBoolean(string name, bool b) + { + CheckPropertyName(name); + _headers.SetBoolean(name, b); + } + + public byte GetByte(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetByte(propertyName); + return 0; + } + + public void SetByte(string propertyName, byte b) + { + CheckPropertyName(propertyName); + _headers.SetByte(propertyName, b); + } + + // we have sbyte overloads to interoperate with java + // because the Java client/server uses signed bytes + // by default, while C#'s is unsigned + public sbyte GetSByte(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetSByte(propertyName); + return 0; + } + + public void SetSByte(string propertyName, sbyte b) + { + CheckPropertyName(propertyName); + _headers.SetSByte(propertyName, b); + } + + public short GetShort(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetInt16(propertyName); + return 0; + } + + public void SetShort(string propertyName, short i) + { + CheckPropertyName(propertyName); + _headers.SetInt16(propertyName, i); + } + + public int GetInt(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetInt32(propertyName); + return 0; + } + + public void SetInt(string propertyName, int i) + { + CheckPropertyName(propertyName); + _headers.SetInt32(propertyName, i); + } + + public long GetLong(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetInt64(propertyName); + return 0; + } + + public void SetLong(string propertyName, long l) + { + CheckPropertyName(propertyName); + _headers.SetInt64(propertyName, l); + } + + public float GetFloat(String propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetFloat(propertyName); + return 0f; + } + + public void SetFloat(string propertyName, float f) + { + CheckPropertyName(propertyName); + _headers.SetFloat(propertyName, f); + } + + public double GetDouble(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetDouble(propertyName); + return 0; + } + + public void SetDouble(string propertyName, double v) + { + CheckPropertyName(propertyName); + _headers.SetDouble(propertyName, v); + } + + public string GetString(string propertyName) + { + CheckPropertyName(propertyName); + return _headers.GetString(propertyName); + } + + public void SetString(string propertyName, string value) + { + CheckPropertyName(propertyName); + _headers.SetString(propertyName, value); + } + + public object GetObject(string propertyName) + { + CheckPropertyName(propertyName); + return _headers[propertyName]; + } + + public void SetObject(string propertyName, object value) + { + CheckPropertyName(propertyName); + _headers[propertyName] = value; + } + + private static void CheckPropertyName(string propertyName) + { + if ( propertyName == null ) + { + throw new ArgumentException("Property name must not be null"); + } else if ( "".Equals(propertyName) ) + { + throw new ArgumentException("Property name must not be the empty string"); + } + } + + public override string ToString() + { + StringBuilder buf = new StringBuilder("{"); + int i = 0; + foreach ( DictionaryEntry entry in _headers ) + { + ++i; + if ( i > 1 ) + { + buf.Append(", "); + } + string propertyName = (string)entry.Key; + if ( propertyName == null ) + { + buf.Append("\nInternal error: Property with NULL key defined"); + } else + { + buf.Append(propertyName); + buf.Append(" = ").Append(entry.Value); + } + } + buf.Append("}"); + return buf.ToString(); + } + + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs new file mode 100644 index 0000000000..24aef92aa5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public class QpidTextMessage : AbstractQmsMessage, ITextMessage + { + private string _decodedValue = null; + private static Encoding DEFAULT_ENCODING = Encoding.UTF8; + + internal QpidTextMessage() : this(null, null) + { + ContentEncoding = DEFAULT_ENCODING.BodyName; + } + + internal QpidTextMessage(ByteBuffer data, String encoding) : base(data) + { + ContentEncoding = encoding; + } + + internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + :base(deliveryTag, contentHeader, data) + { + } + + public override void ClearBodyImpl() + { + if (_data != null) + { + _data.Release(); + } + _data = null; + _decodedValue = null; + } + + public override string ToBodyString() + { + return Text; + } + + public string Text + { + get + { + if (_data == null && _decodedValue == null) + { + return null; + } + else if (_decodedValue != null) + { + return _decodedValue; + } + else + { + _data.Rewind(); + + // Read remaining bytes. + byte[] bytes = new byte[_data.Remaining]; + _data.GetBytes(bytes); + + // Convert to string based on encoding. + if (ContentHeaderProperties.Encoding != null) + { + // throw ArgumentException if the encoding is not supported + _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(bytes); + } + else + { + _decodedValue = DEFAULT_ENCODING.GetString(bytes); + } + return _decodedValue; + } + } + + set + { + byte[] bytes; + if (ContentHeaderProperties.Encoding == null) + { + bytes = Encoding.Default.GetBytes(value); + } + else + { + // throw ArgumentException if the encoding is not supported + bytes = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value); + } + _data = ByteBuffer.Wrap(bytes); + _decodedValue = value; + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs new file mode 100644 index 0000000000..79871e85ca --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public class QpidTextMessageFactory : AbstractQmsMessageFactory + { + public override AbstractQmsMessage CreateMessage(string mimeType) + { + QpidTextMessage msg = new QpidTextMessage(); + msg.ContentType = mimeType; + return msg; + } + + protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) + { + return new QpidTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.Properties, data); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs b/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs new file mode 100644 index 0000000000..4317ef3474 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; +using log4net; + +namespace Apache.Qpid.Client.Message +{ + /// <summary> + /// Raised when a message body is received unexpectedly by the client. This typically occurs when the + /// length of bodies received does not match with the declared length in the content header. + /// </summary> + [Serializable] + public class UnexpectedBodyReceivedException : AMQException + { + public UnexpectedBodyReceivedException(ILog logger, string msg, Exception t) + : base(logger, msg, t) + { + } + + public UnexpectedBodyReceivedException(ILog logger, string msg) + : base(logger, msg) + { + } + + public UnexpectedBodyReceivedException(ILog logger, int errorCode, string msg) + : base(logger, errorCode, msg) + { + } + + protected UnexpectedBodyReceivedException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} + + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs new file mode 100644 index 0000000000..d329712334 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public class UnprocessedMessage + { + private ulong _bytesReceived = 0; + + public BasicDeliverBody DeliverBody; + public BasicReturnBody BounceBody; + public ushort ChannelId; + public ContentHeaderBody ContentHeader; + + /// <summary> + /// List of ContentBody instances. Due to fragmentation you don't know how big this will be in general + /// </summary> + /// TODO: write and use linked list class + public IList Bodies = new ArrayList(); + + public void ReceiveBody(ContentBody body) + { + Bodies.Add(body); + if (body.Payload != null) + { + _bytesReceived += (uint)body.Payload.Remaining; + } + } + + public bool IsAllBodyDataReceived() + { + return _bytesReceived == ContentHeader.BodySize; + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs new file mode 100644 index 0000000000..a7ce808862 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol +{ + public class AMQMethodEvent + { + private AMQMethodBody _method; + + private ushort _channelId; + + private AMQProtocolSession _protocolSession; + + public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession) + { + _channelId = channelId; + _method = method; + _protocolSession = protocolSession; + } + + public AMQMethodBody Method + { + get + { + return _method; + } + } + + public ushort ChannelId + { + get + { + return _channelId; + } + } + + public AMQProtocolSession ProtocolSession + { + get + { + return _protocolSession; + } + } + + public override String ToString() + { + StringBuilder buf = new StringBuilder("Method event: "); + buf.Append("\nChannel id: ").Append(_channelId); + buf.Append("\nMethod: ").Append(_method); + return buf.ToString(); + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs new file mode 100644 index 0000000000..c51538b70e --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs @@ -0,0 +1,318 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Failover; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol +{ + /// <summary> + /// AMQProtocolListener + /// + /// <p/>Fail-over state transition rules... + /// + /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order + /// to be able to send errors during failover back to the client application. The session won't be available in the case + /// when failing over due to a Connection.Redirect message from the broker. + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Track fail over state of a connection. + /// <tr><td> Manage method listeners. <td> IAMQMethodListener + /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler + /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener + /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener + /// </table> + /// + /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being + /// triggered, when it should not be. + /// + /// </summary> + public class AMQProtocolListener : IProtocolListener + { + /// <summary>Used for debugging.</summary> + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); + + /// <summary> + /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it, + /// the failover process is handed off to this handler. + /// </summary> + private FailoverHandler _failoverHandler; + + /// <summary>Tracks the current fail-over state.</summary> + internal FailoverState _failoverState = FailoverState.NOT_STARTED; + + internal FailoverState FailoverState + { + get { return _failoverState; } + set { _failoverState = value; } + } + + internal ManualResetEvent FailoverLatch; + + AMQConnection _connection; + AMQStateManager _stateManager; + + public AMQStateManager StateManager + { + get { return _stateManager; } + set { _stateManager = value; } + } + + private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); + + AMQProtocolSession _protocolSession = null; + + private readonly Object _lock = new Object(); + + public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } + + public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) + { + _connection = connection; + _stateManager = stateManager; + _failoverHandler = new FailoverHandler(connection); + } + + public void OnMessage(IDataBlock message) + { + // Handle incorrect protocol version. + if (message is ProtocolInitiation) + { + string error = String.Format("Protocol mismatch - {0}", message.ToString()); + AMQException e = new AMQProtocolHeaderException(error); + _log.Error("Closing connection because of protocol mismatch", e); + //_protocolSession.CloseProtocolSession(); + _stateManager.Error(e); + return; + } + + AMQFrame frame = (AMQFrame)message; + + if (frame.BodyFrame is AMQMethodBody) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Method frame received: " + frame); + } + AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession); + try + { + bool wasAnyoneInterested = false; + lock (_frameListeners.SyncRoot) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested; + } + } + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + } + } + catch (Exception e) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + listener.Error(e); + } + } + } + else if (frame.BodyFrame is ContentHeaderBody) + { + _protocolSession.MessageContentHeaderReceived(frame.Channel, + (ContentHeaderBody)frame.BodyFrame); + } + else if (frame.BodyFrame is ContentBody) + { + _protocolSession.MessageContentBodyReceived(frame.Channel, + (ContentBody)frame.BodyFrame); + } + else if (frame.BodyFrame is HeartbeatBody) + { + _log.Debug("HeartBeat received"); + } + } + + /// <summary> + /// Receives notification of any IO exceptions on the connection. + /// + /// <p/>Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then + /// all method listeners and the application connection object are notified of the connection failure exception. + /// + /// <p/>All other exception types are propagated to all method listeners. + /// </summary> + public void OnException(Exception cause) + { + _log.Warn("public void OnException(Exception cause = " + cause + "): called"); + + // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also + // ensures that this exception is fully propagated to all listeners, before another one can be processed. + lock (_lock) + { + if (cause is AMQConnectionClosedException || cause is System.IO.IOException) + { + // Try a fail-over because the connection has failed. + FailoverState failoverState = AttemptFailover(); + + // Check if the fail-over has failed, in which case notify all method listeners of the exception. + // The application connection object is also notified of the failure of the connection with the exception. + if (failoverState == FailoverState.FAILED) + { + _log.Debug("Fail-over has failed. Notifying all method listeners of the exception."); + + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + PropagateExceptionToWaiters(amqe); + _connection.ExceptionReceived(cause); + } + } + // Notify all method listeners of the exception. + else + { + PropagateExceptionToWaiters(cause); + _connection.ExceptionReceived(cause); + } + } + } + + /// <summary> + /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been + /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress + /// this method allows it to continue to run and will do nothing. + /// + /// <p/>This method should only be called when the connection has been remotely closed. + /// </summary> + /// + /// <returns>The fail-over state at the end of this attempt.</returns> + private FailoverState AttemptFailover() + { + _log.Debug("private void AttemptFailover(): called"); + _log.Debug("_failoverState = " + _failoverState); + + // Ensure that the connection stops sending heart beats, if it still is. + _connection.StopHeartBeatThread(); + + // Check that the connection policy allows fail-over to be attempted. + if (!_connection.IsFailoverAllowed) + { + _log.Debug("Connection does not allowed to failover"); + _connection.ExceptionReceived( + new AMQDisconnectedException("Broker closed connection and reconnection is not permitted.")); + } + + // Check if connection was closed deliberately by the application, in which case no fail-over is attempted. + if (_connection.Closed) + { + return _failoverState; + } + + // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is + // advanced to 'in progress' + if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed) + { + _log.Info("Starting the fail-over process."); + + _failoverState = FailoverState.IN_PROGRESS; + StartFailoverThread(); + } + + return _failoverState; + } + + /// <summary> + /// There are two cases where we have other threads potentially blocking for events to be handled by this + /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a + /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can + /// react appropriately. + /// + /// <param name="e">the exception to propagate</param> + /// </summary> + public void PropagateExceptionToWaiters(Exception e) + { + // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over. + _stateManager.Error(e); + lock ( _lock ) + { + foreach ( IAMQMethodListener listener in _frameListeners ) + { + listener.Error(e); + } + } + } + + public void AddFrameListener(IAMQMethodListener listener) + { + lock ( _lock ) + { + _frameListeners.Add(listener); + } + } + + public void RemoveFrameListener(IAMQMethodListener listener) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Removing frame listener: " + listener.ToString()); + } + lock ( _lock ) + { + _frameListeners.Remove(listener); + } + } + + public void BlockUntilNotFailingOver() + { + if (FailoverLatch != null) + { + FailoverLatch.WaitOne(); + } + } + + /// <summary> + /// "Failover" for redirection. + /// </summary> + /// <param name="host"></param> + /// <param name="port"></param> + public void Failover(string host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + StartFailoverThread(); + } + + private void StartFailoverThread() + { + Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run)); + failoverThread.Name = "Failover"; + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.IsBackground = false; + failoverThread.Start(); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs new file mode 100644 index 0000000000..7ae086e35f --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -0,0 +1,279 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Transport; +using Apache.Qpid.Framing; +using Apache.Qpid.Sasl; + +namespace Apache.Qpid.Client.Protocol +{ + public class AMQProtocolSession + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession)); + + private readonly IProtocolWriter _protocolWriter; + private readonly IConnectionCloser _connectionCloser; + + /** + * Counter to ensure unique queue names + */ + private int _queueId = 1; + private readonly Object _queueIdLock = new Object(); + + /// <summary> + /// Maps from the channel id to the AmqChannel that it represents. + /// </summary> + //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable()); + + //private ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable()); + + /// <summary> + /// Maps from a channel id to an unprocessed message. This is used to tie together the + /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. + /// </summary> + //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable()); + + private AMQConnection _connection; + + public string ClientID { get { return _connection.ClientID; } } + + public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection) + { + _protocolWriter = protocolWriter; + _connectionCloser = connectionCloser; + _connection = connection; + } + + public void Init() + { + // start the process of setting up the connection. This is the first place that + // data is written to the server. + _protocolWriter.Write(new ProtocolInitiation()); + } + + public string Username + { + get + { + return AMQConnection.Username; + } + } + + public string Password + { + get + { + return AMQConnection.Password; + } + } + + ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too. + + public ConnectionTuneParameters ConnectionTuneParameters + { + get + { + return _connectionTuneParameters; + } + set + { + _connectionTuneParameters = value; + AMQConnection con = AMQConnection; + con.SetMaximumChannelCount(value.ChannelMax); + con.MaximumFrameSize = value.FrameMax; + } + } + + private ISaslClient _saslClient; + public ISaslClient SaslClient + { + get { return _saslClient; } + set { _saslClient = value; } + } + + /// <summary> + /// Callback invoked from the BasicDeliverMethodHandler when a message has been received. + /// This is invoked on the MINA dispatcher thread. + /// </summary> + /// <param name="message">the unprocessed message</param> + /// <exception cname="AMQException">if this was not expected</exception> + public void UnprocessedMessageReceived(UnprocessedMessage message) + { + _channelId2UnprocessedMsgMap[message.ChannelId] = message; + } + + public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content header without having received a JMSDeliver frame first"); + } + if (msg.ContentHeader != null) + { + throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); + } + msg.ContentHeader = contentHeader; + if (contentHeader.BodySize == 0) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content body without having received a BasicDeliver frame first"); + } + if (msg.ContentHeader == null) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw new AMQException("Error: received content body without having received a ContentHeader frame first"); + } + try + { + msg.ReceiveBody(contentBody); + } + catch (UnexpectedBodyReceivedException e) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw e; + } + if (msg.IsAllBodyDataReceived()) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + /// <summary> + /// Deliver a message to the appropriate session, removing the unprocessed message + /// from our map + /// <param name="channelId">the channel id the message should be delivered to</param> + /// <param name="msg"> the message</param> + private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg) + { + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.MessageReceived(msg); + _channelId2UnprocessedMsgMap.Remove(channelId); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session. Equivalent + /// to calling getProtocolSession().write(). + /// </summary> + /// <param name="frame">the frame to write</param> + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + public void AddSessionByChannel(ushort channelId, AmqChannel channel) + { + if (channel == null) + { + throw new ArgumentNullException("Attempt to register a null channel"); + } + _logger.Debug("Add channel with channel id " + channelId); + _channelId2SessionMap[channelId] = channel; + } + + public void RemoveSessionByChannel(ushort channelId) + { + _logger.Debug("Removing session with channelId " + channelId); + _channelId2SessionMap.Remove(channelId); + } + + /// <summary> + /// Starts the process of closing a channel + /// </summary> + /// <param name="channel" the AmqChannel being closed</param> + public void CloseSession(AmqChannel channel) + { + _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId); + ushort channelId = channel.ChannelId; + + // we need to know when a channel is closing so that we can respond + // with a channel.close frame when we receive any other type of frame + // on that channel + _closingChannels[channelId] = channel; + + } + + /// <summary> + /// Called from the ChannelClose handler when a channel close frame is received. + /// This method decides whether this is a response or an initiation. The latter + /// case causes the AmqChannel to be closed and an exception to be thrown if + /// appropriate. + /// </summary> + /// <param name="channelId">the id of the channel (session)</param> + /// <returns>true if the client must respond to the server, i.e. if the server + /// initiated the channel close, false if the channel close is just the server + /// responding to the client's earlier request to close the channel.</returns> + public bool ChannelClosed(ushort channelId, int code, string text) + { + // if this is not a response to an earlier request to close the channel + if (!_closingChannels.ContainsKey(channelId)) + { + _closingChannels.Remove(channelId); + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.ClosedWithException(new AMQException(_logger, code, text)); + return true; + } + else + { + return false; + } + } + + public AMQConnection AMQConnection + { + get + { + return _connection; + } + } + + public void CloseProtocolSession() + { + _logger.Debug("Closing protocol session"); + _connectionCloser.Close(); + } + + internal string GenerateQueueName() + { + int id; + lock(_queueIdLock) + { + id = _queueId++; + } + + return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id; + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs new file mode 100644 index 0000000000..6841b46f54 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs @@ -0,0 +1,47 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Text;
+
+namespace Apache.Qpid.Client.Protocol
+{
+ /// <summary>
+ /// Default timeout values for the protocol
+ /// </summary>
+ sealed class DefaultTimeouts
+ {
+ /// <summary>
+ /// Maximum number of milliseconds to wait for a state change
+ /// in the protocol's state machine
+ /// </summary>
+ public const int MaxWaitForState = 30* 1000;
+ /// <summary>
+ /// Maximum number of milliseconds to wait for a reply
+ /// frame when doing synchronous writer to the broker
+ /// </summary>
+ public const int MaxWaitForSyncWriter = 30 * 1000;
+
+ private DefaultTimeouts()
+ {
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs new file mode 100644 index 0000000000..e3298200c4 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.Protocol +{ + public interface IConnectionCloser + { + void Close(); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs new file mode 100644 index 0000000000..3b53f015f8 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol +{ + public interface IProtocolListener + { + void OnMessage(IDataBlock message); + void OnException(Exception e); + + // XXX: .NET way of doing listeners? + void AddFrameListener(IAMQMethodListener listener); + void RemoveFrameListener(IAMQMethodListener listener); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs new file mode 100644 index 0000000000..9cc9f8cee5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol.Listener +{ + public abstract class BlockingMethodFrameListener : IAMQMethodListener + { + private ManualResetEvent _resetEvent; + + public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame); + + /// <summary> + /// This is set if there is an exception thrown from processCommandFrame and the + /// exception is rethrown to the caller of blockForFrame() + /// </summary> + private volatile Exception _error; + + protected ushort _channelId; + + protected AMQMethodEvent _doneEvt = null; + + public BlockingMethodFrameListener(ushort channelId) + { + _channelId = channelId; + _resetEvent = new ManualResetEvent(false); + } + + /// <summary> + /// This method is called by the MINA dispatching thread. Note that it could + /// be called before BlockForFrame() has been called. + /// </summary> + /// <param name="evt">the frame event</param> + /// <returns>true if the listener has dealt with this frame</returns> + /// <exception cref="AMQException"></exception> + public bool MethodReceived(AMQMethodEvent evt) + { + AMQMethodBody method = evt.Method; + + try + { + bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method); + if (ready) + { + _doneEvt = evt; + _resetEvent.Set(); + } + + return ready; + } + catch (AMQException e) + { + Error(e); + // we rethrow the error here, and the code in the frame dispatcher will go round + // each listener informing them that an exception has been thrown + throw e; + } + } + + /// <summary> + /// This method is called by the thread that wants to wait for a frame. + /// </summary> + /// <param name="timeout">Set the number of milliseconds to wait</param> + public AMQMethodEvent BlockForFrame(int timeout) + { + _resetEvent.WaitOne(timeout, true); + //at this point the event will have been signalled. The error field might or might not be set + // depending on whether an error occurred + if (_error != null) + { + throw _error; + } + + return _doneEvt; + } + + /// <summary> + /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this + /// class to avoid code repetition but again is only called by the MINA dispatcher thread. + /// </summary> + /// <param name="e">the exception that caused the error</param> + public void Error(Exception e) + { + // set the error so that the thread that is blocking (in BlockForFrame()) + // can pick up the exception and rethrow to the caller + _error = e; + _resetEvent.Set(); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs new file mode 100644 index 0000000000..b5450d00f7 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Apache.Qpid.Client.Protocol.Listener +{ + public interface IAMQMethodListener + { + /// <summary> + /// Invoked when a method frame has been received + /// <param name="evt">the event</param> + /// <returns>true if the handler has processed the method frame, false otherwise. Note + /// that this does not prohibit the method event being delivered to subsequent listeners + /// but can be used to determine if nobody has dealt with an incoming method frame.</param> + /// <exception cname="AMQException">if an error has occurred. This exception will be delivered + /// to all registered listeners using the error() method (see below) allowing them to + /// perform cleanup if necessary.</exception> + bool MethodReceived(AMQMethodEvent evt); + + /// <summary> + /// Callback when an error has occurred. Allows listeners to clean up. + /// </summary> + /// <param name="e">the exception</param> + void Error(Exception e); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs new file mode 100644 index 0000000000..8cdc1dbba9 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol.Listener +{ + public class SpecificMethodFrameListener : BlockingMethodFrameListener + { + private readonly Type _expectedClass; + + public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId) + { + _expectedClass = expectedClass; + } + + public override bool ProcessMethod(ushort channelId, AMQMethodBody frame) + { + return _expectedClass.IsInstanceOfType(frame); + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs new file mode 100644 index 0000000000..11918f1ea2 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Client.Transport; +using Apache.Qpid.Framing; + +using log4net; + +namespace Apache.Qpid.Client.Protocol +{ + /// <summary> + /// A convenient interface to writing protocol frames. + /// </summary> + public class ProtocolWriter + { + + private ILog _logger = LogManager.GetLogger(typeof(ProtocolWriter)); + + IProtocolWriter _protocolWriter; + IProtocolListener _protocolListener; + + public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener) + { + _protocolWriter = protocolWriter; + _protocolListener = protocolListener; + } + + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="listener">the blocking listener. Note the calling thread will block.</param> + /// <param name="timeout">set the number of milliseconds to wait</param> + private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener, int timeout) + { + try + { + _protocolListener.AddFrameListener(listener); + _protocolWriter.Write(frame); + + return listener.BlockForFrame(timeout); + } + finally + { + _protocolListener.RemoveFrameListener(listener); + } + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="responseType">the type of method response</param> + public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType) + { + // TODO: If each frame knew it's response type, then the responseType argument would + // TODO: not be neccesary. + return SyncWrite(frame, responseType, DefaultTimeouts.MaxWaitForSyncWriter); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="responseType">the type of method response</param> + /// <param name="timeout">set the number of milliseconds to wait</param> + /// <returns>set the number of milliseconds to wait</returns> + public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType, int timeout) + { + return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType), timeout); + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs b/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs new file mode 100644 index 0000000000..ede8966f37 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs @@ -0,0 +1,504 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Net; +using System.Text; +using System.Text.RegularExpressions; +using log4net; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client +{ + + public class URLHelper + { + public static char DEFAULT_OPTION_SEPERATOR = '&'; + public static char ALTERNATIVE_OPTION_SEPARATOR = ','; + public static char BROKER_SEPARATOR = ';'; + + /// <summary> + /// + /// </summary> + /// <param name="optionMap"></param> + /// <param name="options"></param> + public static void parseOptions(IDictionary optionMap, string options) + { + //options looks like this + //brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'' + + if (options == null || options.IndexOf('=') == -1) + { + return; + } + + int optionIndex = options.IndexOf('='); + + String option = options.Substring(0, optionIndex); + + int length = options.Length; + + int nestedQuotes = 0; + + // to store index of final "'" + int valueIndex = optionIndex; + + //Walk remainder of url. + while (nestedQuotes > 0 || valueIndex < length) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options[valueIndex] == '\'') + { + if (valueIndex + 1 < options.Length) + { + if (options[valueIndex + 1] == DEFAULT_OPTION_SEPERATOR || + options[valueIndex + 1] == ALTERNATIVE_OPTION_SEPARATOR || + options[valueIndex + 1] == BROKER_SEPARATOR || + options[valueIndex + 1] == '\'') + { + nestedQuotes--; + // System.out.println( + // options + "\n" + "-" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + if (nestedQuotes == 0) + { + //We've found the value of an option + break; + } + } + else + { + nestedQuotes++; + // System.out.println( + // options + "\n" + "+" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options[valueIndex] == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + + if (nestedQuotes != 0 || valueIndex < (optionIndex + 2)) + { + int sepIndex = 0; + + //Try and identify illegal separator character + if (nestedQuotes > 1) + { + for (int i = 0; i < nestedQuotes; i++) + { + sepIndex = options.IndexOf('\'', sepIndex); + sepIndex++; + } + } + + if (sepIndex >= options.Length || sepIndex == 0) + { + parseError(valueIndex, "Unterminated option", options); + } + else + { + parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + + options[sepIndex] + "'", options); + } + } + + // optionIndex +2 to skip "='" + int sublen = valueIndex - (optionIndex + 2); + String value = options.Substring(optionIndex + 2, sublen); + + optionMap.Add(option, value); + + if (valueIndex < (options.Length - 1)) + { + //Recurse to get remaining options + parseOptions(optionMap, options.Substring(valueIndex + 2)); + } + } + + + public static void parseError(int index, String error, String url) + { + parseError(index, 1, error, url); + } + + public static void parseError(int index, int length, String error, String url) + { + throw new UrlSyntaxException(url, error, index, length); + } + + public static String printOptions(Hashtable options) + { + if (options.Count == 0) + { + return ""; + } + else + { + StringBuilder sb = new StringBuilder(); + sb.Append('?'); + foreach (String key in options.Keys) + { + sb.AppendFormat("{0}='{1}'{2}", key, options[key], DEFAULT_OPTION_SEPERATOR); + } + + sb.Remove(sb.Length - 1, 1); + return sb.ToString(); + } + } + + } + + public class QpidConnectionUrl + { + internal static IConnectionInfo FromUrl(string fullURL) + { + //_url = fullURL; + IConnectionInfo connectionInfo = new QpidConnectionInfo(); + + + // _options = new HashMap<String, String>(); + // _brokers = new LinkedList(); + // _failoverOptions = new HashMap<String, String>(); + + // Connection URL format + //amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''" + // Options are of course optional except for requiring a single broker in the broker list. + try + { + Uri connection = new Uri(fullURL); + + if (connection.Scheme == null || !(connection.Scheme.Equals(ConnectionUrlConstants.AMQ_PROTOCOL))) + { + throw new UrlSyntaxException(fullURL, "Not an AMQP URL"); + } + + if (connection.Host != null && connection.Host.Length > 0 && !connection.Host.Equals("default")) + { + connectionInfo.ClientName = connection.Host; + } + + String userInfo = connection.UserInfo; + if (userInfo == null || userInfo.Length == 0) + { + URLHelper.parseError(ConnectionUrlConstants.AMQ_PROTOCOL.Length + 3, + "User information not found on url", fullURL); + } + else + { + parseUserInfo(userInfo, fullURL, connectionInfo); + } + String virtualHost = connection.AbsolutePath; // XXX: is AbsolutePath corrrect? + + if (virtualHost != null && virtualHost.Length > 0) + { + connectionInfo.VirtualHost = virtualHost; + } + else + { + int authLength = connection.Authority.Length; + int start = ConnectionUrlConstants.AMQ_PROTOCOL.Length + 3; + int testIndex = start + authLength; + if (testIndex < fullURL.Length && fullURL[testIndex] == '?') + { + URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); + } + else + { + URLHelper.parseError(-1, "Virtual host not specified", fullURL); + } + + } + + QpidConnectionInfo qci = (QpidConnectionInfo)connectionInfo; + string query = connection.Query; + if (query[0] == '?') query = query.Substring(1); + URLHelper.parseOptions(qci.GetOptions(), query); + + processOptions(connectionInfo); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + return connectionInfo; + } + catch (UriFormatException uris) + { + throw uris; + // if (uris is UrlSyntaxException) + // { + // throw uris; + // } + // + // int slash = fullURL.IndexOf("\\"); + // + // if (slash == -1) + // { + // URLHelper.parseError(uris.GetIndex(), uris.getReason(), uris.getInput()); + // } + // else + // { + // if (slash != 0 && fullURL.charAt(slash - 1) == ':') + // { + // URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + // } + // else + // { + // URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); + // } + // } + } + } + + private static void parseUserInfo(String userinfo, string fullUrl, IConnectionInfo connectionInfo) + { + //user info = user:pass + + int colonIndex = userinfo.IndexOf(':'); + + if (colonIndex == -1) + { + URLHelper.parseError(ConnectionUrlConstants.AMQ_PROTOCOL.Length + 3, + userinfo.Length, "Null password in user information not allowed.", fullUrl); + } + else + { + connectionInfo.Username = userinfo.Substring(0, colonIndex); + connectionInfo.Password = userinfo.Substring(colonIndex + 1); + } + } + + private static void processOptions(IConnectionInfo connectionInfo) + { + string brokerlist = connectionInfo.GetOption(ConnectionUrlConstants.OPTIONS_BROKERLIST); + if (brokerlist != null) + { + //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' + Regex splitter = new Regex("" + URLHelper.BROKER_SEPARATOR); + + foreach (string broker in splitter.Split(brokerlist)) + { + connectionInfo.AddBrokerInfo(new AmqBrokerInfo(broker)); + } + + connectionInfo.SetOption(ConnectionUrlConstants.OPTIONS_BROKERLIST, null); + // _options.remove(OPTIONS_BROKERLIST); + } + + string failover = connectionInfo.GetOption(ConnectionUrlConstants.OPTIONS_FAILOVER); + if (failover != null) + { + // failover='method?option='value',option='value'' + + int methodIndex = failover.IndexOf('?'); + + if (methodIndex > -1) + { + connectionInfo.FailoverMethod = failover.Substring(0, methodIndex); + QpidConnectionInfo qpidConnectionInfo = (QpidConnectionInfo)connectionInfo; + URLHelper.parseOptions(qpidConnectionInfo.GetFailoverOptions(), + failover.Substring(methodIndex + 1)); + } + else + { + connectionInfo.FailoverMethod = failover; + } + + connectionInfo.SetOption(ConnectionUrlConstants.OPTIONS_FAILOVER, null); + // _options.remove(OPTIONS_FAILOVER); + } + } + + internal static IConnectionInfo FromUri(Uri uri) + { + return null; // FIXME + + } + } + + public class QpidConnectionInfo : IConnectionInfo + { + const string DEFAULT_VHOST = "/"; + string _username = "guest"; + string _password = "guest"; + string _virtualHost = DEFAULT_VHOST; + + string _failoverMethod = null; + IDictionary _failoverOptions = new Hashtable(); + IDictionary _options = new Hashtable(); + IList _brokerInfos = new ArrayList(); // List<BrokerInfo> + string _clientName = String.Format("{0}{1:G}", Dns.GetHostName(), DateTime.Now.Ticks); + + public IDictionary GetFailoverOptions() + { + return _failoverOptions; + } + + public IDictionary GetOptions() + { + return _options; + } + + public static IConnectionInfo FromUrl(String url) + { + return QpidConnectionUrl.FromUrl(url); + } + + public string AsUrl() + { + StringBuilder sb = new StringBuilder(); + sb.AppendFormat("{0}://", ConnectionUrlConstants.AMQ_PROTOCOL); + + if (_username != null) + { + sb.Append(_username); + if (_password != null) + { + sb.AppendFormat(":{0}", _password); + } + sb.Append("@"); + } + + sb.Append(_clientName); + sb.Append(_virtualHost); + sb.Append(OptionsToString()); + + return sb.ToString(); + } + + private String OptionsToString() + { + StringBuilder sb = new StringBuilder(); + sb.AppendFormat("?{0}='", ConnectionUrlConstants.OPTIONS_BROKERLIST); + + foreach (IBrokerInfo broker in _brokerInfos) + { + sb.AppendFormat("{0};", broker); + } + + sb.Remove(sb.Length - 1, 1); + sb.Append("'"); + + if (_failoverMethod != null) + { + sb.AppendFormat("{0}{1}='{2}{3}'", URLHelper.DEFAULT_OPTION_SEPERATOR, + ConnectionUrlConstants.OPTIONS_FAILOVER, + _failoverMethod, + URLHelper.printOptions((Hashtable)_failoverOptions)); + } + + return sb.ToString(); + } + + + public string FailoverMethod + { + get { return _failoverMethod; } + set { _failoverMethod = value; } + } + + public string GetFailoverOption(string key) + { + return (string)_failoverOptions[key]; + } + + public int BrokerCount + { + get { return _brokerInfos.Count; } + } + + public IBrokerInfo GetBrokerInfo(int index) + { + return (IBrokerInfo)_brokerInfos[index]; + } + + public void AddBrokerInfo(IBrokerInfo brokerInfo) + { + if (!_brokerInfos.Contains(brokerInfo)) + { + _brokerInfos.Add(brokerInfo); + } + } + + public IList GetAllBrokerInfos() + { + return _brokerInfos; + } + + public string ClientName + { + get { return _clientName; } + set { _clientName = value; } + } + + public string Username + { + get { return _username; } + set { _username = value; } + } + + public string Password + { + get { return _password; } + set { _password = value; } + } + + public string VirtualHost + { + get { return _virtualHost; } + set { + _virtualHost = value; + if ( _virtualHost == null || _virtualHost.Length == 0 ) + _virtualHost = DEFAULT_VHOST; + if ( _virtualHost[0] != '/' ) + _virtualHost = '/' + _virtualHost; + } + } + + public string GetOption(string key) + { + return (string)_options[key]; + } + + public void SetOption(string key, string value) + { + _options[key] = value; + } + + public override string ToString() + { + return AsUrl(); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs new file mode 100644 index 0000000000..85be927ff4 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs @@ -0,0 +1,127 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections;
+using System.Configuration;
+using System.Text;
+using Apache.Qpid.Sasl;
+using Apache.Qpid.Sasl.Mechanisms;
+
+using Apache.Qpid.Client.Configuration;
+
+namespace Apache.Qpid.Client.Security
+{
+
+ /// <summary>
+ /// Helper class to map SASL mechanisms to our
+ /// internal ISaslCallbackHandler implementations.
+ /// </summary>
+ /// <remarks>
+ /// The set of configured callback handlers and their order
+ /// controls the selection of the SASL mechanism used for authentication.
+ /// <para>
+ /// You can either replace the default handler for CRAM-MD5 and PLAIN
+ /// authentication (the two default options) using the application
+ /// configuration file. Configuration is done by especifying the SASL
+ /// mechanism name (e.g PLAIN) and the type implementing the callback handler
+ /// used to provide any data required by the mechanism like username and password.
+ /// </para>
+ /// <para>
+ /// Callback handler types should implement the IAMQCallbackHandler interface.
+ /// </para>
+ /// <para>
+ /// New callbacks or authentication mechanisms can be configured like this:
+ /// </para>
+ /// <example><![CDATA[
+ /// <configuration>
+ /// <configSections>
+ /// <sectionGroup name="qpid.client">
+ /// <section name="authentication" type="Apache.Qpid.Client.Configuration.AuthenticationConfigurationSectionHandler, Apache.Qpid.Client"/>
+ /// </sectionGroup>
+ /// </configSections>
+ /// <qpid.client>
+ /// <authentication>
+ /// <add key="TEST" value="Apache.Qpid.Client.Tests.Security.TestCallbackHandler, Apache.Qpid.Client.Tests"/>
+ /// </authentication>
+ /// </qpid.client>
+ /// </configuration>
+ /// ]]></example>
+ /// </remarks>
+ public sealed class CallbackHandlerRegistry
+ {
+ private static CallbackHandlerRegistry _instance =
+ new CallbackHandlerRegistry();
+ private OrderedHashTable _mechanism2HandlerMap;
+ private string[] _mechanisms;
+
+ public static CallbackHandlerRegistry Instance
+ {
+ get { return _instance; }
+ }
+
+ public string[] Mechanisms
+ {
+ get { return _mechanisms; }
+ }
+
+ private CallbackHandlerRegistry()
+ {
+ _mechanism2HandlerMap = (OrderedHashTable)
+ ConfigurationSettings.GetConfig("qpid.client/authentication");
+
+ // configure default options if not available
+ if ( _mechanism2HandlerMap == null )
+ _mechanism2HandlerMap = new OrderedHashTable();
+
+ if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
+ if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
+ if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(PlainSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
+
+ _mechanisms = new string[_mechanism2HandlerMap.Count];
+ _mechanism2HandlerMap.OrderedKeys.CopyTo(_mechanisms, 0);
+ }
+
+ public bool IsSupportedMechanism(string mechanism)
+ {
+ return _mechanism2HandlerMap.Contains(mechanism);
+ }
+
+ public string ChooseMechanism(string mechanisms)
+ {
+ IList mechs = mechanisms.Split(' ');
+ foreach ( string supportedMech in _mechanisms )
+ {
+ if ( mechs.Contains(supportedMech) )
+ return supportedMech;
+ }
+ return null;
+ }
+
+ public Type GetCallbackHandler(string mechanism)
+ {
+ return (Type)_mechanism2HandlerMap[mechanism];
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs b/qpid/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs new file mode 100644 index 0000000000..2560c1d96b --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs @@ -0,0 +1,35 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Sasl;
+
+namespace Apache.Qpid.Client.Security
+{
+ public interface IAMQCallbackHandler : ISaslCallbackHandler
+ {
+ void Initialize(AMQProtocolSession session);
+ }
+
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs b/qpid/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs new file mode 100644 index 0000000000..489d4d1665 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs @@ -0,0 +1,56 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections;
+using System.Text;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Sasl;
+
+namespace Apache.Qpid.Client.Security
+{
+ internal class UsernamePasswordCallbackHandler : IAMQCallbackHandler
+ {
+ private AMQProtocolSession _session;
+
+ public void Initialize(AMQProtocolSession session)
+ {
+ if ( session == null )
+ throw new ArgumentNullException("session");
+
+ _session = session;
+ }
+
+ public void Handle(ISaslCallback[] callbacks)
+ {
+ foreach ( ISaslCallback cb in callbacks )
+ {
+ if ( cb is NameCallback )
+ {
+ ((NameCallback)cb).Text = _session.Username;
+ } else if ( cb is PasswordCallback )
+ {
+ ((PasswordCallback)cb).Text = _session.Password;
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/SslOptions.cs b/qpid/dotnet/Qpid.Client/Client/SslOptions.cs new file mode 100644 index 0000000000..d637101000 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/SslOptions.cs @@ -0,0 +1,81 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Security.Cryptography.X509Certificates;
+
+namespace Apache.Qpid.Client
+{
+ /// <summary>
+ /// Configures SSL-related options to connect to an AMQP broker.
+ /// </summary>
+ /// <remarks>
+ /// If the server certificate is not trusted by the client,
+ /// connection will fail. However, you can set the
+ /// <see cref="IgnoreValidationErrors"/> property to true
+ /// to ignore any certificate verification errors for debugging purposes.
+ /// </remarks>
+ public class SslOptions
+ {
+ private X509Certificate _clientCertificate;
+ private bool _ignoreValidationErrors;
+
+ /// <summary>
+ /// Certificate to present to the broker to authenticate
+ /// this client connection
+ /// </summary>
+ public X509Certificate ClientCertificate
+ {
+ get { return _clientCertificate; }
+ }
+
+ /// <summary>
+ /// If true, the validity of the broker certificate
+ /// will not be verified on connection
+ /// </summary>
+ public bool IgnoreValidationErrors
+ {
+ get { return _ignoreValidationErrors; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance with default values
+ /// (No client certificate, don't ignore validation errors)
+ /// </summary>
+ public SslOptions()
+ {
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="clientCertificate">
+ /// Certificate to use to authenticate the client to the broker
+ /// </param>
+ /// <param name="ignoreValidationErrors">
+ /// If true, ignore any validation errors when validating the server certificate
+ /// </param>
+ public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors)
+ {
+ _clientCertificate = clientCertificate;
+ _ignoreValidationErrors = ignoreValidationErrors;
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs new file mode 100644 index 0000000000..67f8427fb2 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.State +{ + public enum AMQState + { + CONNECTION_NOT_STARTED, + CONNECTION_NOT_TUNED, + CONNECTION_NOT_OPENED, + CONNECTION_OPEN, + CONNECTION_CLOSING, + CONNECTION_CLOSED, + ALL // all is a special state used in the state manager. It is not valid to be "in" the state "all". + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs new file mode 100644 index 0000000000..a464bbb6f5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.State +{ + public class AMQStateChangedEvent + { + private readonly AMQState _oldState; + + private readonly AMQState _newState; + + public AMQStateChangedEvent(AMQState oldState, AMQState newState) + { + _oldState = oldState; + _newState = newState; + } + + public AMQState OldState + { + get + { + return _oldState; + } + } + + public AMQState NewState + { + get + { + return _newState; + } + } + + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs new file mode 100644 index 0000000000..881e01e697 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs @@ -0,0 +1,251 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Apache.Qpid.Client.Handler; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.State +{ + public class AMQStateManager : IAMQMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager)); + + const bool InfoLoggingHack = true; + + /// <summary> + /// The current state + /// </summary> + private AMQState _currentState; + + /// <summary> + /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler. + /// The class must be a subclass of AMQFrame. + /// </summary> + private readonly IDictionary _state2HandlersMap; + private ArrayList _stateListeners; + private object _syncLock; + + public AMQStateManager() + { + _syncLock = new object(); + _state2HandlersMap = new Hashtable(); + _stateListeners = ArrayList.Synchronized(new ArrayList(5)); + _currentState = AMQState.CONNECTION_NOT_STARTED; + RegisterListeners(); + } + + private void RegisterListeners() + { + IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler(); + IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler(); + IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler(); + IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler(); + IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler(); + IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler(); + IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler(); + IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler(); + IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler(); + IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler(); + IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler(); + + // We need to register a map for the null (i.e. all state) handlers otherwise you get + // a stack overflow in the handler searching code when you present it with a frame for which + // no handlers are registered. + _state2HandlersMap[AMQState.ALL] = new Hashtable(); + + { + Hashtable notStarted = new Hashtable(); + notStarted[typeof(ConnectionStartBody)] = connectionStart; + notStarted[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted; + } + { + Hashtable notTuned = new Hashtable(); + notTuned[typeof(ConnectionTuneBody)] = connectionTune; + notTuned[typeof(ConnectionSecureBody)] = connectionSecure; + notTuned[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned; + } + { + Hashtable notOpened = new Hashtable(); + notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk; + notOpened[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened; + } + { + Hashtable open = new Hashtable(); + open[typeof(ChannelCloseBody)] = channelClose; + open[typeof(ConnectionCloseBody)] = connectionClose; + open[typeof(BasicDeliverBody)] = basicDeliver; + open[typeof(BasicReturnBody)] = basicReturn; + open[typeof(QueueDeleteOkBody)] = queueDeleteOk; + open[typeof(QueuePurgeOkBody)] = queuePurgeOk; + _state2HandlersMap[AMQState.CONNECTION_OPEN] = open; + } + { + Hashtable closing = new Hashtable(); + closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk; + _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing; + } + } + + public AMQState CurrentState + { + get + { + return _currentState; + } + } + + /// <summary> + /// Changes the state. + /// </summary> + /// <param name="newState">The new state.</param> + /// <exception cref="AMQException">if there is an error changing state</exception> + public void ChangeState(AMQState newState) + { + if (InfoLoggingHack) + { + _logger.Debug("State changing to " + newState + " from old state " + _currentState); + } + _logger.Debug("State changing to " + newState + " from old state " + _currentState); + AMQState oldState = _currentState; + _currentState = newState; + + lock ( _syncLock ) + { + foreach ( IStateListener l in _stateListeners ) + { + l.StateChanged(oldState, newState); + } + } + } + + public void Error(Exception e) + { + _logger.Debug("State manager receive error notification: " + e); + lock ( _syncLock ) + { + foreach ( IStateListener l in _stateListeners ) + { + l.Error(e); + } + } + } + + public bool MethodReceived(AMQMethodEvent evt) + { + _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType())); + IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method); + if (handler != null) + { + handler.MethodReceived(this, evt); + return true; + } + return false; + } + + /// <summary> + /// Finds the state transition handler. + /// </summary> + /// <param name="currentState">State of the current.</param> + /// <param name="frame">The frame.</param> + /// <returns></returns> + /// <exception cref="IllegalStateTransitionException">if the state transition if not allowed</exception> + private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState, + AMQMethodBody frame) + { + Type clazz = frame.GetType(); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Looking for state transition handler for frame " + clazz); + } + IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState]; + + if (classToHandlerMap == null) + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz]; + if (handler == null) + { + if (currentState == AMQState.ALL) + { + _logger.Debug("No state transition handler defined for receiving frame " + frame); + return null; + } + else + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + } + else + { + return handler; + } + } + + public void AddStateListener(IStateListener listener) + { + _logger.Debug("Adding state listener"); + lock ( _syncLock ) + { + _stateListeners.Add(listener); + } + } + + public void RemoveStateListener(IStateListener listener) + { + lock ( _syncLock ) + { + _stateListeners.Remove(listener); + } + } + + public void AttainState(AMQState s) + { + if (_currentState != s) + { + StateWaiter sw = null; + try + { + _logger.Debug("Adding state wait to reach state " + s); + sw = new StateWaiter(s); + AddStateListener(sw); + sw.WaituntilStateHasChanged(); + // at this point the state will have changed. + } + finally + { + RemoveStateListener(sw); + } + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs new file mode 100644 index 0000000000..31e4b5046d --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.State +{ + public interface IAMQStateListener + { + void StateChanged(AMQStateChangedEvent evt); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs new file mode 100644 index 0000000000..0874f39665 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Client.Protocol; + +namespace Apache.Qpid.Client.State +{ + public interface IStateAwareMethodListener + { + void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs new file mode 100644 index 0000000000..edd7382f93 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Apache.Qpid.Client.State +{ + public interface IStateListener + { + void StateChanged(AMQState oldState, AMQState newState); + + void Error(Exception e); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs new file mode 100644 index 0000000000..81de622617 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; + +namespace Apache.Qpid.Client.State +{ + [Serializable] + public class IllegalStateTransitionException : AMQException + { + private AMQState _originalState; + + private Type _frame; + + public IllegalStateTransitionException(AMQState originalState, Type frame) + : base("No valid state transition defined for receiving frame " + frame + + " from state " + originalState) + { + _originalState = originalState; + _frame = frame; + } + + protected IllegalStateTransitionException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + _originalState = (AMQState)info.GetValue("OriginalState", typeof(AMQState)); + _frame = (Type)info.GetValue("FrameType", typeof(Type)); + } + + public AMQState OriginalState + { + get + { + return _originalState; + } + } + + public Type FrameType + { + get + { + return _frame; + } + } + + public override void GetObjectData(SerializationInfo info, StreamingContext context) + { + base.GetObjectData(info, context); + info.AddValue("OriginalState", OriginalState); + info.AddValue("FrameType", FrameType); + } + } +} + + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs new file mode 100644 index 0000000000..e739d0cb44 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Apache.Qpid.Client.Protocol; +using log4net; + +namespace Apache.Qpid.Client.State +{ + public class StateWaiter : IStateListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter)); + + private readonly AMQState _state; + private AMQState _newState; + + private volatile bool _newStateAchieved; + + private volatile Exception _exception; + + private ManualResetEvent _resetEvent = new ManualResetEvent(false); + + public StateWaiter(AMQState state) + { + _state = state; + } + + public void StateChanged(AMQState oldState, AMQState newState) + { + _newState = newState; + if (_logger.IsDebugEnabled) + { + _logger.Debug("stateChanged called"); + } + if (_state == newState) + { + _newStateAchieved = true; + + if (_logger.IsDebugEnabled) + { + _logger.Debug("New state reached so notifying monitor"); + } + _resetEvent.Set(); + } + } + + public void Error(Exception e) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("exceptionThrown called"); + } + + _exception = e; + _resetEvent.Set(); + } + + public void WaituntilStateHasChanged() + { + // + // The guard is required in case we are woken up by a spurious + // notify(). + // + + TimeSpan waitTime = TimeSpan.FromMilliseconds(DefaultTimeouts.MaxWaitForState); + DateTime waitUntilTime = DateTime.Now + waitTime; + + while ( !_newStateAchieved + && _exception == null + && waitTime.TotalMilliseconds > 0 ) + { + _logger.Debug("State not achieved so waiting..."); + try + { + _resetEvent.WaitOne(waitTime, true); + } + finally + { + if (!_newStateAchieved) + { + waitTime = waitUntilTime - DateTime.Now; + } + } + } + + if (_exception != null) + { + _logger.Debug("Throwable reached state waiter: " + _exception); + if (_exception is AMQException) + throw _exception; + else + throw new AMQException("Error: " + _exception, _exception); + } + + if (!_newStateAchieved) + { + string error = string.Format("State not achieved within permitted time. Current state: {0}, desired state: {1}", _state, _newState); + _logger.Warn(error); + throw new AMQException(error); + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs b/qpid/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs new file mode 100644 index 0000000000..dd0bb404cb --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Codec; +using Apache.Qpid.Codec.Demux; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Transport +{ + public class AMQProtocolProvider + { + private DemuxingProtocolCodecFactory _factory; + + public AMQProtocolProvider() + { + _factory = new DemuxingProtocolCodecFactory(); + _factory.Register(new AMQDataBlockEncoder()); + _factory.Register(new AMQDataBlockDecoder()); + _factory.Register(new ProtocolInitiation.Decoder()); + } + + public IProtocolCodecFactory CodecFactory + { + get + { + return _factory; + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs new file mode 100644 index 0000000000..1e217e755b --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Codec; +using Apache.Qpid.Codec.Support; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Transport +{ + public class AmqpChannel : IProtocolChannel + { + // Warning: don't use this log for regular logging. + static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel"); + + IByteChannel _byteChannel; + IProtocolEncoder _encoder; + IProtocolDecoder _decoder; + IProtocolDecoderOutput _decoderOutput; + private object _syncLock; + + public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput) + { + _byteChannel = byteChannel; + _decoderOutput = decoderOutput; + _syncLock = new object(); + + AMQProtocolProvider protocolProvider = new AMQProtocolProvider(); + IProtocolCodecFactory factory = protocolProvider.CodecFactory; + _encoder = factory.Encoder; + _decoder = factory.Decoder; + } + + public void Read() + { + ByteBuffer buffer = _byteChannel.Read(); + Decode(buffer); + } + + public IAsyncResult BeginRead(AsyncCallback callback, object state) + { + return _byteChannel.BeginRead(callback, state); + } + + public void EndRead(IAsyncResult result) + { + ByteBuffer buffer = _byteChannel.EndRead(result); + Decode(buffer); + } + + public void Write(IDataBlock o) + { + // TODO: Refactor to decorator. + if (_protocolTraceLog.IsDebugEnabled) + { + _protocolTraceLog.Debug(String.Format("WRITE {0}", o)); + } + // we should be doing an async write, but apparently + // the mentalis library doesn't queue async read/writes + // correctly and throws random IOException's. Stay sync for a while + //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); + _byteChannel.Write(Encode(o)); + } + + // not used for now + //private void OnAsyncWriteDone(IAsyncResult result) + //{ + // _byteChannel.EndWrite(result); + //} + + private void Decode(ByteBuffer buffer) + { + // make sure we don't try to decode more than + // one buffer at the same time + lock ( _syncLock ) + { + _decoder.Decode(buffer, _decoderOutput); + } + } + + private ByteBuffer Encode(object o) + { + SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput(); + _encoder.Encode(o, output); + return output.buffer; + } + + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs new file mode 100644 index 0000000000..35806f2a6e --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Transport +{ + /// <summary> + /// Represents input/output channels that read + /// and write <see cref="ByteBuffer"/> instances + /// </summary> + public interface IByteChannel + { + /// <summary> + /// Read a <see cref="ByteBuffer"/> from the underlying + /// network stream and any configured filters + /// </summary> + /// <returns>A ByteBuffer, if available</returns> + ByteBuffer Read(); + /// <summary> + /// Begin an asynchronous read operation + /// </summary> + /// <param name="callback">Callback method to call when read operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + IAsyncResult BeginRead(AsyncCallback callback, object state); + /// <summary> + /// End an asynchronous read operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param> + /// <returns>The <see cref="ByteBuffer"/> read</returns> + ByteBuffer EndRead(IAsyncResult result); + /// <summary> + /// Write a <see cref="ByteBuffer"/> to the underlying network + /// stream, going through any configured filters + /// </summary> + /// <param name="buffer"></param> + void Write(ByteBuffer buffer); + /// <summary> + /// Begin an asynchronous write operation + /// </summary> + /// <param name="buffer">Buffer to write</param> + /// <param name="callback">A callback to call when the operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state); + /// <summary> + /// End an asynchronous write operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param> + void EndWrite(IAsyncResult result); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs new file mode 100644 index 0000000000..0b59ee8799 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; + +namespace Apache.Qpid.Client.Transport +{ + public interface IProtocolChannel : IProtocolWriter + { + void Read(); + IAsyncResult BeginRead(AsyncCallback callback, object state); + void EndRead(IAsyncResult result); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs new file mode 100644 index 0000000000..592dff3a19 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Transport +{ + public interface IProtocolWriter + { + void Write(IDataBlock o); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs new file mode 100644 index 0000000000..7195b3ab04 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs @@ -0,0 +1,38 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+
+namespace Apache.Qpid.Client.Transport
+{
+ /// <summary>
+ /// Defines a way to introduce an arbitrary filtering
+ /// stream into the stream chain managed by <see cref="IoHandler"/>
+ /// </summary>
+ public interface IStreamFilter
+ {
+ /// <summary>
+ /// Creates a new filtering stream on top of another
+ /// </summary>
+ /// <param name="lowerStream">Next stream on the stack</param>
+ /// <returns>A new filtering stream</returns>
+ Stream CreateFilterStream(Stream lowerStream);
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs new file mode 100644 index 0000000000..693a9a9534 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client.Protocol; + +namespace Apache.Qpid.Client.Transport +{ + public interface ITransport : IConnectionCloser + { + void Connect(IBrokerInfo broker, AMQConnection connection); + string LocalEndpoint { get; } + IProtocolWriter ProtocolWriter { get; } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IoHandler.cs new file mode 100644 index 0000000000..9ac513069e --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/IoHandler.cs @@ -0,0 +1,322 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Protocol;
+
+namespace Apache.Qpid.Client.Transport
+{
+ /// <summary>
+ /// Responsible for reading and writing
+ /// ByteBuffers from/to network streams, and handling
+ /// the stream filters
+ /// </summary>
+ public class IoHandler : IByteChannel, IDisposable
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
+ private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ private Stream _topStream;
+ private IProtocolListener _protocolListener;
+ private int _readBufferSize;
+
+ public int ReadBufferSize
+ {
+ get { return _readBufferSize; }
+ set { _readBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="stream">Underlying network stream</param>
+ /// <param name="protocolListener">Protocol listener to report exceptions to</param>
+ public IoHandler(Stream stream, IProtocolListener protocolListener)
+ {
+ if ( stream == null )
+ throw new ArgumentNullException("stream");
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ // initially, the stream at the top of the filter
+ // chain is the underlying network stream
+ _topStream = stream;
+ _protocolListener = protocolListener;
+ _readBufferSize = DEFAULT_BUFFER_SIZE;
+ }
+
+ /// <summary>
+ /// Adds a new filter on the top of the chain
+ /// </summary>
+ /// <param name="filter">Stream filter to put on top of the chain</param>
+ /// <remarks>
+ /// This should *only* be called during initialization. We don't
+ /// support changing the filter change after the first read/write
+ /// has been done and it's not thread-safe to boot!
+ /// </remarks>
+ public void AddFilter(IStreamFilter filter)
+ {
+ _topStream = filter.CreateFilterStream(_topStream);
+ }
+
+ #region IByteChannel Implementation
+ //
+ // IByteChannel Implementation
+ //
+
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ public ByteBuffer Read()
+ {
+ byte[] bytes = AllocateBuffer();
+
+ int numOctets = _topStream.Read(bytes, 0, bytes.Length);
+
+ return WrapByteArray(bytes, numOctets);
+ }
+
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ byte[] bytes = AllocateBuffer();
+ ReadData rd = new ReadData(callback, state, bytes);
+
+ // only put a callback if the caller wants one.
+ AsyncCallback myCallback = null;
+ if ( callback != null )
+ myCallback = new AsyncCallback(OnAsyncReadDone);
+
+ IAsyncResult result = _topStream.BeginRead(
+ bytes, 0, bytes.Length, myCallback,rd
+ );
+ return new WrappedAsyncResult(result, bytes);
+ }
+
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ WrappedAsyncResult theResult = (WrappedAsyncResult)result;
+ int bytesRead = _topStream.EndRead(theResult.InnerResult);
+ return WrapByteArray(theResult.Buffer, bytesRead);
+ }
+
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ public void Write(ByteBuffer buffer)
+ {
+ try
+ {
+ _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
+ }
+ catch (Exception e)
+ {
+ _log.Warn("Write caused exception", e);
+ _protocolListener.OnException(e);
+ }
+ }
+
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ try
+ {
+ return _topStream.BeginWrite(
+ buffer.Array, buffer.Position, buffer.Limit,
+ callback, state
+ );
+ } catch ( Exception e )
+ {
+ _log.Error("BeginWrite caused exception", e);
+ // not clear if an exception here should be propagated? we still
+ // need to propagate it upwards anyway!
+ _protocolListener.OnException(e);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ public void EndWrite(IAsyncResult result)
+ {
+ try
+ {
+ _topStream.EndWrite(result);
+ } catch ( Exception e )
+ {
+ _log.Error("EndWrite caused exception", e);
+ // not clear if an exception here should be propagated?
+ _protocolListener.OnException(e);
+ //throw;
+ }
+ }
+ #endregion // IByteChannel Implementation
+
+ #region IDisposable Implementation
+ //
+ // IDisposable Implementation
+ //
+
+ public void Dispose()
+ {
+ if ( _topStream != null )
+ {
+ _topStream.Close();
+ }
+ }
+
+ #endregion // IDisposable Implementation
+
+ #region Private and Helper Classes/Methods
+ //
+ // Private and Helper Classes/Methods
+ //
+
+ private byte[] AllocateBuffer()
+ {
+ return new byte[ReadBufferSize];
+ }
+
+ private static ByteBuffer WrapByteArray(byte[] bytes, int size)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
+ byteBuffer.Limit = size;
+ byteBuffer.Flip();
+
+ return byteBuffer;
+ }
+
+
+ private static void OnAsyncReadDone(IAsyncResult result)
+ {
+ ReadData rd = (ReadData) result.AsyncState;
+ IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
+ rd.Callback(wrapped);
+ }
+
+ class ReadData
+ {
+ private object _state;
+ private AsyncCallback _callback;
+ private byte[] _buffer;
+
+ public object State
+ {
+ get { return _state; }
+ }
+
+ public AsyncCallback Callback
+ {
+ get { return _callback; }
+ }
+
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public ReadData(AsyncCallback callback, object state, byte[] buffer)
+ {
+ _callback = callback;
+ _state = state;
+ _buffer = buffer;
+ }
+ }
+
+ class WrappedAsyncResult : IAsyncResult
+ {
+ private IAsyncResult _innerResult;
+ private byte[] _buffer;
+
+ #region IAsyncResult Properties
+ //
+ // IAsyncResult Properties
+ //
+ public bool IsCompleted
+ {
+ get { return _innerResult.IsCompleted; }
+ }
+
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _innerResult.AsyncWaitHandle; }
+ }
+
+ public object AsyncState
+ {
+ get { return _innerResult.AsyncState; }
+ }
+
+ public bool CompletedSynchronously
+ {
+ get { return _innerResult.CompletedSynchronously; }
+ }
+ #endregion // IAsyncResult Properties
+
+ public IAsyncResult InnerResult
+ {
+ get { return _innerResult; }
+ }
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
+ {
+ if ( result == null )
+ throw new ArgumentNullException("result");
+ if ( buffer == null )
+ throw new ArgumentNullException("buffer");
+
+ _innerResult = result;
+ _buffer = buffer;
+ }
+ }
+
+ #endregion // Private and Helper Classes/Methods
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs new file mode 100644 index 0000000000..3841c158e4 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs @@ -0,0 +1,60 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Codec;
+using Apache.Qpid.Framing;
+using log4net;
+
+namespace Apache.Qpid.Client.Transport
+{
+ /// <summary>
+ /// <see cref="IProtocolDecoderOutput"/> implementation that forwards
+ /// each <see cref="IDataBlock"/> as it is decoded to the
+ /// protocol listener
+ /// </summary>
+ internal class ProtocolDecoderOutput : IProtocolDecoderOutput
+ {
+ private IProtocolListener _protocolListener;
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel");
+
+ public ProtocolDecoderOutput(IProtocolListener protocolListener)
+ {
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ _protocolListener = protocolListener;
+ }
+
+ public void Write(object message)
+ {
+ IDataBlock block = message as IDataBlock;
+ if ( block != null )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", block));
+ _protocolListener.OnMessage(block);
+ }
+ }
+ }
+} // namespace Apache.Qpid.Client.Transport
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs b/qpid/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs new file mode 100644 index 0000000000..a1aa889ba0 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Buffer; +using Apache.Qpid.Codec; + +namespace Apache.Qpid.Client.Transport +{ + public class SingleProtocolEncoderOutput : IProtocolEncoderOutput + { + public ByteBuffer buffer; + + public void Write(ByteBuffer buf) + { + if (buffer != null) + { + throw new InvalidOperationException("{0} does not allow the writing of more than one buffer"); + } + buffer = buf; + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs new file mode 100644 index 0000000000..8a16f9a675 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs @@ -0,0 +1,150 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.IO;
+using System.Threading;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Codec;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// TCP Socket transport supporting both
+ /// SSL and non-SSL connections.
+ /// </summary>
+ public class BlockingSocketTransport : ITransport
+ {
+ // Configuration variables.
+ IProtocolListener _protocolListener;
+
+ // Runtime variables.
+ private ISocketConnector _connector;
+ private IoHandler _ioHandler;
+ private AmqpChannel _amqpChannel;
+ private ManualResetEvent _stopEvent;
+
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _amqpChannel; }
+ }
+ public string LocalEndpoint
+ {
+ get { return _connector.LocalEndpoint; }
+ }
+
+
+ /// <summary>
+ /// Connect to the specified broker
+ /// </summary>
+ /// <param name="broker">The broker to connect to</param>
+ /// <param name="connection">The AMQ connection</param>
+ public void Connect(IBrokerInfo broker, AMQConnection connection)
+ {
+ _stopEvent = new ManualResetEvent(false);
+ _protocolListener = connection.ProtocolListener;
+
+ _ioHandler = MakeBrokerConnection(broker, connection);
+ // todo: get default read size from config!
+
+ IProtocolDecoderOutput decoderOutput =
+ new ProtocolDecoderOutput(_protocolListener);
+ _amqpChannel =
+ new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+
+ // post an initial async read
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
+ }
+
+ /// <summary>
+ /// Close the broker connection
+ /// </summary>
+ public void Close()
+ {
+ StopReading();
+ CloseBrokerConnection();
+ }
+
+ private void StopReading()
+ {
+ _stopEvent.Set();
+ }
+
+ private void CloseBrokerConnection()
+ {
+ if ( _ioHandler != null )
+ {
+ _ioHandler.Dispose();
+ _ioHandler = null;
+ }
+ if ( _connector != null )
+ {
+ _connector.Dispose();
+ _connector = null;
+ }
+ }
+
+ private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection)
+ {
+ if ( broker.UseSSL )
+ {
+ _connector = new SslSocketConnector();
+ } else
+ {
+ _connector = new SocketConnector();
+ }
+
+ Stream stream = _connector.Connect(broker);
+ return new IoHandler(stream, connection.ProtocolListener);
+ }
+
+ private void OnAsyncReadDone(IAsyncResult result)
+ {
+ try
+ {
+ _amqpChannel.EndRead(result);
+
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
+ } catch ( Exception e )
+ {
+ // ignore any errors during closing
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _protocolListener.OnException(e);
+ }
+ }
+
+ #region IProtocolDecoderOutput Members
+
+ public void Write(object message)
+ {
+ _protocolListener.OnMessage((IDataBlock)message);
+ }
+
+ #endregion
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs new file mode 100644 index 0000000000..73575c7086 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs @@ -0,0 +1,92 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using Apache.Qpid.Buffer;
+
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+{
+ class ByteChannel : IByteChannel
+ {
+ // Warning: don't use this log for regular logging.
+ private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel");
+
+ private IByteChannel _lowerChannel;
+
+ public ByteChannel(IByteChannel lowerChannel)
+ {
+ _lowerChannel = lowerChannel;
+ }
+
+ public ByteBuffer Read()
+ {
+ ByteBuffer result = _lowerChannel.Read();
+
+ // TODO: Move into decorator.
+ if (_ioTraceLog.IsDebugEnabled)
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", result));
+ }
+
+ return result;
+ }
+
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return _lowerChannel.BeginRead(callback, state);
+ }
+
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = _lowerChannel.EndRead(result);
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", buffer));
+ }
+ return buffer;
+ }
+
+ public void Write(ByteBuffer buffer)
+ {
+ // TODO: Move into decorator.
+ if (_ioTraceLog.IsDebugEnabled)
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+
+ _lowerChannel.Write(buffer);
+ }
+
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+ return _lowerChannel.BeginWrite(buffer, callback, state);
+ }
+
+ public void EndWrite(IAsyncResult result)
+ {
+ _lowerChannel.EndWrite(result);
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs new file mode 100644 index 0000000000..3d5d2898cf --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs @@ -0,0 +1,34 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using Apache.Qpid.Client.Qms;
+
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+{
+ interface ISocketConnector : IDisposable
+ {
+ string LocalEndpoint { get; }
+ Stream Connect(IBrokerInfo broker);
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs new file mode 100644 index 0000000000..83f7287e9b --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs @@ -0,0 +1,71 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using Apache.Qpid.Client.Qms;
+
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TCP connection over regular sockets.
+ /// </summary>
+ class SocketConnector : ISocketConnector
+ {
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ class MyTcpClient : TcpClient
+ {
+ public MyTcpClient(string host, int port)
+ : base(host, port)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+ }
+
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs new file mode 100644 index 0000000000..708edde48c --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs @@ -0,0 +1,107 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using log4net;
+using Apache.Qpid.Client.Qms;
+using Org.Mentalis.Security.Ssl;
+using MCertificate = Org.Mentalis.Security.Certificates.Certificate;
+using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain;
+
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TLS v1.0 connection using the Mentalis.org library
+ /// </summary>
+ /// <remarks>
+ /// It would've been easier to implement this at the StreamFilter
+ /// level, but unfortunately the Mentalis library doesn't support
+ /// a passthrough SSL stream class and is tied directly
+ /// to socket-like classes.
+ /// </remarks>
+ class SslSocketConnector : ISocketConnector
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector));
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ MCertificate cert = GetClientCert(broker);
+ SecurityOptions options = new SecurityOptions(
+ SecureProtocol.Tls1, cert, ConnectionEnd.Client
+ );
+ if ( broker.SslOptions != null
+ && broker.SslOptions.IgnoreValidationErrors )
+ {
+ _logger.Warn("Ignoring any certificate validation errors during SSL handshake...");
+ options.VerificationType = CredentialVerification.None;
+ }
+
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port, options);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ private static MCertificate GetClientCert(IBrokerInfo broker)
+ {
+ // if a client certificate is configured,
+ // use that to enable mutual authentication
+ MCertificate cert = null;
+ if ( broker.SslOptions != null
+ && broker.SslOptions.ClientCertificate != null )
+ {
+ cert = MCertificate.CreateFromX509Certificate(
+ broker.SslOptions.ClientCertificate
+ );
+ _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true));
+ }
+ return cert;
+ }
+
+ class MyTcpClient : SecureTcpClient
+ {
+ public MyTcpClient(string host, int port, SecurityOptions options)
+ : base(host, port, options)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs b/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs new file mode 100644 index 0000000000..87bb2a2859 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs @@ -0,0 +1,98 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Common;
+
+namespace Apache.Qpid.Client.Util
+{
+ internal delegate void ThresholdMethod(int currentCount);
+
+ /// <summary>
+ /// Basic bounded queue used to implement prefetching.
+ /// Notice we do the callbacks here asynchronously to
+ /// avoid adding more complexity to the channel impl.
+ /// </summary>
+ internal class FlowControlQueue
+ {
+ private BlockingQueue _queue = new LinkedBlockingQueue();
+ private int _itemCount;
+ private int _lowerBound;
+ private int _upperBound;
+ private ThresholdMethod _underThreshold;
+ private ThresholdMethod _overThreshold;
+
+ public FlowControlQueue(
+ int lowerBound,
+ int upperBound,
+ ThresholdMethod underThreshold,
+ ThresholdMethod overThreshold
+ )
+ {
+ _lowerBound = lowerBound;
+ _upperBound = upperBound;
+ _underThreshold = underThreshold;
+ _overThreshold = overThreshold;
+ }
+
+ public void Enqueue(object item)
+ {
+ _queue.EnqueueBlocking(item);
+ int count = Interlocked.Increment(ref _itemCount);
+ if ( _overThreshold != null )
+ {
+ if ( count == _upperBound )
+ {
+ _overThreshold.BeginInvoke(
+ count, new AsyncCallback(OnAsyncCallEnd),
+ _overThreshold
+ );
+ }
+ }
+ }
+
+ public object Dequeue()
+ {
+ object item = _queue.DequeueBlocking();
+ int count = Interlocked.Decrement(ref _itemCount);
+ if ( _underThreshold != null )
+ {
+ if ( count == _lowerBound )
+ {
+ _underThreshold.BeginInvoke(
+ count, new AsyncCallback(OnAsyncCallEnd),
+ _underThreshold
+ );
+ }
+ }
+ return item;
+ }
+
+ private void OnAsyncCallEnd(IAsyncResult res)
+ {
+ ThresholdMethod method = (ThresholdMethod)res.AsyncState;
+ method.EndInvoke(res);
+ }
+ }
+}
|