summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
blob: 10f5cd566777f3ea79142229db8dabf29d901382 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.ack;

import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;

import java.util.*;

/**
 * Exercises {@link TxAck} against three acknowledgement patterns:
 * a single individual ack, a single "multiple" ack, and a mixture of both.
 *
 * Each pattern is captured in a {@link Scenario} that owns its own
 * unacknowledged-message map and {@code TxAck} operation. Every test method
 * runs the same phase ({@code prepare}, {@code undoPrepare} or {@code commit})
 * on all three scenarios; JUnit rebuilds the scenarios in {@link #setUp()}
 * before each test method.
 */
public class TxAckTest extends TestCase
{
    private Scenario individual;
    private Scenario multiple;
    private Scenario combined;

    /**
     * Builds the three scenarios, each holding ten messages with delivery tags 1..10,
     * and records the acknowledgements under test on each of them.
     */
    protected void setUp() throws Exception
    {
        super.setUp();

        //ack only 5th msg
        individual = new Scenario(10, Arrays.asList(5L), Arrays.asList(1L, 2L, 3L, 4L, 6L, 7L, 8L, 9L, 10L));
        individual.update(5, false);

        //ack all up to and including 5th msg
        multiple = new Scenario(10, Arrays.asList(1L, 2L, 3L, 4L, 5L), Arrays.asList(6L, 7L, 8L, 9L, 10L));
        multiple.update(5, true);

        //leave only 8th and 9th unacked
        combined = new Scenario(10, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 10L), Arrays.asList(8L, 9L));
        combined.update(3, false);
        combined.update(5, true);
        combined.update(7, true);
        combined.update(2, true);//should be ignored
        combined.update(1, false);//should be ignored
        combined.update(10, false);
    }

    /** Runs the prepare phase, and its reference-count checks, on every scenario. */
    public void testPrepare() throws AMQException
    {
        individual.prepare();
        multiple.prepare();
        combined.prepare();
    }

    /** Runs the undo-prepare phase, and its reference-count checks, on every scenario. */
    public void testUndoPrepare() throws AMQException
    {
        individual.undoPrepare();
        multiple.undoPrepare();
        combined.undoPrepare();
    }

    /** Runs the commit phase, and its map-content checks, on every scenario. */
    public void testCommit() throws AMQException
    {
        individual.commit();
        multiple.commit();
        combined.commit();
    }

    /** Returns a suite containing all the tests of this class. */
    public static junit.framework.Test suite()
    {
        return new junit.framework.TestSuite(TxAckTest.class);
    }

    /**
     * One acknowledgement pattern: a map pre-populated with unacknowledged
     * {@link TestMessage}s, the {@link TxAck} operation working on that map,
     * and the delivery tags expected to end up acked or unacked.
     *
     * Declared static because it needs no state from the enclosing test instance.
     */
    private static class Scenario
    {
        private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
        private final TxAck _op = new TxAck(_map);
        private final List<Long> _acked;
        private final List<Long> _unacked;
        private final StoreContext _storeContext = new StoreContext();

        /**
         * Fills the map with {@code messageCount} messages whose delivery tags run
         * from 1 to {@code messageCount} (message ids run from 0).
         *
         * @param messageCount number of messages to add to the map
         * @param acked        delivery tags expected to be acknowledged by the recorded updates
         * @param unacked      delivery tags expected to remain unacknowledged
         */
        Scenario(int messageCount, List<Long> acked, List<Long> unacked)
        {
            TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
                                                                          _storeContext, null,
                                                                          new LinkedList<RequiredDeliveryException>(),
                                                                          new HashSet<Long>());
            for(int i = 0; i < messageCount; i++)
            {
                long deliveryTag = i + 1;
                // TODO: fix hardcoded protocol version data
                TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
                                                                                           (byte)0,
                                                                                           BasicPublishBody.getClazz((byte)8,(byte)0),
                                                                                           BasicPublishBody.getMethod((byte)8,(byte)0),
                                                                                           null,
                                                                                           false,
                                                                                           false,
                                                                                           null,
                                                                                           0), txnContext);
                _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
            }
            _acked = acked;
            _unacked = unacked;
        }

        /** Forwards an acknowledgement to the operation under test. */
        void update(long deliverytag, boolean multiple)
        {
            _op.update(deliverytag, multiple);
        }

        /**
         * Asserts that every listed tag is still present in the map and that its
         * message's net reference count equals {@code expected}.
         */
        private void assertCount(List<Long> tags, int expected)
        {
            for(long tag : tags)
            {
                UnacknowledgedMessage u = _map.get(tag);
                assertTrue("Message not found for tag " + tag, u != null);
                ((TestMessage) u.message).assertCountEquals(expected);
            }
        }

        /**
         * Consolidates and prepares the operation, then checks that each acked
         * message shows a net count of -1 (one more decrement than increments)
         * while unacked messages are untouched.
         */
        void prepare() throws AMQException
        {
            _op.consolidate();
            _op.prepare(_storeContext);

            assertCount(_acked, -1);
            assertCount(_unacked, 0);
        }

        /**
         * Consolidates the operation and undoes a prepare, then checks that each
         * acked message shows a net count of +1 (one more increment than decrements)
         * while unacked messages are untouched.
         */
        void undoPrepare()
        {
            _op.consolidate();
            _op.undoPrepare();

            assertCount(_acked, 1);
            assertCount(_unacked, 0);
        }

        /**
         * Consolidates and commits the operation, then checks that the acked tags
         * have left the map and the unacked tags are still in it.
         */
        void commit()
        {
            _op.consolidate();
            _op.commit(_storeContext);

            //check acked messages are removed from map
            Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags());
            keys.retainAll(_acked);
            assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
            //check unacked messages are still in map
            keys = new HashSet<Long>(_unacked);
            keys.removeAll(_map.getDeliveryTags());
            assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
        }
    }

    /**
     * Message stub that replaces reference counting with a plain counter so the
     * test can observe how often the operation increments or decrements it.
     *
     * Declared static because it needs no state from the enclosing test instance.
     */
    private static class TestMessage extends AMQMessage
    {
        private final long _tag;
        // net reference changes seen so far: +1 per increment, -1 per decrement
        private int _count;

        TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
        {
            super(messageId, publishBody, txnContext);
            _tag = tag;
        }

        /** Records one increment instead of touching any real reference count. */
        public void incrementReference()
        {
            _count++;
        }

        /** Records one decrement; the supplied store context is not used. */
        public void decrementReference(StoreContext context)
        {
            _count--;
        }

        /** Asserts that the net number of reference changes equals {@code expected}. */
        void assertCountEquals(int expected)
        {
            assertEquals("Wrong count for message with tag " + _tag, expected, _count);
        }
    }
}