summaryrefslogtreecommitdiff
path: root/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
blob: 279e5ea0bff61d508bd6ad351a7e7c973f4ded09 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.testkit.soak;


import java.util.Random;
import java.util.UUID;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;

/**
 * Test Description
 * ================
 *
 * This test creates x number of sessions, where each session
 * runs in its own thread. Each session creates a producer
 * and its own feedback queue.
 *
 * A producer will send n-1 messages, followed by the n-th
 * message which contains "End" in its payload to signal
 * that this is the last message in the sequence.
 * The end message has the feedback queue as its replyTo.
 * It will then listen on the feedback queue waiting for the
 * confirmation and then sleeps for 1000 ms before proceeding
 * with the next n messages.
 *
 * This hand shaking mechanism ensures that all of the
 * messages sent are consumed by some consumer. This prevents
 * the producers from saturating the broker especially when
 * the consumers are slow.
 *
 * All producers send to a single destination.
 * If using transactions, it's best to use a smaller message count,
 * as the test only commits after sending all messages in a batch.
 *
 */

public class MultiThreadedProducer extends SimpleProducer
{
    /** Whether sessions are created transacted; read once from the "transacted" system property. */
    protected final boolean transacted;

    /**
     * Runs the superclass constructor and then captures the value of the
     * "transacted" system property (false when the property is absent).
     */
    public MultiThreadedProducer()
    {
        super();
        transacted = Boolean.getBoolean("transacted");
    }

    /**
     * Starts one producer thread per session.
     *
     * The inherited msg_count is divided evenly (integer division) between
     * session_count sessions. Each thread then loops forever: it sends its
     * share of messages, sends an "End" text message whose replyTo is the
     * thread's own feedback queue, commits if transacted, blocks until a
     * message arrives on that feedback queue, and sleeps for 1000 ms.
     *
     * Any exception, during setup or inside a producer thread, is passed
     * to the inherited handleError; nothing is propagated to the caller.
     */
    public void test()
    {
        try
        {
            final int msg_count_per_session =  msg_count/session_count;

            for (int i = 0; i < session_count; i++)
            {
                final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
                Thread t = new Thread(new Runnable()
                {
                    private final Random gen = new Random();

                    /**
                     * Picks the next message to send: a random entry of msgArray
                     * when msg_size is -1, otherwise always the first entry.
                     */
                    private Message getNextMessage()
                    {
                        if (msg_size == -1)
                        {
                            // NOTE(review): assumes msgArray holds at least 1000 entries
                            // in this mode - confirm against the code that fills it.
                            int index = gen.nextInt(1000);
                            return msgArray[index];
                        }
                        else
                        {
                            return msgArray[0];
                        }
                    }

                    public void run()
                    {
                        try
                        {
                            MessageProducer prod = session.createProducer(dest);

                            // The feedback queue ensures that the producer will not overrun
                            // the consumer. It is deliberately held in a local variable:
                            // storing it in the inherited feedbackQueue field let concurrently
                            // starting threads overwrite each other's queue, so a thread could
                            // listen on, or advertise as replyTo, a queue owned by another thread.
                            final AMQQueue sessionFeedbackQueue = new AMQQueue(new AMQShortString("amq.direct"),
                                    new AMQShortString(UUID.randomUUID().toString()), new AMQShortString("control"));

                            MessageConsumer feedbackConsumer = session.createConsumer(sessionFeedbackQueue);

                            while (true)
                            {
                                for (int sent = 0; sent < msg_count_per_session; sent++)
                                {
                                    // NOTE(review): the messages in msgArray are shared by all
                                    // producer threads and are mutated here - confirm this is safe
                                    // for the message implementation in use.
                                    Message msg = getNextMessage();
                                    msg.setJMSMessageID("ID:" + UUID.randomUUID());
                                    prod.send(msg);
                                }

                                // Marks the end of the batch and tells the consumer where to reply.
                                TextMessage m = session.createTextMessage("End");
                                m.setJMSReplyTo(sessionFeedbackQueue);
                                prod.send(m);

                                if (transacted)
                                {
                                    session.commit();
                                }

                                System.out.println(df.format(System.currentTimeMillis()));

                                // Blocks until a message arrives on this thread's feedback queue.
                                feedbackConsumer.receive();
                                if (transacted)
                                {
                                    // Second commit, issued after the feedback message is received.
                                    session.commit();
                                }
                                Thread.sleep(1000);
                            }

                        }
                        catch (Exception e)
                        {
                            handleError(e,"Exception in producing message");
                        }

                    }

                });
                t.setName("session-" + i);
                t.start();

            }

        }
        catch (Exception e)
        {
            handleError(e,"Exception while setting up the test");
        }

    }

    /**
     * Entry point: builds the test, runs the inherited setUp and starts the producer threads.
     *
     * @param args command line arguments (unused)
     */
    public static void main(String[] args)
    {
        MultiThreadedProducer test = new MultiThreadedProducer();
        test.setUp();
        test.test();
    }

}