summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
blob: 1080092536cf1613e352d07d1fa1705b1b91db42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.testkit.soak;


import java.util.Random;
import java.util.UUID;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;

/**
 * Test Description
 * ================
 * This test will send n-1 messages, followed by the n-th
 * message which contains "End" in it's payload to signal
 * that this is the last message message in the sequence.
 * The end message has the feedback queue as it's replyTo.
 * It will then listen on the feedback queue waiting for the
 * confirmation and then sleeps for 1000 ms before proceeding
 * with the next n messages.
 *
 * This hand shaking mechanism ensures that all of the
 * messages sent are consumed by some consumer. This prevents
 * the producers from saturating the broker especially when
 * the consumers are slow.
 *
 * It creates a producer per session.
 * If session_count is > 1 it will round robin the messages
 * btw the producers.
 *
 * All producers send to a single destination
 *
 */

public class SimpleProducer extends BaseTest
{
    protected Destination feedbackQueue;
    Random gen = new Random();

    public SimpleProducer()
    {
        super();
    }

    protected Message getNextMessage()
    {
        if (msg_size == -1)
        {
            int index = gen.nextInt(1000);
            return msgArray[index];
        }
        else
        {
            return msgArray[0];
        }
    }

    public void test()
    {
        try
        {
            Session[] sessions = new Session[session_count];
            MessageProducer[] prods = new MessageProducer[session_count];

            for (int i = 0; i < session_count; i++)
            {
                sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                prods[i] = sessions[i].createProducer(dest);
            }

            // this will ensure that the producer will not overun the consumer.
            feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"),
                                         new AMQShortString(UUID.randomUUID().toString()),
                                         new AMQShortString("control"));

            MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue);

            int prod_pointer = 0;
            boolean multi_session = session_count > 1 ? true : false;

            while (true)
            {
                for (int i = 0; i < msg_count - 1; i++)
                {
                    Message msg = getNextMessage();
                    msg.setJMSTimestamp(System.currentTimeMillis());
                    prods[prod_pointer].send(msg);
                    if (multi_session)
                    {
                        prod_pointer++;
                        if (prod_pointer == session_count)
                        {
                            prod_pointer = 0;
                        }
                    }
                }

                TextMessage m = sessions[0].createTextMessage("End");
                m.setJMSReplyTo(feedbackQueue);
                prods[prod_pointer].send(m);
                System.out.println(df.format(System.currentTimeMillis()));
                feedbackConsumer.receive();
                Thread.sleep(1000);
            }
        }
        catch (Exception e)
        {
            handleError(e,"Exception while setting up the producer");
        }

    }

    public static void main(String[] args)
    {
        SimpleProducer test = new SimpleProducer();
        test.setUp();
        test.test();
    }

}