summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
blob: 2094aa3b1b038e025d6efef88460f5eeb242a364 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Threading;
using log4net;
using NUnit.Framework;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Client;

namespace Apache.Qpid.Integration.Tests.testcases
{
    /// <summary>
    /// Sets up a producer/consumer pair to send test messages through a header exchange. The header exchange matching pattern is tested to
    /// verify that it correctly matches or filters out messages based on their headers.
    /// 
    /// Check that a message matching all fields of a headers exchange is passed by the exchange.
    /// Check that a message containing values for empty fields of a headers exchange is passed by the exchange.
    /// Check that a message matching only some fields of a headers exchange is not passed by the exchange.
    /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange.
    /// </summary>
    ///
    /// <todo>Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are implemented
    ///       in a single test class to make running this as part of an automated test suite possible.</todo>
    /// 
    /// <todo>Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but using the 
    ///       callback does demonstrate how to do so.</todo>
    [TestFixture, Category("Integration")]
    public class HeadersExchangeTest : BaseMessagingTestFixture
    {
        /// <summary> Logger used to trace the progress of the tests. </summary>
        private static readonly ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));

        /// <summary> Holds the default test timeout, in milliseconds, for broker communications before tests give up. </summary>
        private static readonly int TIMEOUT = 2000;

        /// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
        private string _exchangeName = "ServiceQ1";

        /// <summary> Used to preserve the most recent exception in case test cases need to examine it. </summary>
        private Exception _lastException = null;

        /// <summary> Used to preserve the most recent message from the test consumer. </summary>
        private IMessage _lastMessage = null;

        /// <summary> The test consumer to get messages from the broker with. </summary>
        private IMessageConsumer _consumer;

        /// <summary> The test publisher used to send messages to the headers exchange. </summary>
        private IMessagePublisher _publisher;

        /// <summary>
        /// Signalled by <see cref="OnMessage"/> and <see cref="OnException"/> to wake up a test that is waiting for the
        /// broker to respond.
        /// </summary>
        private readonly AutoResetEvent _evt = new AutoResetEvent(false);

        /// <summary> The delegate registered on the consumer to receive messages. </summary>
        private MessageReceivedDelegate _msgRecDelegate;

        /// <summary> The delegate registered on the connection to receive exceptions. </summary>
        private ExceptionListenerDelegate _exceptionDelegate;

        /// <summary> Holds the test connection. </summary>
        protected IConnection _connection;

        /// <summary> Holds the test channel. </summary>
        protected IChannel _channel;

        /// <summary>
        /// Creates the test connection and channel, declares the headers exchange, binds a temporary queue to it using the
        /// test header pattern, creates the consumer and publisher, and finally starts the connection.
        /// </summary>
        [SetUp]
        public override void Init()
        {
            // Ensure that the base init method is called. It establishes a connection with the broker.
            base.Init();

            // Clear the most recent message and exception, and drop any signal left over from a previous test. This is
            // done before any listener is registered so that nothing raised during set up is lost, and so that a late
            // signal from an earlier test cannot make this test's wait return immediately.
            _lastException = null;
            _lastMessage = null;
            _evt.Reset();

            connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
            _connection = new AMQConnection(connectionInfo);
            _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 500, 300);

            _logger.Info("Starting...");
            _logger.Info("Exchange name is '" + _exchangeName + "'...");

            // Register this to listen for exceptions on the test connection.
            _exceptionDelegate = new ExceptionListenerDelegate(OnException);
            _connection.ExceptionListener += _exceptionDelegate;

            // Declare a new headers exchange with the name of the test service.
            _channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS);

            // Create a non-durable, temporary (aka auto-delete), exclusive queue.
            string queueName = _channel.GenerateUniqueName();
            _channel.DeclareQueue(queueName, false, true, true);

            // Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match.
            _channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable());

            // Create a test consumer to consume messages from the test exchange.
            _consumer = _channel.CreateConsumerBuilder(queueName)
                .WithPrefetchLow(100)
                .WithPrefetchHigh(500)
                .WithNoLocal(false) // make sure we get our own messages
                .Create();

            // Register this to listen for messages on the consumer.
            _msgRecDelegate = new MessageReceivedDelegate(OnMessage);
            _consumer.OnMessage += _msgRecDelegate;

            // Create the publisher. Messages are sent as mandatory so that the tests are told about unroutable messages.
            _publisher = _channel.CreatePublisherBuilder()
                    .WithExchangeName(_exchangeName)
                    .WithMandatory(true)
                    .Create();

            _publisher.DeliveryMode = DeliveryMode.NonPersistent;

            // Start the connection so that message delivery begins.
            _connection.Start();
        }

        /// <summary>
        /// Stops, closes and disposes of the test connection, then shuts down the base fixture. The base shutdown is
        /// always run, even if tearing down the test connection fails.
        /// </summary>
        [TearDown]
        public override void Shutdown()
        {
            _logger.Info("public void Shutdown(): called");

            // NOTE(review): the message and exception delegates are not deregistered here; the original code that did so
            // was disabled. Confirm whether closing the connection is sufficient.
            try
            {
                // The connection is null if Init failed before creating it.
                if (_connection != null)
                {
                    _connection.Stop();
                    _connection.Close();
                    _connection.Dispose();
                }
            }
            finally
            {
                base.Shutdown();
            }
        }

        /// <summary>
        /// Callback method that is passed any messages received on the test channel.
        /// </summary>
        /// 
        /// <param name="message">The received message.</param>
        public void OnMessage(IMessage message)
        {
            _logger.Debug(string.Format("message.Type = {0}", message.GetType()));
            _logger.Debug("Got message '" + message + "'");

            // Preserve the most recent message so that test cases can examine it.
            _lastMessage = message;

            // Notify any waiting threads that a message has been received.
            _evt.Set();
        }

        /// <summary>Callback method to handle any exceptions raised by the test connection.</summary>
        /// 
        /// <param name="e">The connection exception.</param>
        public void OnException(Exception e)
        {
            // Preserve the most recent exception in case test cases need to examine it.
            _lastException = e;

            // Notify any waiting threads that an exception event has occurred.
            _evt.Set();
        }

        /// <summary>Check that a message matching all fields of a headers exchange is passed by the exchange.</summary>
        [Test]
        public void TestMatchAll()
        {
            IMessage msg = _channel.CreateTextMessage("matches match2=''");
            msg.Headers["match1"] = "foo";
            msg.Headers["match2"] = "";

            // Use the SendTestMessage helper method to verify that the message was sent and received.
            SendTestMessage(msg, true);
        }

        /// <summary>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</summary>
        [Test]
        public void TestMatchEmptyMatchesAnything()
        {
            // Send a test message that matches the headers exchange.
            IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'");
            msg.Headers["match1"] = "foo";
            msg.Headers["match2"] = "bar";

            // Use the SendTestMessage helper method to verify that the message was sent and received.
            SendTestMessage(msg, true);
        }

        /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
        [Test]
        public void TestMatchOneFails()
        {
            IMessage msg = _channel.CreateTextMessage("not match - only match1");
            msg.Headers["match1"] = "foo";

            // Use the SendTestMessage helper method to verify that the message was sent and not received.
            SendTestMessage(msg, false);
        }

        /// <summary>
        /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by 
        /// the exchange.
        /// </summary>
        [Test]
        public void TestMatchExtraFields()
        {
            IMessage msg = _channel.CreateTextMessage("matches - extra headers");
            msg.Headers["match1"] = "foo";
            msg.Headers["match2"] = "bar";
            msg.Headers["match3"] = "not required";

            // Use the SendTestMessage helper method to verify that the message was sent and received.
            SendTestMessage(msg, true);
        }

        /// <summary>
        /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
        /// depending on whether or not the message should be received by the consumer.
        /// 
        /// Any exceptions raised by the connection, other than a not routable exception, will cause an Assert failure
        /// exception to be raised.
        /// </summary>
        /// 
        /// <param name="msgSend">The message to send.</param>
        /// <param name="shouldPass">A flag to indicate whether or not the message should be received by the consumer.</param>
        private void SendTestMessage(IMessage msgSend, bool shouldPass)
        {
            _publisher.Send(msgSend);

            // Wait for either callback to signal. The outcome is decided from the recorded message and exception below,
            // so a timeout simply leaves both of them unset.
            _evt.WaitOne(TIMEOUT, true);

            bool notRoutable = _lastException != null && _lastException.InnerException is AMQUndeliveredException;

            // Check that an exception other than not routable was raised in which case re-raise it as a test error.
            if (_lastException != null && !notRoutable)
            {
                Assert.Fail("Exception {0} was raised by the broker connection.", _lastException);
            }
            // Check that a message was returned if the test is expecting the message to pass.
            else if (shouldPass)
            {
                Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange.");
            }
            // Check that a not routable exception was raised if the test is expecting the message to fail.
            else if (notRoutable)
            {
                Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it.");
            }
            // The message was delivered although the exchange should have filtered it out. Report this explicitly rather
            // than as a timeout.
            else if (_lastMessage != null)
            {
                Assert.Fail("Message should have been filtered out by the headers exchange but was received by the consumer.");
            }
            // The broker did not respond within the test timeout so fail the test.
            else
            {
                Assert.Fail("The test timed out without a response from the broker.");
            }
        }

        /// <summary>
        /// Returns a field table containing patterns to match the test header exchange against. The pattern sets "match1"
        /// to "foo", "match2" to the empty string (which the test cases expect to match any value) and "x-match" to "all".
        /// </summary>
        /// 
        /// <returns> A field table containing test patterns. </returns>
        private FieldTable CreatePatternAsFieldTable()
        {
            FieldTable matchTable = new FieldTable();

            matchTable["match1"] = "foo";
            matchTable["match2"] = "";
            matchTable["x-match"] = "all";

            return matchTable;
        }
    }
}