using System;
using System.Threading;
using log4net;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
namespace Apache.Qpid.Client.Tests.interop
{
/// <summary>
/// Interop test client that publishes a batch of test messages to the topic exchange, asks the
/// subscribers for a report, waits (with a timeout) for those reports to arrive on a private
/// response queue, and finally sends a termination request before closing the connection.
/// </summary>
public class TopicPublisher
{
    private static readonly ILog log = LogManager.GetLogger(typeof(TopicPublisher));

    /// <summary>The default AMQ connection URL to use for tests.</summary>
    const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";

    /// <summary>Holds the default timeout, in milliseconds, to wait for reports before the test gives up.</summary>
    const int TIMEOUT = 10000;

    /// <summary>Holds the routing key for the topic that test messages and control requests are published on.</summary>
    const string CONTROL_ROUTING_KEY = "topic_control";

    /// <summary>Holds the routing key the response queue is bound with, on which reports are received.</summary>
    const string RESPONSE_ROUTING_KEY = "response";

    /// <summary>Holds the number of messages to send in each test run.</summary>
    private int numMessages;

    /// <summary>
    /// Holds the number of subscribers that have not yet replied with a report. This is decremented by
    /// <see cref="OnMessage"/> while <see cref="DoTest"/> is blocked on another thread, so it is only ever
    /// accessed through <see cref="Interlocked"/> once the connection has been started.
    /// </summary>
    private int numSubscribers;

    /// <summary>An event used to wait for all reports to arrive back from subscribers on.</summary>
    private AutoResetEvent allReportsReceivedEvt = new AutoResetEvent(false);

    /// <summary>Holds the connection to the broker.</summary>
    private IConnection connection;

    /// <summary>Holds the channel for all test messages.</summary>
    private IChannel channel;

    /// <summary>Holds the publisher to send test messages on.</summary>
    private IMessagePublisher publisher;

    /// <summary>
    /// Creates a topic publisher that will send the specified number of messages and expect the specified
    /// number of reports back from test subscribers.
    /// </summary>
    /// <param name="connectionUri">The broker URL.</param>
    /// <param name="numMessages">The number of messages to send in each test.</param>
    /// <param name="numSubscribers">The number of subscribers that are expected to reply with a report.</param>
    private TopicPublisher(string connectionUri, int numMessages, int numSubscribers)
    {
        log.Debug("TopicPublisher(string connectionUri = " + connectionUri + ", int numMessages = "+ numMessages +
                  ", int numSubscribers = " + numSubscribers + "): called");

        // Keep the test parameters. This is done before the connection is started so that the subscriber
        // count is in place before any report can be delivered to OnMessage.
        this.numMessages = numMessages;
        this.numSubscribers = numSubscribers;

        // Create a connection to the broker.
        IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
        connection = new AMQConnection(connectionInfo);

        // Establish a session on the broker.
        channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);

        // Set up a queue to listen for reports on.
        string responseQueueName = channel.GenerateUniqueName();
        channel.DeclareQueue(responseQueueName, false, true, true);

        // Set this listener up to listen for reports on the response queue.
        channel.Bind(responseQueueName, ExchangeNameDefaults.DIRECT, RESPONSE_ROUTING_KEY);
        //channel.Bind(responseQueueName, "<>", RESPONSE_ROUTING_KEY);
        IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName)
            .Create();
        consumer.OnMessage += new MessageReceivedDelegate(OnMessage);

        // Set up a publisher to send the test messages and report requests on.
        publisher = channel.CreatePublisherBuilder()
            .WithExchangeName(ExchangeNameDefaults.TOPIC)
            .WithRoutingKey(CONTROL_ROUTING_KEY)
            .Create();

        connection.Start();
        Console.WriteLine("Sending messages and waiting for reports...");
    }

    /// <summary>
    /// Starts a test publisher that sends one message and expects one subscriber report. The broker URL may
    /// be given as the first command line argument; when it is omitted <c>DEFAULT_URI</c> is used.
    /// </summary>
    /// <param name="argv">The command line arguments, optional broker URL first.</param>
    public static void Main(String[] argv)
    {
        // Honour the documented command line contract, falling back to the default broker URL.
        string connectionUri = (argv != null && argv.Length > 0) ? argv[0] : DEFAULT_URI;

        // Create an instance of this publisher.
        TopicPublisher publisher = new TopicPublisher(connectionUri, 1, 1);

        // Publish the test messages.
        publisher.DoTest();
    }

    /// <summary>
    /// Sends the test messages followed by a report request, waits up to <c>TIMEOUT</c> milliseconds for all
    /// subscribers to reply with a report, then sends a termination request. The connection is always shut
    /// down before this method returns, including when sending fails with an exception.
    /// </summary>
    public void DoTest()
    {
        log.Debug("public void DoTest(): called");

        try
        {
            // Create a test message to send.
            IMessage testMessage = channel.CreateTextMessage("test");

            // Send the desired number of test messages.
            for (int i = 0; i < numMessages; i++)
            {
                publisher.Send(testMessage);
            }

            log.Debug("Sent " + numMessages + " test messages.");

            // Send the report request.
            IMessage reportRequestMessage = channel.CreateTextMessage("Report request message.");
            reportRequestMessage.Headers["TYPE"] = "REPORT_REQUEST";

            reportRequestMessage.Headers.SetBoolean("BOOLEAN", false);
            //reportRequestMessage.Headers.SetByte("BYTE", 5);
            reportRequestMessage.Headers.SetDouble("DOUBLE", 3.141);
            reportRequestMessage.Headers.SetFloat("FLOAT", 1.0f);
            reportRequestMessage.Headers.SetInt("INT", 1);
            reportRequestMessage.Headers.SetLong("LONG", 1);
            reportRequestMessage.Headers.SetString("STRING", "hello");
            reportRequestMessage.Headers.SetShort("SHORT", 2);

            publisher.Send(reportRequestMessage);

            log.Debug("Sent the report request message, waiting for all replies...");

            // Wait until all the reports come in. The wait is skipped when nothing is outstanding (for
            // example when zero subscribers were requested), as the event would otherwise never be set and
            // the full timeout would elapse for no reason. CompareExchange(…, 0, 0) is used purely as an
            // atomic read of the counter that OnMessage updates from another thread.
            bool signalled = Interlocked.CompareExchange(ref numSubscribers, 0, 0) <= 0
                             || allReportsReceivedEvt.WaitOne(TIMEOUT, true);
            int outstanding = Interlocked.CompareExchange(ref numSubscribers, 0, 0);

            // Check if all reports were really received or if the timeout occurred.
            if (signalled || outstanding <= 0)
            {
                log.Debug("Got all reports.");
            }
            else
            {
                log.Debug("Waiting for reports timed out, still waiting for " + outstanding + ".");
            }

            // Send the termination request.
            IMessage terminationRequestMessage = channel.CreateTextMessage("Termination request message.");
            terminationRequestMessage.Headers["TYPE"] = "TERMINATION_REQUEST";
            publisher.Send(terminationRequestMessage);

            log.Debug("Sent the termination request message.");
        }
        finally
        {
            // Close the publisher, the channel and the connection to the broker, even if sending failed.
            Shutdown();
        }
    }

    /// <summary>
    /// Handles all report messages from subscribers. This decrements the count of subscribers that are still
    /// to reply, until this becomes zero, at which time the thread waiting in <see cref="DoTest"/> is released.
    /// </summary>
    /// <param name="message">The received report message.</param>
    public void OnMessage(IMessage message)
    {
        log.Debug("public void OnMessage(IMessage message = " + message + "): called");

        // Atomically decrement the count of expected reports and release the waiter when this becomes zero.
        // The decrement is interlocked because DoTest reads the same counter from a different thread.
        if (Interlocked.Decrement(ref numSubscribers) == 0)
        {
            log.Debug("Got reports from all subscribers.");
            allReportsReceivedEvt.Set();
        }
    }

    /// <summary>
    /// Stops the connection and disposes of the publisher, the channel and the connection, in that order.
    /// Each step is attempted even if an earlier one throws, so a failure part way through does not leave the
    /// remaining resources open.
    /// </summary>
    private void Shutdown()
    {
        try
        {
            connection.Stop();
        }
        finally
        {
            try
            {
                publisher.Dispose();
            }
            finally
            {
                try
                {
                    channel.Dispose();
                }
                finally
                {
                    connection.Dispose();
                }
            }
        }
    }
}
}