summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client/Client/Protocol
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Protocol')
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs76
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs318
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs267
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs47
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs27
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs36
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs110
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs46
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs42
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs107
10 files changed, 0 insertions, 1076 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
deleted file mode 100644
index a7ce808862..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Text;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Protocol
-{
- public class AMQMethodEvent
- {
- private AMQMethodBody _method;
-
- private ushort _channelId;
-
- private AMQProtocolSession _protocolSession;
-
- public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
- {
- _channelId = channelId;
- _method = method;
- _protocolSession = protocolSession;
- }
-
- public AMQMethodBody Method
- {
- get
- {
- return _method;
- }
- }
-
- public ushort ChannelId
- {
- get
- {
- return _channelId;
- }
- }
-
- public AMQProtocolSession ProtocolSession
- {
- get
- {
- return _protocolSession;
- }
- }
-
- public override String ToString()
- {
- StringBuilder buf = new StringBuilder("Method event: ");
- buf.Append("\nChannel id: ").Append(_channelId);
- buf.Append("\nMethod: ").Append(_method);
- return buf.ToString();
- }
- }
-}
-
-
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
deleted file mode 100644
index c51538b70e..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Client.Failover;
-using Apache.Qpid.Client.Protocol.Listener;
-using Apache.Qpid.Client.State;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Protocol
-{
- /// <summary>
- /// AMQProtocolListener
- ///
- /// <p/>Fail-over state transition rules...
- ///
- /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order
- /// to be able to send errors during failover back to the client application. The session won't be available in the case
- /// when failing over due to a Connection.Redirect message from the broker.
- ///
- /// <p><table id="crc"><caption>CRC Card</caption>
- /// <tr><th> Responsibilities <th> Collaborations
- /// <tr><td> Track fail over state of a connection.
- /// <tr><td> Manage method listeners. <td> IAMQMethodListener
- /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler
- /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener
- /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener
- /// </table>
- ///
- /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being
- /// triggered, when it should not be.
- ///
- /// </summary>
- public class AMQProtocolListener : IProtocolListener
- {
- /// <summary>Used for debugging.</summary>
- private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener));
-
- /// <summary>
- /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it,
- /// the failover process is handed off to this handler.
- /// </summary>
- private FailoverHandler _failoverHandler;
-
- /// <summary>Tracks the current fail-over state.</summary>
- internal FailoverState _failoverState = FailoverState.NOT_STARTED;
-
- internal FailoverState FailoverState
- {
- get { return _failoverState; }
- set { _failoverState = value; }
- }
-
- internal ManualResetEvent FailoverLatch;
-
- AMQConnection _connection;
- AMQStateManager _stateManager;
-
- public AMQStateManager StateManager
- {
- get { return _stateManager; }
- set { _stateManager = value; }
- }
-
- private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList());
-
- AMQProtocolSession _protocolSession = null;
-
- private readonly Object _lock = new Object();
-
- public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } }
-
- public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager)
- {
- _connection = connection;
- _stateManager = stateManager;
- _failoverHandler = new FailoverHandler(connection);
- }
-
- public void OnMessage(IDataBlock message)
- {
- // Handle incorrect protocol version.
- if (message is ProtocolInitiation)
- {
- string error = String.Format("Protocol mismatch - {0}", message.ToString());
- AMQException e = new AMQProtocolHeaderException(error);
- _log.Error("Closing connection because of protocol mismatch", e);
- //_protocolSession.CloseProtocolSession();
- _stateManager.Error(e);
- return;
- }
-
- AMQFrame frame = (AMQFrame)message;
-
- if (frame.BodyFrame is AMQMethodBody)
- {
- if (_log.IsDebugEnabled)
- {
- _log.Debug("Method frame received: " + frame);
- }
- AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession);
- try
- {
- bool wasAnyoneInterested = false;
- lock (_frameListeners.SyncRoot)
- {
- foreach (IAMQMethodListener listener in _frameListeners)
- {
- wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested;
- }
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
- }
- }
- catch (Exception e)
- {
- foreach (IAMQMethodListener listener in _frameListeners)
- {
- listener.Error(e);
- }
- }
- }
- else if (frame.BodyFrame is ContentHeaderBody)
- {
- _protocolSession.MessageContentHeaderReceived(frame.Channel,
- (ContentHeaderBody)frame.BodyFrame);
- }
- else if (frame.BodyFrame is ContentBody)
- {
- _protocolSession.MessageContentBodyReceived(frame.Channel,
- (ContentBody)frame.BodyFrame);
- }
- else if (frame.BodyFrame is HeartbeatBody)
- {
- _log.Debug("HeartBeat received");
- }
- }
-
- /// <summary>
- /// Receives notification of any IO exceptions on the connection.
- ///
- /// <p/>Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then
- /// all method listeners and the application connection object are notified of the connection failure exception.
- ///
- /// <p/>All other exception types are propagated to all method listeners.
- /// </summary>
- public void OnException(Exception cause)
- {
- _log.Warn("public void OnException(Exception cause = " + cause + "): called");
-
- // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also
- // ensures that this exception is fully propagated to all listeners, before another one can be processed.
- lock (_lock)
- {
- if (cause is AMQConnectionClosedException || cause is System.IO.IOException)
- {
- // Try a fail-over because the connection has failed.
- FailoverState failoverState = AttemptFailover();
-
- // Check if the fail-over has failed, in which case notify all method listeners of the exception.
- // The application connection object is also notified of the failure of the connection with the exception.
- if (failoverState == FailoverState.FAILED)
- {
- _log.Debug("Fail-over has failed. Notifying all method listeners of the exception.");
-
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- PropagateExceptionToWaiters(amqe);
- _connection.ExceptionReceived(cause);
- }
- }
- // Notify all method listeners of the exception.
- else
- {
- PropagateExceptionToWaiters(cause);
- _connection.ExceptionReceived(cause);
- }
- }
- }
-
- /// <summary>
- /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been
- /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress
- /// this method allows it to continue to run and will do nothing.
- ///
- /// <p/>This method should only be called when the connection has been remotely closed.
- /// </summary>
- ///
- /// <returns>The fail-over state at the end of this attempt.</returns>
- private FailoverState AttemptFailover()
- {
- _log.Debug("private void AttemptFailover(): called");
- _log.Debug("_failoverState = " + _failoverState);
-
- // Ensure that the connection stops sending heart beats, if it still is.
- _connection.StopHeartBeatThread();
-
- // Check that the connection policy allows fail-over to be attempted.
- if (!_connection.IsFailoverAllowed)
- {
- _log.Debug("Connection does not allowed to failover");
- _connection.ExceptionReceived(
- new AMQDisconnectedException("Broker closed connection and reconnection is not permitted."));
- }
-
- // Check if connection was closed deliberately by the application, in which case no fail-over is attempted.
- if (_connection.Closed)
- {
- return _failoverState;
- }
-
- // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is
- // advanced to 'in progress'
- if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed)
- {
- _log.Info("Starting the fail-over process.");
-
- _failoverState = FailoverState.IN_PROGRESS;
- StartFailoverThread();
- }
-
- return _failoverState;
- }
-
- /// <summary>
- /// There are two cases where we have other threads potentially blocking for events to be handled by this
- /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
- /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
- /// react appropriately.
- ///
- /// <param name="e">the exception to propagate</param>
- /// </summary>
- public void PropagateExceptionToWaiters(Exception e)
- {
- // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over.
- _stateManager.Error(e);
- lock ( _lock )
- {
- foreach ( IAMQMethodListener listener in _frameListeners )
- {
- listener.Error(e);
- }
- }
- }
-
- public void AddFrameListener(IAMQMethodListener listener)
- {
- lock ( _lock )
- {
- _frameListeners.Add(listener);
- }
- }
-
- public void RemoveFrameListener(IAMQMethodListener listener)
- {
- if (_log.IsDebugEnabled)
- {
- _log.Debug("Removing frame listener: " + listener.ToString());
- }
- lock ( _lock )
- {
- _frameListeners.Remove(listener);
- }
- }
-
- public void BlockUntilNotFailingOver()
- {
- if (FailoverLatch != null)
- {
- FailoverLatch.WaitOne();
- }
- }
-
- /// <summary>
- /// "Failover" for redirection.
- /// </summary>
- /// <param name="host"></param>
- /// <param name="port"></param>
- public void Failover(string host, int port)
- {
- _failoverHandler.setHost(host);
- _failoverHandler.setPort(port);
- // see javadoc for FailoverHandler to see rationale for separate thread
- StartFailoverThread();
- }
-
- private void StartFailoverThread()
- {
- Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run));
- failoverThread.Name = "Failover";
- // Do not inherit daemon-ness from current thread as this can be a daemon
- // thread such as a AnonymousIoService thread.
- failoverThread.IsBackground = false;
- failoverThread.Start();
- }
- }
-}
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
deleted file mode 100644
index 1fb3d407eb..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using log4net;
-using Apache.Qpid.Client.Message;
-using Apache.Qpid.Client.Transport;
-using Apache.Qpid.Framing;
-using Apache.Qpid.Sasl;
-
-namespace Apache.Qpid.Client.Protocol
-{
- public class AMQProtocolSession
- {
- private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession));
-
- private readonly IProtocolWriter _protocolWriter;
- private readonly IConnectionCloser _connectionCloser;
-
- /// <summary>
- /// Maps from the channel id to the AmqChannel that it represents.
- /// </summary>
- //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
- private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable());
-
- //private ConcurrentMap _closingChannels = new ConcurrentHashMap();
- private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable());
-
- /// <summary>
- /// Maps from a channel id to an unprocessed message. This is used to tie together the
- /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
- /// </summary>
- //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
- private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable());
-
- private AMQConnection _connection;
-
- public string ClientID { get { return _connection.ClientID; } }
-
- public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection)
- {
- _protocolWriter = protocolWriter;
- _connectionCloser = connectionCloser;
- _connection = connection;
- }
-
- public void Init()
- {
- // start the process of setting up the connection. This is the first place that
- // data is written to the server.
- _protocolWriter.Write(new ProtocolInitiation());
- }
-
- public string Username
- {
- get
- {
- return AMQConnection.Username;
- }
- }
-
- public string Password
- {
- get
- {
- return AMQConnection.Password;
- }
- }
-
- ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too.
-
- public ConnectionTuneParameters ConnectionTuneParameters
- {
- get
- {
- return _connectionTuneParameters;
- }
- set
- {
- _connectionTuneParameters = value;
- AMQConnection con = AMQConnection;
- con.SetMaximumChannelCount(value.ChannelMax);
- con.MaximumFrameSize = value.FrameMax;
- }
- }
-
- private ISaslClient _saslClient;
- public ISaslClient SaslClient
- {
- get { return _saslClient; }
- set { _saslClient = value; }
- }
-
- /// <summary>
- /// Callback invoked from the BasicDeliverMethodHandler when a message has been received.
- /// This is invoked on the MINA dispatcher thread.
- /// </summary>
- /// <param name="message">the unprocessed message</param>
- /// <exception cname="AMQException">if this was not expected</exception>
- public void UnprocessedMessageReceived(UnprocessedMessage message)
- {
- _channelId2UnprocessedMsgMap[message.ChannelId] = message;
- }
-
- public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader)
- {
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
- if (msg == null)
- {
- throw new AMQException("Error: received content header without having received a JMSDeliver frame first");
- }
- if (msg.ContentHeader != null)
- {
- throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
- }
- msg.ContentHeader = contentHeader;
- if (contentHeader.BodySize == 0)
- {
- DeliverMessageToAMQSession(channelId, msg);
- }
- }
-
- public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody)
- {
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
- if (msg == null)
- {
- throw new AMQException("Error: received content body without having received a BasicDeliver frame first");
- }
- if (msg.ContentHeader == null)
- {
- _channelId2UnprocessedMsgMap.Remove(channelId);
- throw new AMQException("Error: received content body without having received a ContentHeader frame first");
- }
- try
- {
- msg.ReceiveBody(contentBody);
- }
- catch (UnexpectedBodyReceivedException e)
- {
- _channelId2UnprocessedMsgMap.Remove(channelId);
- throw e;
- }
- if (msg.IsAllBodyDataReceived())
- {
- DeliverMessageToAMQSession(channelId, msg);
- }
- }
-
- /// <summary>
- /// Deliver a message to the appropriate session, removing the unprocessed message
- /// from our map
- /// <param name="channelId">the channel id the message should be delivered to</param>
- /// <param name="msg"> the message</param>
- private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg)
- {
- AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
- channel.MessageReceived(msg);
- _channelId2UnprocessedMsgMap.Remove(channelId);
- }
-
- /// <summary>
- /// Convenience method that writes a frame to the protocol session. Equivalent
- /// to calling getProtocolSession().write().
- /// </summary>
- /// <param name="frame">the frame to write</param>
- public void WriteFrame(IDataBlock frame)
- {
- _protocolWriter.Write(frame);
- }
-
- public void AddSessionByChannel(ushort channelId, AmqChannel channel)
- {
- if (channel == null)
- {
- throw new ArgumentNullException("Attempt to register a null channel");
- }
- _logger.Debug("Add channel with channel id " + channelId);
- _channelId2SessionMap[channelId] = channel;
- }
-
- public void RemoveSessionByChannel(ushort channelId)
- {
- _logger.Debug("Removing session with channelId " + channelId);
- _channelId2SessionMap.Remove(channelId);
- }
-
- /// <summary>
- /// Starts the process of closing a channel
- /// </summary>
- /// <param name="channel" the AmqChannel being closed</param>
- public void CloseSession(AmqChannel channel)
- {
- _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId);
- ushort channelId = channel.ChannelId;
-
- // we need to know when a channel is closing so that we can respond
- // with a channel.close frame when we receive any other type of frame
- // on that channel
- _closingChannels[channelId] = channel;
-
- }
-
- /// <summary>
- /// Called from the ChannelClose handler when a channel close frame is received.
- /// This method decides whether this is a response or an initiation. The latter
- /// case causes the AmqChannel to be closed and an exception to be thrown if
- /// appropriate.
- /// </summary>
- /// <param name="channelId">the id of the channel (session)</param>
- /// <returns>true if the client must respond to the server, i.e. if the server
- /// initiated the channel close, false if the channel close is just the server
- /// responding to the client's earlier request to close the channel.</returns>
- public bool ChannelClosed(ushort channelId, int code, string text)
- {
- // if this is not a response to an earlier request to close the channel
- if (!_closingChannels.ContainsKey(channelId))
- {
- _closingChannels.Remove(channelId);
- AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
- channel.ClosedWithException(new AMQException(_logger, code, text));
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public AMQConnection AMQConnection
- {
- get
- {
- return _connection;
- }
- }
-
- public void CloseProtocolSession()
- {
- _logger.Debug("Closing protocol session");
- _connectionCloser.Close();
- }
-
- internal string GenerateQueueName()
- {
- return "ntmp_" + System.Guid.NewGuid();
- }
- }
-}
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs
deleted file mode 100644
index 2f23a1571d..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Text;
-
-namespace Apache.Qpid.Client.Protocol
-{
- /// <summary>
- /// Default timeout values for the protocol
- /// </summary>
- sealed class DefaultTimeouts
- {
- /// <summary>
- /// Maximum number of milliseconds to wait for a state change
- /// in the protocol's state machine
- /// </summary>
- public const int MaxWaitForState = 30* 1000;
- /// <summary>
- /// Maximum number of milliseconds to wait for a reply
- /// frame when doing synchronous writer to the broker
- /// </summary>
- public const int MaxWaitForSyncWriter = 30 * 1000;
-
- private DefaultTimeouts()
- {
- }
- }
-}
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
deleted file mode 100644
index e3298200c4..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-namespace Apache.Qpid.Client.Protocol
-{
- public interface IConnectionCloser
- {
- void Close();
- }
-}
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
deleted file mode 100644
index 3b53f015f8..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using Apache.Qpid.Client.Protocol.Listener;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Protocol
-{
- public interface IProtocolListener
- {
- void OnMessage(IDataBlock message);
- void OnException(Exception e);
-
- // XXX: .NET way of doing listeners?
- void AddFrameListener(IAMQMethodListener listener);
- void RemoveFrameListener(IAMQMethodListener listener);
- }
-}
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
deleted file mode 100644
index 9cc9f8cee5..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Protocol.Listener
-{
- public abstract class BlockingMethodFrameListener : IAMQMethodListener
- {
- private ManualResetEvent _resetEvent;
-
- public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame);
-
- /// <summary>
- /// This is set if there is an exception thrown from processCommandFrame and the
- /// exception is rethrown to the caller of blockForFrame()
- /// </summary>
- private volatile Exception _error;
-
- protected ushort _channelId;
-
- protected AMQMethodEvent _doneEvt = null;
-
- public BlockingMethodFrameListener(ushort channelId)
- {
- _channelId = channelId;
- _resetEvent = new ManualResetEvent(false);
- }
-
- /// <summary>
- /// This method is called by the MINA dispatching thread. Note that it could
- /// be called before BlockForFrame() has been called.
- /// </summary>
- /// <param name="evt">the frame event</param>
- /// <returns>true if the listener has dealt with this frame</returns>
- /// <exception cref="AMQException"></exception>
- public bool MethodReceived(AMQMethodEvent evt)
- {
- AMQMethodBody method = evt.Method;
-
- try
- {
- bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method);
- if (ready)
- {
- _doneEvt = evt;
- _resetEvent.Set();
- }
-
- return ready;
- }
- catch (AMQException e)
- {
- Error(e);
- // we rethrow the error here, and the code in the frame dispatcher will go round
- // each listener informing them that an exception has been thrown
- throw e;
- }
- }
-
- /// <summary>
- /// This method is called by the thread that wants to wait for a frame.
- /// </summary>
- /// <param name="timeout">Set the number of milliseconds to wait</param>
- public AMQMethodEvent BlockForFrame(int timeout)
- {
- _resetEvent.WaitOne(timeout, true);
- //at this point the event will have been signalled. The error field might or might not be set
- // depending on whether an error occurred
- if (_error != null)
- {
- throw _error;
- }
-
- return _doneEvt;
- }
-
- /// <summary>
- /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this
- /// class to avoid code repetition but again is only called by the MINA dispatcher thread.
- /// </summary>
- /// <param name="e">the exception that caused the error</param>
- public void Error(Exception e)
- {
- // set the error so that the thread that is blocking (in BlockForFrame())
- // can pick up the exception and rethrow to the caller
- _error = e;
- _resetEvent.Set();
- }
- }
-}
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
deleted file mode 100644
index b5450d00f7..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-
-namespace Apache.Qpid.Client.Protocol.Listener
-{
- public interface IAMQMethodListener
- {
- /// <summary>
- /// Invoked when a method frame has been received
- /// <param name="evt">the event</param>
- /// <returns>true if the handler has processed the method frame, false otherwise. Note
- /// that this does not prohibit the method event being delivered to subsequent listeners
- /// but can be used to determine if nobody has dealt with an incoming method frame.</param>
- /// <exception cname="AMQException">if an error has occurred. This exception will be delivered
- /// to all registered listeners using the error() method (see below) allowing them to
- /// perform cleanup if necessary.</exception>
- bool MethodReceived(AMQMethodEvent evt);
-
- /// <summary>
- /// Callback when an error has occurred. Allows listeners to clean up.
- /// </summary>
- /// <param name="e">the exception</param>
- void Error(Exception e);
- }
-}
-
-
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
deleted file mode 100644
index 8cdc1dbba9..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client.Protocol.Listener
-{
- public class SpecificMethodFrameListener : BlockingMethodFrameListener
- {
- private readonly Type _expectedClass;
-
- public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId)
- {
- _expectedClass = expectedClass;
- }
-
- public override bool ProcessMethod(ushort channelId, AMQMethodBody frame)
- {
- return _expectedClass.IsInstanceOfType(frame);
- }
- }
-}
-
-
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
deleted file mode 100644
index 11918f1ea2..0000000000
--- a/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using Apache.Qpid.Client.Protocol.Listener;
-using Apache.Qpid.Client.Transport;
-using Apache.Qpid.Framing;
-
-using log4net;
-
-namespace Apache.Qpid.Client.Protocol
-{
- /// <summary>
- /// A convenient interface to writing protocol frames.
- /// </summary>
- public class ProtocolWriter
- {
-
- private ILog _logger = LogManager.GetLogger(typeof(ProtocolWriter));
-
- IProtocolWriter _protocolWriter;
- IProtocolListener _protocolListener;
-
- public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener)
- {
- _protocolWriter = protocolWriter;
- _protocolListener = protocolListener;
- }
-
- public void WriteFrame(IDataBlock frame)
- {
- _protocolWriter.Write(frame);
- }
-
- /// <summary>
- /// Convenience method that writes a frame to the protocol session and waits for
- /// a particular response. Equivalent to calling getProtocolSession().write() then
- /// waiting for the response.
- /// </summary>
- /// <param name="frame">the frame</param>
- /// <param name="listener">the blocking listener. Note the calling thread will block.</param>
- /// <param name="timeout">set the number of milliseconds to wait</param>
- private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener, int timeout)
- {
- try
- {
- _protocolListener.AddFrameListener(listener);
- _protocolWriter.Write(frame);
-
- return listener.BlockForFrame(timeout);
- }
- finally
- {
- _protocolListener.RemoveFrameListener(listener);
- }
- // When control resumes before this line, a reply will have been received
- // that matches the criteria defined in the blocking listener
- }
-
- /// <summary>
- /// Convenience method that writes a frame to the protocol session and waits for
- /// a particular response. Equivalent to calling getProtocolSession().write() then
- /// waiting for the response.
- /// </summary>
- /// <param name="frame">the frame</param>
- /// <param name="responseType">the type of method response</param>
- public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType)
- {
- // TODO: If each frame knew it's response type, then the responseType argument would
- // TODO: not be neccesary.
- return SyncWrite(frame, responseType, DefaultTimeouts.MaxWaitForSyncWriter);
- }
-
- /// <summary>
- /// Convenience method that writes a frame to the protocol session and waits for
- /// a particular response. Equivalent to calling getProtocolSession().write() then
- /// waiting for the response.
- /// </summary>
- /// <param name="frame">the frame</param>
- /// <param name="responseType">the type of method response</param>
- /// <param name="timeout">set the number of milliseconds to wait</param>
- /// <returns>set the number of milliseconds to wait</returns>
- public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType, int timeout)
- {
- return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType), timeout);
- }
- }
-}
-
-