diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Protocol')
10 files changed, 0 insertions, 1076 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs deleted file mode 100644 index a7ce808862..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Text; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Protocol -{ - public class AMQMethodEvent - { - private AMQMethodBody _method; - - private ushort _channelId; - - private AMQProtocolSession _protocolSession; - - public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession) - { - _channelId = channelId; - _method = method; - _protocolSession = protocolSession; - } - - public AMQMethodBody Method - { - get - { - return _method; - } - } - - public ushort ChannelId - { - get - { - return _channelId; - } - } - - public AMQProtocolSession ProtocolSession - { - get - { - return _protocolSession; - } - } - - public override String ToString() - { - StringBuilder buf = new StringBuilder("Method event: "); - buf.Append("\nChannel id: ").Append(_channelId); - buf.Append("\nMethod: ").Append(_method); - return buf.ToString(); - } - } -} - - diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs deleted file mode 100644 index c51538b70e..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs +++ /dev/null @@ -1,318 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; -using System.Threading; -using log4net; -using Apache.Qpid.Client.Failover; -using Apache.Qpid.Client.Protocol.Listener; -using Apache.Qpid.Client.State; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Protocol -{ - /// <summary> - /// AMQProtocolListener - /// - /// <p/>Fail-over state transition rules... - /// - /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order - /// to be able to send errors during failover back to the client application. The session won't be available in the case - /// when failing over due to a Connection.Redirect message from the broker. - /// - /// <p><table id="crc"><caption>CRC Card</caption> - /// <tr><th> Responsibilities <th> Collaborations - /// <tr><td> Track fail over state of a connection. - /// <tr><td> Manage method listeners. <td> IAMQMethodListener - /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler - /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener - /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener - /// </table> - /// - /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being - /// triggered, when it should not be. - /// - /// </summary> - public class AMQProtocolListener : IProtocolListener - { - /// <summary>Used for debugging.</summary> - private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); - - /// <summary> - /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it, - /// the failover process is handed off to this handler. - /// </summary> - private FailoverHandler _failoverHandler; - - /// <summary>Tracks the current fail-over state.</summary> - internal FailoverState _failoverState = FailoverState.NOT_STARTED; - - internal FailoverState FailoverState - { - get { return _failoverState; } - set { _failoverState = value; } - } - - internal ManualResetEvent FailoverLatch; - - AMQConnection _connection; - AMQStateManager _stateManager; - - public AMQStateManager StateManager - { - get { return _stateManager; } - set { _stateManager = value; } - } - - private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); - - AMQProtocolSession _protocolSession = null; - - private readonly Object _lock = new Object(); - - public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } - - public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) - { - _connection = connection; - _stateManager = stateManager; - _failoverHandler = new FailoverHandler(connection); - } - - public void OnMessage(IDataBlock message) - { - // Handle incorrect protocol version. - if (message is ProtocolInitiation) - { - string error = String.Format("Protocol mismatch - {0}", message.ToString()); - AMQException e = new AMQProtocolHeaderException(error); - _log.Error("Closing connection because of protocol mismatch", e); - //_protocolSession.CloseProtocolSession(); - _stateManager.Error(e); - return; - } - - AMQFrame frame = (AMQFrame)message; - - if (frame.BodyFrame is AMQMethodBody) - { - if (_log.IsDebugEnabled) - { - _log.Debug("Method frame received: " + frame); - } - AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession); - try - { - bool wasAnyoneInterested = false; - lock (_frameListeners.SyncRoot) - { - foreach (IAMQMethodListener listener in _frameListeners) - { - wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); - } - } - catch (Exception e) - { - foreach (IAMQMethodListener listener in _frameListeners) - { - listener.Error(e); - } - } - } - else if (frame.BodyFrame is ContentHeaderBody) - { - _protocolSession.MessageContentHeaderReceived(frame.Channel, - (ContentHeaderBody)frame.BodyFrame); - } - else if (frame.BodyFrame is ContentBody) - { - _protocolSession.MessageContentBodyReceived(frame.Channel, - (ContentBody)frame.BodyFrame); - } - else if (frame.BodyFrame is HeartbeatBody) - { - _log.Debug("HeartBeat received"); - } - } - - /// <summary> - /// Receives notification of any IO exceptions on the connection. - /// - /// <p/>Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then - /// all method listeners and the application connection object are notified of the connection failure exception. - /// - /// <p/>All other exception types are propagated to all method listeners. - /// </summary> - public void OnException(Exception cause) - { - _log.Warn("public void OnException(Exception cause = " + cause + "): called"); - - // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also - // ensures that this exception is fully propagated to all listeners, before another one can be processed. - lock (_lock) - { - if (cause is AMQConnectionClosedException || cause is System.IO.IOException) - { - // Try a fail-over because the connection has failed. - FailoverState failoverState = AttemptFailover(); - - // Check if the fail-over has failed, in which case notify all method listeners of the exception. - // The application connection object is also notified of the failure of the connection with the exception. - if (failoverState == FailoverState.FAILED) - { - _log.Debug("Fail-over has failed. Notifying all method listeners of the exception."); - - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - PropagateExceptionToWaiters(amqe); - _connection.ExceptionReceived(cause); - } - } - // Notify all method listeners of the exception. - else - { - PropagateExceptionToWaiters(cause); - _connection.ExceptionReceived(cause); - } - } - } - - /// <summary> - /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been - /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress - /// this method allows it to continue to run and will do nothing. - /// - /// <p/>This method should only be called when the connection has been remotely closed. - /// </summary> - /// - /// <returns>The fail-over state at the end of this attempt.</returns> - private FailoverState AttemptFailover() - { - _log.Debug("private void AttemptFailover(): called"); - _log.Debug("_failoverState = " + _failoverState); - - // Ensure that the connection stops sending heart beats, if it still is. - _connection.StopHeartBeatThread(); - - // Check that the connection policy allows fail-over to be attempted. - if (!_connection.IsFailoverAllowed) - { - _log.Debug("Connection does not allowed to failover"); - _connection.ExceptionReceived( - new AMQDisconnectedException("Broker closed connection and reconnection is not permitted.")); - } - - // Check if connection was closed deliberately by the application, in which case no fail-over is attempted. - if (_connection.Closed) - { - return _failoverState; - } - - // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is - // advanced to 'in progress' - if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed) - { - _log.Info("Starting the fail-over process."); - - _failoverState = FailoverState.IN_PROGRESS; - StartFailoverThread(); - } - - return _failoverState; - } - - /// <summary> - /// There are two cases where we have other threads potentially blocking for events to be handled by this - /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a - /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can - /// react appropriately. - /// - /// <param name="e">the exception to propagate</param> - /// </summary> - public void PropagateExceptionToWaiters(Exception e) - { - // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over. - _stateManager.Error(e); - lock ( _lock ) - { - foreach ( IAMQMethodListener listener in _frameListeners ) - { - listener.Error(e); - } - } - } - - public void AddFrameListener(IAMQMethodListener listener) - { - lock ( _lock ) - { - _frameListeners.Add(listener); - } - } - - public void RemoveFrameListener(IAMQMethodListener listener) - { - if (_log.IsDebugEnabled) - { - _log.Debug("Removing frame listener: " + listener.ToString()); - } - lock ( _lock ) - { - _frameListeners.Remove(listener); - } - } - - public void BlockUntilNotFailingOver() - { - if (FailoverLatch != null) - { - FailoverLatch.WaitOne(); - } - } - - /// <summary> - /// "Failover" for redirection. - /// </summary> - /// <param name="host"></param> - /// <param name="port"></param> - public void Failover(string host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - StartFailoverThread(); - } - - private void StartFailoverThread() - { - Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run)); - failoverThread.Name = "Failover"; - // Do not inherit daemon-ness from current thread as this can be a daemon - // thread such as a AnonymousIoService thread. - failoverThread.IsBackground = false; - failoverThread.Start(); - } - } -} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs deleted file mode 100644 index 1fb3d407eb..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; -using log4net; -using Apache.Qpid.Client.Message; -using Apache.Qpid.Client.Transport; -using Apache.Qpid.Framing; -using Apache.Qpid.Sasl; - -namespace Apache.Qpid.Client.Protocol -{ - public class AMQProtocolSession - { - private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession)); - - private readonly IProtocolWriter _protocolWriter; - private readonly IConnectionCloser _connectionCloser; - - /// <summary> - /// Maps from the channel id to the AmqChannel that it represents. - /// </summary> - //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); - private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable()); - - //private ConcurrentMap _closingChannels = new ConcurrentHashMap(); - private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable()); - - /// <summary> - /// Maps from a channel id to an unprocessed message. This is used to tie together the - /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. - /// </summary> - //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); - private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable()); - - private AMQConnection _connection; - - public string ClientID { get { return _connection.ClientID; } } - - public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection) - { - _protocolWriter = protocolWriter; - _connectionCloser = connectionCloser; - _connection = connection; - } - - public void Init() - { - // start the process of setting up the connection. This is the first place that - // data is written to the server. - _protocolWriter.Write(new ProtocolInitiation()); - } - - public string Username - { - get - { - return AMQConnection.Username; - } - } - - public string Password - { - get - { - return AMQConnection.Password; - } - } - - ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too. - - public ConnectionTuneParameters ConnectionTuneParameters - { - get - { - return _connectionTuneParameters; - } - set - { - _connectionTuneParameters = value; - AMQConnection con = AMQConnection; - con.SetMaximumChannelCount(value.ChannelMax); - con.MaximumFrameSize = value.FrameMax; - } - } - - private ISaslClient _saslClient; - public ISaslClient SaslClient - { - get { return _saslClient; } - set { _saslClient = value; } - } - - /// <summary> - /// Callback invoked from the BasicDeliverMethodHandler when a message has been received. - /// This is invoked on the MINA dispatcher thread. - /// </summary> - /// <param name="message">the unprocessed message</param> - /// <exception cname="AMQException">if this was not expected</exception> - public void UnprocessedMessageReceived(UnprocessedMessage message) - { - _channelId2UnprocessedMsgMap[message.ChannelId] = message; - } - - public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader) - { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; - if (msg == null) - { - throw new AMQException("Error: received content header without having received a JMSDeliver frame first"); - } - if (msg.ContentHeader != null) - { - throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); - } - msg.ContentHeader = contentHeader; - if (contentHeader.BodySize == 0) - { - DeliverMessageToAMQSession(channelId, msg); - } - } - - public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody) - { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; - if (msg == null) - { - throw new AMQException("Error: received content body without having received a BasicDeliver frame first"); - } - if (msg.ContentHeader == null) - { - _channelId2UnprocessedMsgMap.Remove(channelId); - throw new AMQException("Error: received content body without having received a ContentHeader frame first"); - } - try - { - msg.ReceiveBody(contentBody); - } - catch (UnexpectedBodyReceivedException e) - { - _channelId2UnprocessedMsgMap.Remove(channelId); - throw e; - } - if (msg.IsAllBodyDataReceived()) - { - DeliverMessageToAMQSession(channelId, msg); - } - } - - /// <summary> - /// Deliver a message to the appropriate session, removing the unprocessed message - /// from our map - /// <param name="channelId">the channel id the message should be delivered to</param> - /// <param name="msg"> the message</param> - private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg) - { - AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; - channel.MessageReceived(msg); - _channelId2UnprocessedMsgMap.Remove(channelId); - } - - /// <summary> - /// Convenience method that writes a frame to the protocol session. Equivalent - /// to calling getProtocolSession().write(). - /// </summary> - /// <param name="frame">the frame to write</param> - public void WriteFrame(IDataBlock frame) - { - _protocolWriter.Write(frame); - } - - public void AddSessionByChannel(ushort channelId, AmqChannel channel) - { - if (channel == null) - { - throw new ArgumentNullException("Attempt to register a null channel"); - } - _logger.Debug("Add channel with channel id " + channelId); - _channelId2SessionMap[channelId] = channel; - } - - public void RemoveSessionByChannel(ushort channelId) - { - _logger.Debug("Removing session with channelId " + channelId); - _channelId2SessionMap.Remove(channelId); - } - - /// <summary> - /// Starts the process of closing a channel - /// </summary> - /// <param name="channel" the AmqChannel being closed</param> - public void CloseSession(AmqChannel channel) - { - _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId); - ushort channelId = channel.ChannelId; - - // we need to know when a channel is closing so that we can respond - // with a channel.close frame when we receive any other type of frame - // on that channel - _closingChannels[channelId] = channel; - - } - - /// <summary> - /// Called from the ChannelClose handler when a channel close frame is received. - /// This method decides whether this is a response or an initiation. The latter - /// case causes the AmqChannel to be closed and an exception to be thrown if - /// appropriate. - /// </summary> - /// <param name="channelId">the id of the channel (session)</param> - /// <returns>true if the client must respond to the server, i.e. if the server - /// initiated the channel close, false if the channel close is just the server - /// responding to the client's earlier request to close the channel.</returns> - public bool ChannelClosed(ushort channelId, int code, string text) - { - // if this is not a response to an earlier request to close the channel - if (!_closingChannels.ContainsKey(channelId)) - { - _closingChannels.Remove(channelId); - AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; - channel.ClosedWithException(new AMQException(_logger, code, text)); - return true; - } - else - { - return false; - } - } - - public AMQConnection AMQConnection - { - get - { - return _connection; - } - } - - public void CloseProtocolSession() - { - _logger.Debug("Closing protocol session"); - _connectionCloser.Close(); - } - - internal string GenerateQueueName() - { - return "ntmp_" + System.Guid.NewGuid(); - } - } -} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs deleted file mode 100644 index 2f23a1571d..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -using System; -using System.Text; - -namespace Apache.Qpid.Client.Protocol -{ - /// <summary> - /// Default timeout values for the protocol - /// </summary> - sealed class DefaultTimeouts - { - /// <summary> - /// Maximum number of milliseconds to wait for a state change - /// in the protocol's state machine - /// </summary> - public const int MaxWaitForState = 30* 1000; - /// <summary> - /// Maximum number of milliseconds to wait for a reply - /// frame when doing synchronous writer to the broker - /// </summary> - public const int MaxWaitForSyncWriter = 30 * 1000; - - private DefaultTimeouts() - { - } - } -} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs deleted file mode 100644 index e3298200c4..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -namespace Apache.Qpid.Client.Protocol -{ - public interface IConnectionCloser - { - void Close(); - } -} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs deleted file mode 100644 index 3b53f015f8..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using Apache.Qpid.Client.Protocol.Listener; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Protocol -{ - public interface IProtocolListener - { - void OnMessage(IDataBlock message); - void OnException(Exception e); - - // XXX: .NET way of doing listeners? - void AddFrameListener(IAMQMethodListener listener); - void RemoveFrameListener(IAMQMethodListener listener); - } -} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs deleted file mode 100644 index 9cc9f8cee5..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs +++ /dev/null @@ -1,110 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Threading; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Protocol.Listener -{ - public abstract class BlockingMethodFrameListener : IAMQMethodListener - { - private ManualResetEvent _resetEvent; - - public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame); - - /// <summary> - /// This is set if there is an exception thrown from processCommandFrame and the - /// exception is rethrown to the caller of blockForFrame() - /// </summary> - private volatile Exception _error; - - protected ushort _channelId; - - protected AMQMethodEvent _doneEvt = null; - - public BlockingMethodFrameListener(ushort channelId) - { - _channelId = channelId; - _resetEvent = new ManualResetEvent(false); - } - - /// <summary> - /// This method is called by the MINA dispatching thread. Note that it could - /// be called before BlockForFrame() has been called. - /// </summary> - /// <param name="evt">the frame event</param> - /// <returns>true if the listener has dealt with this frame</returns> - /// <exception cref="AMQException"></exception> - public bool MethodReceived(AMQMethodEvent evt) - { - AMQMethodBody method = evt.Method; - - try - { - bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method); - if (ready) - { - _doneEvt = evt; - _resetEvent.Set(); - } - - return ready; - } - catch (AMQException e) - { - Error(e); - // we rethrow the error here, and the code in the frame dispatcher will go round - // each listener informing them that an exception has been thrown - throw e; - } - } - - /// <summary> - /// This method is called by the thread that wants to wait for a frame. - /// </summary> - /// <param name="timeout">Set the number of milliseconds to wait</param> - public AMQMethodEvent BlockForFrame(int timeout) - { - _resetEvent.WaitOne(timeout, true); - //at this point the event will have been signalled. The error field might or might not be set - // depending on whether an error occurred - if (_error != null) - { - throw _error; - } - - return _doneEvt; - } - - /// <summary> - /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this - /// class to avoid code repetition but again is only called by the MINA dispatcher thread. - /// </summary> - /// <param name="e">the exception that caused the error</param> - public void Error(Exception e) - { - // set the error so that the thread that is blocking (in BlockForFrame()) - // can pick up the exception and rethrow to the caller - _error = e; - _resetEvent.Set(); - } - } -} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs deleted file mode 100644 index b5450d00f7..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; - -namespace Apache.Qpid.Client.Protocol.Listener -{ - public interface IAMQMethodListener - { - /// <summary> - /// Invoked when a method frame has been received - /// <param name="evt">the event</param> - /// <returns>true if the handler has processed the method frame, false otherwise. Note - /// that this does not prohibit the method event being delivered to subsequent listeners - /// but can be used to determine if nobody has dealt with an incoming method frame.</param> - /// <exception cname="AMQException">if an error has occurred. This exception will be delivered - /// to all registered listeners using the error() method (see below) allowing them to - /// perform cleanup if necessary.</exception> - bool MethodReceived(AMQMethodEvent evt); - - /// <summary> - /// Callback when an error has occurred. Allows listeners to clean up. - /// </summary> - /// <param name="e">the exception</param> - void Error(Exception e); - } -} - - diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs deleted file mode 100644 index 8cdc1dbba9..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Protocol.Listener -{ - public class SpecificMethodFrameListener : BlockingMethodFrameListener - { - private readonly Type _expectedClass; - - public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId) - { - _expectedClass = expectedClass; - } - - public override bool ProcessMethod(ushort channelId, AMQMethodBody frame) - { - return _expectedClass.IsInstanceOfType(frame); - } - } -} - - diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs deleted file mode 100644 index 11918f1ea2..0000000000 --- a/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using Apache.Qpid.Client.Protocol.Listener; -using Apache.Qpid.Client.Transport; -using Apache.Qpid.Framing; - -using log4net; - -namespace Apache.Qpid.Client.Protocol -{ - /// <summary> - /// A convenient interface to writing protocol frames. - /// </summary> - public class ProtocolWriter - { - - private ILog _logger = LogManager.GetLogger(typeof(ProtocolWriter)); - - IProtocolWriter _protocolWriter; - IProtocolListener _protocolListener; - - public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener) - { - _protocolWriter = protocolWriter; - _protocolListener = protocolListener; - } - - public void WriteFrame(IDataBlock frame) - { - _protocolWriter.Write(frame); - } - - /// <summary> - /// Convenience method that writes a frame to the protocol session and waits for - /// a particular response. Equivalent to calling getProtocolSession().write() then - /// waiting for the response. - /// </summary> - /// <param name="frame">the frame</param> - /// <param name="listener">the blocking listener. Note the calling thread will block.</param> - /// <param name="timeout">set the number of milliseconds to wait</param> - private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener, int timeout) - { - try - { - _protocolListener.AddFrameListener(listener); - _protocolWriter.Write(frame); - - return listener.BlockForFrame(timeout); - } - finally - { - _protocolListener.RemoveFrameListener(listener); - } - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener - } - - /// <summary> - /// Convenience method that writes a frame to the protocol session and waits for - /// a particular response. Equivalent to calling getProtocolSession().write() then - /// waiting for the response. - /// </summary> - /// <param name="frame">the frame</param> - /// <param name="responseType">the type of method response</param> - public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType) - { - // TODO: If each frame knew it's response type, then the responseType argument would - // TODO: not be neccesary. - return SyncWrite(frame, responseType, DefaultTimeouts.MaxWaitForSyncWriter); - } - - /// <summary> - /// Convenience method that writes a frame to the protocol session and waits for - /// a particular response. Equivalent to calling getProtocolSession().write() then - /// waiting for the response. - /// </summary> - /// <param name="frame">the frame</param> - /// <param name="responseType">the type of method response</param> - /// <param name="timeout">set the number of milliseconds to wait</param> - /// <returns>set the number of milliseconds to wait</returns> - public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType, int timeout) - { - return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType), timeout); - } - } -} - - |