diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Protocol')
10 files changed, 1076 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs new file mode 100644 index 0000000000..a7ce808862 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol +{ + public class AMQMethodEvent + { + private AMQMethodBody _method; + + private ushort _channelId; + + private AMQProtocolSession _protocolSession; + + public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession) + { + _channelId = channelId; + _method = method; + _protocolSession = protocolSession; + } + + public AMQMethodBody Method + { + get + { + return _method; + } + } + + public ushort ChannelId + { + get + { + return _channelId; + } + } + + public AMQProtocolSession ProtocolSession + { + get + { + return _protocolSession; + } + } + + public override String ToString() + { + StringBuilder buf = new StringBuilder("Method event: "); + buf.Append("\nChannel id: ").Append(_channelId); + buf.Append("\nMethod: ").Append(_method); + return buf.ToString(); + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs new file mode 100644 index 0000000000..c51538b70e --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs @@ -0,0 +1,318 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Failover; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol +{ + /// <summary> + /// AMQProtocolListener + /// + /// <p/>Fail-over state transition rules... + /// + /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order + /// to be able to send errors during failover back to the client application. The session won't be available in the case + /// when failing over due to a Connection.Redirect message from the broker. + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Track fail over state of a connection. + /// <tr><td> Manage method listeners. <td> IAMQMethodListener + /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler + /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener + /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener + /// </table> + /// + /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being + /// triggered, when it should not be. + /// + /// </summary> + public class AMQProtocolListener : IProtocolListener + { + /// <summary>Used for debugging.</summary> + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); + + /// <summary> + /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it, + /// the failover process is handed off to this handler. + /// </summary> + private FailoverHandler _failoverHandler; + + /// <summary>Tracks the current fail-over state.</summary> + internal FailoverState _failoverState = FailoverState.NOT_STARTED; + + internal FailoverState FailoverState + { + get { return _failoverState; } + set { _failoverState = value; } + } + + internal ManualResetEvent FailoverLatch; + + AMQConnection _connection; + AMQStateManager _stateManager; + + public AMQStateManager StateManager + { + get { return _stateManager; } + set { _stateManager = value; } + } + + private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); + + AMQProtocolSession _protocolSession = null; + + private readonly Object _lock = new Object(); + + public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } + + public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) + { + _connection = connection; + _stateManager = stateManager; + _failoverHandler = new FailoverHandler(connection); + } + + public void OnMessage(IDataBlock message) + { + // Handle incorrect protocol version. + if (message is ProtocolInitiation) + { + string error = String.Format("Protocol mismatch - {0}", message.ToString()); + AMQException e = new AMQProtocolHeaderException(error); + _log.Error("Closing connection because of protocol mismatch", e); + //_protocolSession.CloseProtocolSession(); + _stateManager.Error(e); + return; + } + + AMQFrame frame = (AMQFrame)message; + + if (frame.BodyFrame is AMQMethodBody) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Method frame received: " + frame); + } + AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession); + try + { + bool wasAnyoneInterested = false; + lock (_frameListeners.SyncRoot) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested; + } + } + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + } + } + catch (Exception e) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + listener.Error(e); + } + } + } + else if (frame.BodyFrame is ContentHeaderBody) + { + _protocolSession.MessageContentHeaderReceived(frame.Channel, + (ContentHeaderBody)frame.BodyFrame); + } + else if (frame.BodyFrame is ContentBody) + { + _protocolSession.MessageContentBodyReceived(frame.Channel, + (ContentBody)frame.BodyFrame); + } + else if (frame.BodyFrame is HeartbeatBody) + { + _log.Debug("HeartBeat received"); + } + } + + /// <summary> + /// Receives notification of any IO exceptions on the connection. + /// + /// <p/>Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then + /// all method listeners and the application connection object are notified of the connection failure exception. + /// + /// <p/>All other exception types are propagated to all method listeners. + /// </summary> + public void OnException(Exception cause) + { + _log.Warn("public void OnException(Exception cause = " + cause + "): called"); + + // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also + // ensures that this exception is fully propagated to all listeners, before another one can be processed. + lock (_lock) + { + if (cause is AMQConnectionClosedException || cause is System.IO.IOException) + { + // Try a fail-over because the connection has failed. + FailoverState failoverState = AttemptFailover(); + + // Check if the fail-over has failed, in which case notify all method listeners of the exception. + // The application connection object is also notified of the failure of the connection with the exception. + if (failoverState == FailoverState.FAILED) + { + _log.Debug("Fail-over has failed. Notifying all method listeners of the exception."); + + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + PropagateExceptionToWaiters(amqe); + _connection.ExceptionReceived(cause); + } + } + // Notify all method listeners of the exception. + else + { + PropagateExceptionToWaiters(cause); + _connection.ExceptionReceived(cause); + } + } + } + + /// <summary> + /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been + /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress + /// this method allows it to continue to run and will do nothing. + /// + /// <p/>This method should only be called when the connection has been remotely closed. + /// </summary> + /// + /// <returns>The fail-over state at the end of this attempt.</returns> + private FailoverState AttemptFailover() + { + _log.Debug("private void AttemptFailover(): called"); + _log.Debug("_failoverState = " + _failoverState); + + // Ensure that the connection stops sending heart beats, if it still is. + _connection.StopHeartBeatThread(); + + // Check that the connection policy allows fail-over to be attempted. + if (!_connection.IsFailoverAllowed) + { + _log.Debug("Connection does not allowed to failover"); + _connection.ExceptionReceived( + new AMQDisconnectedException("Broker closed connection and reconnection is not permitted.")); + } + + // Check if connection was closed deliberately by the application, in which case no fail-over is attempted. + if (_connection.Closed) + { + return _failoverState; + } + + // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is + // advanced to 'in progress' + if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed) + { + _log.Info("Starting the fail-over process."); + + _failoverState = FailoverState.IN_PROGRESS; + StartFailoverThread(); + } + + return _failoverState; + } + + /// <summary> + /// There are two cases where we have other threads potentially blocking for events to be handled by this + /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a + /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can + /// react appropriately. + /// + /// <param name="e">the exception to propagate</param> + /// </summary> + public void PropagateExceptionToWaiters(Exception e) + { + // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over. + _stateManager.Error(e); + lock ( _lock ) + { + foreach ( IAMQMethodListener listener in _frameListeners ) + { + listener.Error(e); + } + } + } + + public void AddFrameListener(IAMQMethodListener listener) + { + lock ( _lock ) + { + _frameListeners.Add(listener); + } + } + + public void RemoveFrameListener(IAMQMethodListener listener) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Removing frame listener: " + listener.ToString()); + } + lock ( _lock ) + { + _frameListeners.Remove(listener); + } + } + + public void BlockUntilNotFailingOver() + { + if (FailoverLatch != null) + { + FailoverLatch.WaitOne(); + } + } + + /// <summary> + /// "Failover" for redirection. + /// </summary> + /// <param name="host"></param> + /// <param name="port"></param> + public void Failover(string host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + StartFailoverThread(); + } + + private void StartFailoverThread() + { + Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run)); + failoverThread.Name = "Failover"; + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.IsBackground = false; + failoverThread.Start(); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs new file mode 100644 index 0000000000..1fb3d407eb --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -0,0 +1,267 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Transport; +using Apache.Qpid.Framing; +using Apache.Qpid.Sasl; + +namespace Apache.Qpid.Client.Protocol +{ + public class AMQProtocolSession + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession)); + + private readonly IProtocolWriter _protocolWriter; + private readonly IConnectionCloser _connectionCloser; + + /// <summary> + /// Maps from the channel id to the AmqChannel that it represents. + /// </summary> + //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable()); + + //private ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable()); + + /// <summary> + /// Maps from a channel id to an unprocessed message. This is used to tie together the + /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. + /// </summary> + //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable()); + + private AMQConnection _connection; + + public string ClientID { get { return _connection.ClientID; } } + + public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection) + { + _protocolWriter = protocolWriter; + _connectionCloser = connectionCloser; + _connection = connection; + } + + public void Init() + { + // start the process of setting up the connection. This is the first place that + // data is written to the server. + _protocolWriter.Write(new ProtocolInitiation()); + } + + public string Username + { + get + { + return AMQConnection.Username; + } + } + + public string Password + { + get + { + return AMQConnection.Password; + } + } + + ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too. + + public ConnectionTuneParameters ConnectionTuneParameters + { + get + { + return _connectionTuneParameters; + } + set + { + _connectionTuneParameters = value; + AMQConnection con = AMQConnection; + con.SetMaximumChannelCount(value.ChannelMax); + con.MaximumFrameSize = value.FrameMax; + } + } + + private ISaslClient _saslClient; + public ISaslClient SaslClient + { + get { return _saslClient; } + set { _saslClient = value; } + } + + /// <summary> + /// Callback invoked from the BasicDeliverMethodHandler when a message has been received. + /// This is invoked on the MINA dispatcher thread. + /// </summary> + /// <param name="message">the unprocessed message</param> + /// <exception cname="AMQException">if this was not expected</exception> + public void UnprocessedMessageReceived(UnprocessedMessage message) + { + _channelId2UnprocessedMsgMap[message.ChannelId] = message; + } + + public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content header without having received a JMSDeliver frame first"); + } + if (msg.ContentHeader != null) + { + throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); + } + msg.ContentHeader = contentHeader; + if (contentHeader.BodySize == 0) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content body without having received a BasicDeliver frame first"); + } + if (msg.ContentHeader == null) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw new AMQException("Error: received content body without having received a ContentHeader frame first"); + } + try + { + msg.ReceiveBody(contentBody); + } + catch (UnexpectedBodyReceivedException e) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw e; + } + if (msg.IsAllBodyDataReceived()) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + /// <summary> + /// Deliver a message to the appropriate session, removing the unprocessed message + /// from our map + /// <param name="channelId">the channel id the message should be delivered to</param> + /// <param name="msg"> the message</param> + private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg) + { + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.MessageReceived(msg); + _channelId2UnprocessedMsgMap.Remove(channelId); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session. Equivalent + /// to calling getProtocolSession().write(). + /// </summary> + /// <param name="frame">the frame to write</param> + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + public void AddSessionByChannel(ushort channelId, AmqChannel channel) + { + if (channel == null) + { + throw new ArgumentNullException("Attempt to register a null channel"); + } + _logger.Debug("Add channel with channel id " + channelId); + _channelId2SessionMap[channelId] = channel; + } + + public void RemoveSessionByChannel(ushort channelId) + { + _logger.Debug("Removing session with channelId " + channelId); + _channelId2SessionMap.Remove(channelId); + } + + /// <summary> + /// Starts the process of closing a channel + /// </summary> + /// <param name="channel" the AmqChannel being closed</param> + public void CloseSession(AmqChannel channel) + { + _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId); + ushort channelId = channel.ChannelId; + + // we need to know when a channel is closing so that we can respond + // with a channel.close frame when we receive any other type of frame + // on that channel + _closingChannels[channelId] = channel; + + } + + /// <summary> + /// Called from the ChannelClose handler when a channel close frame is received. + /// This method decides whether this is a response or an initiation. The latter + /// case causes the AmqChannel to be closed and an exception to be thrown if + /// appropriate. + /// </summary> + /// <param name="channelId">the id of the channel (session)</param> + /// <returns>true if the client must respond to the server, i.e. if the server + /// initiated the channel close, false if the channel close is just the server + /// responding to the client's earlier request to close the channel.</returns> + public bool ChannelClosed(ushort channelId, int code, string text) + { + // if this is not a response to an earlier request to close the channel + if (!_closingChannels.ContainsKey(channelId)) + { + _closingChannels.Remove(channelId); + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.ClosedWithException(new AMQException(_logger, code, text)); + return true; + } + else + { + return false; + } + } + + public AMQConnection AMQConnection + { + get + { + return _connection; + } + } + + public void CloseProtocolSession() + { + _logger.Debug("Closing protocol session"); + _connectionCloser.Close(); + } + + internal string GenerateQueueName() + { + return "ntmp_" + System.Guid.NewGuid(); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs new file mode 100644 index 0000000000..2f23a1571d --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Text; + +namespace Apache.Qpid.Client.Protocol +{ + /// <summary> + /// Default timeout values for the protocol + /// </summary> + sealed class DefaultTimeouts + { + /// <summary> + /// Maximum number of milliseconds to wait for a state change + /// in the protocol's state machine + /// </summary> + public const int MaxWaitForState = 30* 1000; + /// <summary> + /// Maximum number of milliseconds to wait for a reply + /// frame when doing synchronous writer to the broker + /// </summary> + public const int MaxWaitForSyncWriter = 30 * 1000; + + private DefaultTimeouts() + { + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs new file mode 100644 index 0000000000..e3298200c4 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.Protocol +{ + public interface IConnectionCloser + { + void Close(); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs new file mode 100644 index 0000000000..3b53f015f8 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol +{ + public interface IProtocolListener + { + void OnMessage(IDataBlock message); + void OnException(Exception e); + + // XXX: .NET way of doing listeners? + void AddFrameListener(IAMQMethodListener listener); + void RemoveFrameListener(IAMQMethodListener listener); + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs new file mode 100644 index 0000000000..9cc9f8cee5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol.Listener +{ + public abstract class BlockingMethodFrameListener : IAMQMethodListener + { + private ManualResetEvent _resetEvent; + + public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame); + + /// <summary> + /// This is set if there is an exception thrown from processCommandFrame and the + /// exception is rethrown to the caller of blockForFrame() + /// </summary> + private volatile Exception _error; + + protected ushort _channelId; + + protected AMQMethodEvent _doneEvt = null; + + public BlockingMethodFrameListener(ushort channelId) + { + _channelId = channelId; + _resetEvent = new ManualResetEvent(false); + } + + /// <summary> + /// This method is called by the MINA dispatching thread. Note that it could + /// be called before BlockForFrame() has been called. + /// </summary> + /// <param name="evt">the frame event</param> + /// <returns>true if the listener has dealt with this frame</returns> + /// <exception cref="AMQException"></exception> + public bool MethodReceived(AMQMethodEvent evt) + { + AMQMethodBody method = evt.Method; + + try + { + bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method); + if (ready) + { + _doneEvt = evt; + _resetEvent.Set(); + } + + return ready; + } + catch (AMQException e) + { + Error(e); + // we rethrow the error here, and the code in the frame dispatcher will go round + // each listener informing them that an exception has been thrown + throw e; + } + } + + /// <summary> + /// This method is called by the thread that wants to wait for a frame. + /// </summary> + /// <param name="timeout">Set the number of milliseconds to wait</param> + public AMQMethodEvent BlockForFrame(int timeout) + { + _resetEvent.WaitOne(timeout, true); + //at this point the event will have been signalled. The error field might or might not be set + // depending on whether an error occurred + if (_error != null) + { + throw _error; + } + + return _doneEvt; + } + + /// <summary> + /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this + /// class to avoid code repetition but again is only called by the MINA dispatcher thread. + /// </summary> + /// <param name="e">the exception that caused the error</param> + public void Error(Exception e) + { + // set the error so that the thread that is blocking (in BlockForFrame()) + // can pick up the exception and rethrow to the caller + _error = e; + _resetEvent.Set(); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs new file mode 100644 index 0000000000..b5450d00f7 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Apache.Qpid.Client.Protocol.Listener +{ + public interface IAMQMethodListener + { + /// <summary> + /// Invoked when a method frame has been received + /// <param name="evt">the event</param> + /// <returns>true if the handler has processed the method frame, false otherwise. Note + /// that this does not prohibit the method event being delivered to subsequent listeners + /// but can be used to determine if nobody has dealt with an incoming method frame.</param> + /// <exception cname="AMQException">if an error has occurred. This exception will be delivered + /// to all registered listeners using the error() method (see below) allowing them to + /// perform cleanup if necessary.</exception> + bool MethodReceived(AMQMethodEvent evt); + + /// <summary> + /// Callback when an error has occurred. Allows listeners to clean up. + /// </summary> + /// <param name="e">the exception</param> + void Error(Exception e); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs new file mode 100644 index 0000000000..8cdc1dbba9 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol.Listener +{ + public class SpecificMethodFrameListener : BlockingMethodFrameListener + { + private readonly Type _expectedClass; + + public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId) + { + _expectedClass = expectedClass; + } + + public override bool ProcessMethod(ushort channelId, AMQMethodBody frame) + { + return _expectedClass.IsInstanceOfType(frame); + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs new file mode 100644 index 0000000000..11918f1ea2 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Client.Transport; +using Apache.Qpid.Framing; + +using log4net; + +namespace Apache.Qpid.Client.Protocol +{ + /// <summary> + /// A convenient interface to writing protocol frames. + /// </summary> + public class ProtocolWriter + { + + private ILog _logger = LogManager.GetLogger(typeof(ProtocolWriter)); + + IProtocolWriter _protocolWriter; + IProtocolListener _protocolListener; + + public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener) + { + _protocolWriter = protocolWriter; + _protocolListener = protocolListener; + } + + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="listener">the blocking listener. Note the calling thread will block.</param> + /// <param name="timeout">set the number of milliseconds to wait</param> + private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener, int timeout) + { + try + { + _protocolListener.AddFrameListener(listener); + _protocolWriter.Write(frame); + + return listener.BlockForFrame(timeout); + } + finally + { + _protocolListener.RemoveFrameListener(listener); + } + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="responseType">the type of method response</param> + public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType) + { + // TODO: If each frame knew it's response type, then the responseType argument would + // TODO: not be neccesary. + return SyncWrite(frame, responseType, DefaultTimeouts.MaxWaitForSyncWriter); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="responseType">the type of method response</param> + /// <param name="timeout">set the number of milliseconds to wait</param> + /// <returns>set the number of milliseconds to wait</returns> + public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType, int timeout) + { + return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType), timeout); + } + } +} + + |