summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
blob: 8e010ccf077abcb5e0a5a5bf4241f1a0d75296f5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.requestreply;

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.*;

import org.apache.log4j.Logger;

import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.Session;
import org.apache.qpid.topic.Config;
import org.apache.qpid.exchange.ExchangeDefaults;

/**
 * PingPongBouncer is a message listener that bounces messages back to their reply-to destination. This is used to return
 * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
 * too.
 *
 * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
 * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
 * temporary queue or the correlation id to correlate the original message to the reply.
 *
 * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
 * (info level logging, so usually console). This can be helpful to check the bounce backs are happening but should
 * be disabled for real timing tests as writing to the console will slow things down.
 *
 * <p><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * <tr><td> Bounce back messages to their reply to destination.
 * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
 * </table>
 *
 * @todo Replace the command line parsing with a neater tool.
 *
 * @todo Make verbose accept a number of messages, only prints to console every X messages.
 */
public class PingPongBouncer implements MessageListener
{
    /** Logger shared by the bouncer and its nested {@link FailoverNotifier}. */
    private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);

    /** The default prefetch size for the message consumer. */
    private static final int PREFETCH = 1;

    /** The default no local flag for the message consumer. */
    private static final boolean NO_LOCAL = true;

    /** The destination name that {@link #main(String[])} falls back to when none is configured. */
    private static final String DEFAULT_DESTINATION_NAME = "ping";

    /** The default exclusive flag for the message consumer. */
    private static final boolean EXCLUSIVE = false;

    /**
     * A convenient formatter to use when time stamping output.
     *
     * <p/>The pattern uses the 24 hour clock (<tt>HH</tt>) because no am/pm marker is printed, and three digit
     * milliseconds (<tt>SSS</tt>) so that, for example, 7 ms is rendered as <tt>007</tt> rather than <tt>07</tt>.
     *
     * <p/>Note that {@link SimpleDateFormat} instances are not synchronized; this shared instance must not be used
     * from several threads at once without external locking.
     */
    protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("HH:mm:ss:SSS");

    /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
    private boolean _verbose = false;

    /** Determines whether this bounce back client bounces back messages persistently. */
    private boolean _persistent = false;

    /** The queue or topic that pings are received on; chosen by {@link #createConsumerDestination(String)}. */
    private Destination _consumerDestination;

    /**
     * Keeps track of the response destination of the previous message for the last reply to producer cache.
     * NOTE(review): nothing in this class reads or writes this field at present.
     */
    private Destination _lastResponseDest;

    /** The producer for sending replies with. */
    private MessageProducer _replyProducer;

    /** The session that pings are consumed on. */
    private Session _consumerSession;

    /** The session that replies are produced on. */
    private Session _producerSession;

    /** Holds the connection to the broker. */
    private AMQConnection _connection;

    /** Flag used to indicate if this is a point to point or pub/sub ping client. */
    private boolean _isPubSub = false;

    /**
     * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
     * failover, immediately before committing a transaction.
     */
    protected boolean _failBeforeCommit = false;

    /**
     * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
     * failover, immediately after committing a transaction.
     */
    protected boolean _failAfterCommit = false;

    /**
     * Creates a PingPongBouncer that connects to a broker and registers itself as the listener for pings arriving
     * on the named destination.
     *
     * <p/>A new connection is opened under a client id built from the local host name and the current time, and two
     * sessions are created on it: one for consuming pings and one for sending replies. The connection is not started
     * here, so the caller must start it before any pings are delivered.
     *
     * <p/>If any step after the connection has been opened fails, the connection is closed again before the failure
     * propagates, because the caller never receives a reference through which it could close it.
     *
     * @param brokerDetails   The addresses of the brokers to connect to.
     * @param username        The broker username.
     * @param password        The broker password.
     * @param virtualpath     The virtual host name within the broker.
     * @param destinationName The name of the queue or topic to receive pings on.
     * @param persistent      A flag to indicate that replies should be sent with persistent delivery mode.
     * @param transacted      A flag to indicate that the consumer and producer sessions should be transacted.
     * @param selector        A message selector to filter received pings with.
     * @param verbose         A flag to indicate that message timings should be sent to the console.
     * @param pubsub          <tt>true</tt> to receive pings on a topic, <tt>false</tt> to receive them on a queue.
     *
     * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
     */
    public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
                           String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
                           boolean pubsub) throws Exception
    {
        // Create a client id to uniquely identify this client.
        InetAddress address = InetAddress.getLocalHost();
        String clientId = address.getHostName() + System.currentTimeMillis();
        _verbose = verbose;
        _persistent = persistent;
        setPubSub(pubsub);

        // Connect to the broker.
        AMQConnection connection = new AMQConnection(brokerDetails, username, password, clientId, virtualpath);
        setConnection(connection);

        // Tracks whether set up ran to completion, so that a half built bouncer does not leak its connection.
        boolean initialised = false;
        try
        {
            _logger.info("Connected with URL:" + getConnection().toURL());

            // Set up the failover notifier.
            getConnection().setConnectionListener(new FailoverNotifier());

            // Create a session to listen for messages on and one to send replies on, transactional depending on
            // the transacted flag.
            _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);

            // Create the queue or topic to listen for messages on.
            createConsumerDestination(destinationName);
            MessageConsumer consumer =
                _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);

            // Create a producer for the replies, without a default destination.
            _replyProducer = _producerSession.createProducer(null);
            _replyProducer.setDisableMessageTimestamp(true);
            _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

            // Set this up to listen for messages on the destination.
            consumer.setMessageListener(this);

            initialised = true;
        }
        finally
        {
            if (!initialised)
            {
                try
                {
                    connection.close();
                }
                catch (JMSException closeFailure)
                {
                    // The set up failure is already propagating and is the one the caller needs; only record this.
                    _logger.warn("Failed to close the connection after an unsuccessful start up: "
                                 + closeFailure.getMessage(), closeFailure);
                }
            }
        }
    }

    /**
     * Starts a stand alone ping-pong client running in verbose mode.
     *
     * <p/>The broker host and port, destination name, selector and the transacted, persistent and pub/sub flags are
     * taken from the parsed command line. The credentials (guest/guest) and the virtual host (test) are fixed. When
     * no destination is configured the default destination name is used.
     *
     * @param args The command line options; may be empty, in which case the configuration defaults apply.
     *
     * @throws Exception Any failure while connecting to the broker or starting the connection.
     */
    public static void main(String[] args) throws Exception
    {
        System.out.println("Starting...");

        // No usage message is printed for an empty command line; the test simply runs on defaults.
        if (args.length == 0)
        {
            _logger.info("Running test with default values...");
        }

        // Parse the command line.
        Config options = new Config();
        options.setOptions(args);

        final String brokerUrl = options.getHost() + ":" + options.getPort();
        final String requestedDestination = options.getDestination();
        final String destination = (requestedDestination != null) ? requestedDestination : DEFAULT_DESTINATION_NAME;
        final String messageSelector = options.getSelector();
        final boolean useTransactions = options.isTransacted();
        final boolean usePersistence = options.usePersistentMessages();
        final boolean usePubSub = options.isPubSub();

        // Build the bouncer, always in verbose mode, and start its connection so that pings begin to arrive.
        PingPongBouncer bouncer =
            new PingPongBouncer(brokerUrl, "guest", "guest", "test", destination, usePersistence, useTransactions,
                                messageSelector, true, usePubSub);
        bouncer.getConnection().start();

        System.out.println("Waiting...");
    }

    /**
     * Prints a summary of the command line options to standard error.
     */
    private static void usage()
    {
        StringBuilder help = new StringBuilder();
        help.append("Usage: PingPongBouncer \n");
        help.append("-host : broker host\n");
        help.append("-port : broker port\n");
        help.append("-destinationname : queue/topic name\n");
        help.append("-transacted : (true/false). Default is false\n");
        help.append("-persistent : (true/false). Default is false\n");
        help.append("-pubsub     : (true/false). Default is false\n");
        help.append("-selector   : selector string\n");

        System.err.println(help.toString());
    }

    /**
     * This is a callback method that is notified of all messages for which this has been registered as a message
     * listener on a message consumer. It sends the received message back, as the reply (pong), to the reply-to
     * destination of that message. Messages without a reply-to destination are dropped.
     *
     * <p/>In verbose mode the time taken to reach this bounce point is logged, but only for messages that carry a
     * "timestamp" property; messages without one are still bounced back.
     *
     * <p/>Only the producer session is committed here. NOTE(review): the consumer session is created with the same
     * transacted flag but is never committed in this class; confirm that this is intended.
     *
     * <p/>A JMS failure is logged and not propagated, in which case no reply may have been sent.
     *
     * @param message The message that triggered this callback.
     */
    public void onMessage(Message message)
    {
        try
        {
            String messageCorrelationId = message.getJMSCorrelationID();
            if (_verbose)
            {
                _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
                             + messageCorrelationId);
            }

            // Get the reply to destination from the message and check it is set.
            Destination responseDest = message.getJMSReplyTo();

            if (responseDest == null)
            {
                _logger.debug("Cannot send reply because reply-to destination is null.");

                return;
            }

            // Spew out some timing information if verbose mode is on. getLongProperty returns a primitive and, under
            // the JMS conversion rules, fails with a NumberFormatException when the property is absent, so its
            // presence has to be checked up front rather than by comparing the result with null.
            if (_verbose && message.propertyExists("timestamp"))
            {
                long diff = System.currentTimeMillis() - message.getLongProperty("timestamp");
                _logger.info("Time to bounce point: " + diff);
            }

            // Correlate the reply to the original. The received message is itself sent back, so this re-asserts the
            // correlation id that it already carries.
            message.setJMSCorrelationID(messageCorrelationId);

            // Send the received message as the pong reply.
            _replyProducer.send(responseDest, message);

            if (_verbose)
            {
                _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
                             + messageCorrelationId);
            }

            // Commit the transaction if running in transactional mode.
            commitTx(_producerSession);
        }
        catch (JMSException e)
        {
            // A lost reply leaves the sender waiting, so make the failure visible at the default log level.
            _logger.error("There was a JMSException: " + e.getMessage(), e);
        }
    }

    /**
     * Provides the broker connection held by this bouncer.
     *
     * @return The connection most recently passed to {@link #setConnection(AMQConnection)}.
     */
    public AMQConnection getConnection()
    {
        return this._connection;
    }

    /**
     * Replaces the broker connection held by this bouncer.
     *
     * @param connection The connection to hold from now on.
     */
    public void setConnection(AMQConnection connection)
    {
        _connection = connection;
    }

    /**
     * Records whether this client works against a topic (pub/sub) or a queue (point to point).
     *
     * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
     */
    public void setPubSub(boolean pubsub)
    {
        this._isPubSub = pubsub;
    }

    /**
     * Reports whether this client works against a topic (pub/sub) or a queue (point to point).
     *
     * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
     */
    public boolean isPubSub()
    {
        return this._isPubSub;
    }

    /**
     * Commits the current transaction on the given session, doing nothing at all when the session is not
     * transactional.
     *
     * <p/>If the {@link #_failBeforeCommit} flag is set, the user is prompted to kill the broker before the commit
     * is applied. If the {@link #_failAfterCommit} flag is set, the user is prompted to kill the broker after the
     * commit is applied.
     *
     * <p/>When the commit (or either prompt) raises a JMSException, a rollback is attempted instead. If that
     * rollback succeeds the original exception is only logged and this method returns normally.
     *
     * @param session The session to commit on.
     *
     * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
     */
    protected void commitTx(Session session) throws JMSException
    {
        // Nothing to commit outside of a transaction.
        if (!session.getTransacted())
        {
            return;
        }

        try
        {
            if (_failBeforeCommit)
            {
                _logger.debug("Failing Before Commit");
                doFailover();
            }

            session.commit();

            if (_failAfterCommit)
            {
                _logger.debug("Failing After Commit");
                doFailover();
            }

            _logger.debug("Session Commited.");
        }
        catch (JMSException commitFailure)
        {
            _logger.trace("JMSException on commit:" + commitFailure.getMessage(), commitFailure);

            try
            {
                session.rollback();
                _logger.debug("Message rolled back.");
            }
            catch (JMSException rollbackFailure)
            {
                _logger.trace("JMSE on rollback:" + rollbackFailure.getMessage(), rollbackFailure);

                // Neither the commit nor the rollback worked; report the rollback failure to the caller.
                throw rollbackFailure;
            }
        }
    }

    /**
     * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
     * until the user has supplied some input on the terminal.
     *
     * <p/>If the terminal cannot be read, a warning is logged and the method continues without waiting.
     *
     * @param broker The name of the broker to terminate.
     */
    protected void doFailover(String broker)
    {
        System.out.println("Kill Broker " + broker + " now.");
        try
        {
            // Any single byte of input ends the wait.
            System.in.read();
        }
        catch (IOException e)
        {
            // The pause could not be honoured. Carry on, as before, but say so instead of hiding the failure.
            _logger.warn("Could not read from the terminal while waiting for broker " + broker
                         + " to be killed, continuing without waiting: " + e.getMessage(), e);
        }

        System.out.println("Continuing.");
    }

    /**
     * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
     * until the user has supplied some input on the terminal.
     *
     * <p/>If the terminal cannot be read, a warning is logged and the method continues without waiting.
     */
    protected void doFailover()
    {
        System.out.println("Kill Broker now.");
        try
        {
            // Any single byte of input ends the wait.
            System.in.read();
        }
        catch (IOException e)
        {
            // The pause could not be honoured. Carry on, as before, but say so instead of hiding the failure.
            _logger.warn("Could not read from the terminal while waiting for the broker to be killed, "
                         + "continuing without waiting: " + e.getMessage(), e);
        }

        System.out.println("Continuing.");

    }

    /**
     * Sets the destination that pings are consumed from: a topic on the default topic exchange when this client is
     * in pub/sub mode, otherwise a queue on the default direct exchange.
     *
     * @param name The name of the topic or queue.
     */
    private void createConsumerDestination(String name)
    {
        final Destination destination;

        if (!isPubSub())
        {
            destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
        }
        else
        {
            destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
        }

        _consumerDestination = destination;
    }

    /**
     * A connection listener whose only visible effect is to log the completion of a failover. Byte count
     * notifications are ignored, and both pre-failover questions are answered with <tt>true</tt>.
     */
    public static class FailoverNotifier implements ConnectionListener
    {
        /**
         * Logs, at info level, that failover has completed.
         */
        public void failoverComplete()
        {
            _logger.info("App got failover complete callback.");
        }

        /**
         * Always answers <tt>true</tt>.
         * NOTE(review): presumably this lets the failover go ahead; confirm against the ConnectionListener contract.
         *
         * @param redirect Not used.
         *
         * @return <tt>true</tt>, always.
         */
        public boolean preFailover(boolean redirect)
        {
            return true;
        }

        /**
         * Always answers <tt>true</tt>.
         * NOTE(review): presumably this lets the client resubscribe; confirm against the ConnectionListener contract.
         *
         * @return <tt>true</tt>, always.
         */
        public boolean preResubscribe()
        {
            return true;
        }

        /**
         * Ignores the notification.
         *
         * @param count Not used.
         */
        public void bytesSent(long count)
        {
            // Byte counts are of no interest here.
        }

        /**
         * Ignores the notification.
         *
         * @param count Not used.
         */
        public void bytesReceived(long count)
        {
            // Byte counts are of no interest here.
        }
    }
}