summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
blob: 9c8cad4240c01287a2f9d9c9bb661a31931bf035 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.transactionlog;

import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
import java.util.List;

/**
 * A {@link TransactionLog} that wraps another TransactionLog (the delegate) and adds reference
 * tracking for messages that are enqueued on more than one queue.
 *
 * When a message is enqueued on several queues the set of queues is recorded in
 * {@link #_idToQueues}. As the message is dequeued from each queue the record is reduced, and only
 * when the last queue has been removed is the message itself removed from the delegate. Messages
 * enqueued on at most one queue per call are not recorded here and are removed from the delegate
 * as soon as their dequeue is processed.
 *
 * All other operations are forwarded to the delegate unchanged.
 */
public class BaseTransactionLog implements TransactionLog
{
    private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);

    /** The log that performs the real work; every operation ends up being forwarded to it. */
    TransactionLog _delegate;

    /**
     * Message ID to the queues that message is still enqueued on. Only populated by enqueues that
     * name more than one queue. Both the map and the lists it holds are synchronized wrappers;
     * compound operations must additionally lock the map or the list concerned.
     */
    protected Map<Long, List<AMQQueue>> _idToQueues = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>());

    /**
     * Create a reference-tracking log on top of the given delegate.
     *
     * @param delegate the log to forward all operations to
     */
    public BaseTransactionLog(TransactionLog delegate)
    {
        _delegate = delegate;
    }

    /**
     * Forward configuration to the delegate.
     *
     * @return whatever the delegate's configure() returns
     * @throws Exception if the delegate fails to configure
     */
    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
    {
        return _delegate.configure(virtualHost, base, config);
    }

    /**
     * Close the delegate.
     *
     * @throws Exception if the delegate fails to close
     */
    public void close() throws Exception
    {
        _delegate.close();
    }

    /**
     * Enqueue a message on the given queues.
     *
     * If more than one queue is given, the queues are recorded against the message ID so that the
     * message is only removed once it has been dequeued from all of them. If a record already
     * exists, only the queues not yet recorded are added to it and passed to the delegate; if
     * there are none the call returns without touching the delegate.
     *
     * If zero or one queue is given the call is passed straight to the delegate and nothing is
     * recorded.
     *
     * NOTE(review): a single-queue enqueue does not update an existing record for the same
     * message ID - confirm callers never mix the two for one message.
     *
     * @param context   the store context the enqueue is part of
     * @param queues    the queues to enqueue on, must not be null
     * @param messageId the ID of the message being enqueued
     * @throws AMQException if the delegate fails to enqueue
     */
    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
    {
        if (queues.size() <= 1)
        {
            // At most one queue: no reference tracking required.
            _delegate.enqueueMessage(context, queues, messageId);
            return;
        }

        if (_logger.isInfoEnabled())
        {
            _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
        }

        List<AMQQueue> enqueuedList;

        // The lookup and the insert must be one atomic step, otherwise two threads enqueuing the
        // same message for the first time could each install their own list and lose the other's.
        // A synchronizedMap uses itself as its mutex so locking it here excludes all other access.
        synchronized (_idToQueues)
        {
            enqueuedList = _idToQueues.get(messageId);
            if (enqueuedList == null)
            {
                // No existing list: record a private copy of the provided queues so that later
                // changes made by the caller to its list do not alter our record.
                _idToQueues.put(messageId, Collections.synchronizedList(new ArrayList<AMQQueue>(queues)));
            }
        }

        if (enqueuedList == null)
        {
            // First recorded enqueue for this message, so every provided queue is new.
            _delegate.enqueueMessage(context, queues, messageId);
            return;
        }

        // There are previous enqueues for this messageId, work out which queues are additions.
        ArrayList<AMQQueue> toEnqueueList = new ArrayList<AMQQueue>();

        // Lock the list so the contains/add pair is atomic with respect to other threads.
        synchronized (enqueuedList)
        {
            for (AMQQueue queue : queues)
            {
                if (!enqueuedList.contains(queue))
                {
                    // Update the recorded list.
                    enqueuedList.add(queue);
                    // Keep track of the new enqueues to be made.
                    toEnqueueList.add(queue);
                }
            }
        }

        if (toEnqueueList.isEmpty())
        {
            // No new queues to enqueue the message on.
            return;
        }

        _delegate.enqueueMessage(context, toEnqueueList, messageId);
    }

    /**
     * Dequeue a message from a queue.
     *
     * The dequeue is first registered with the context and then performed on the delegate. If the
     * context is not in a transaction the dequeues held by the context are processed straight
     * away, otherwise they are left to be processed by {@link #commitTran(StoreContext)}.
     *
     * @param context   the store context the dequeue is part of
     * @param queue     the queue the message is being dequeued from
     * @param messageId the ID of the message being dequeued
     * @throws AMQException if the delegate fails
     */
    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
    {
        context.dequeueMessage(queue, messageId);

        _delegate.dequeueMessage(context, queue, messageId);

        if (!context.inTransaction())
        {
            processDequeues(context.getDequeueMap());
        }
    }

    /**
     * This should not be called from main broker code.
     * // Perhaps we need a new interface:
     *
     * Broker <->TransactionLog
     * Broker <->BaseTransactionLog<->(Log with removeMessage())
     *
     * Forwards directly to the delegate; the record of enqueued queues is neither consulted nor
     * updated.
     */
    public void removeMessage(StoreContext context, Long messageId) throws AMQException
    {
        _delegate.removeMessage(context, messageId);
    }

    /**
     * Begin a transaction: first on the context, then on the delegate.
     *
     * @throws AMQException if either step fails
     */
    public void beginTran(StoreContext context) throws AMQException
    {
        context.beginTransaction();
        _delegate.beginTran(context);
    }

    /**
     * Commit the transaction held by the context.
     *
     * Messages in the context's dequeue map that have no entry in {@link #_idToQueues} have their
     * removal added to this transaction and are taken out of the dequeue map. The delegate is
     * then committed, the remaining dequeues are processed and finally the context is marked as
     * committed.
     *
     * @throws AMQException if the delegate fails
     */
    public void commitTran(StoreContext context) throws AMQException
    {
        Map<Long, List<AMQQueue>> messageMap = context.getDequeueMap();

        // processDequeues() tolerates a missing dequeue map, so do the same here.
        if (messageMap != null)
        {
            //For each Message ID that is in the map check
            Set<Long> messageIDs = messageMap.keySet();

            if (_logger.isInfoEnabled())
            {
                _logger.info("Pre-Processing single dequeue of:" + messageIDs);
            }

            Iterator<Long> iterator = messageIDs.iterator();

            while (iterator.hasNext())
            {
                Long messageID = iterator.next();
                // If we don't have a global reference for this message then there was only a
                // single enqueue, so this dequeue is the last reference.
                if (_idToQueues.get(messageID) == null)
                {
                    // Add the removal of the message to this transaction
                    _delegate.removeMessage(context, messageID);
                    // Remove this message ID as we have processed it so we don't
                    // reprocess after the main commit
                    iterator.remove();
                }
            }
        }

        //Perform real commit of current data
        _delegate.commitTran(context);

        processDequeues(context.getDequeueMap());

        //Commit the recorded state for this transaction.
        context.commitTransaction();
    }

    /**
     * Abort the transaction: first on the context, then on the delegate.
     *
     * @throws AMQException if either step fails
     */
    public void abortTran(StoreContext context) throws AMQException
    {
        //Abort the recorded state for this transaction.
        context.abortTransaction();

        _delegate.abortTran(context);
    }

    /**
     * Process the given dequeues, removing from the delegate every message that is no longer
     * enqueued on any queue.
     *
     * The removals are batched in a transaction of their own, run on a freshly created context.
     * Every entry is removed from messageMap as it is processed. If anything goes wrong before the
     * batch has been committed the batch is aborted on the delegate.
     *
     * @param messageMap message ID to the queues it has been dequeued from; may be null or empty,
     *                   in which case nothing is done
     * @throws AMQException             if the delegate fails
     * @throws UnableToDequeueException (unchecked) if a recorded message is dequeued from a queue
     *                                  that is not in its record
     */
    private void processDequeues(Map<Long, List<AMQQueue>> messageMap)
            throws AMQException
    {
        // Check we have dequeues to process then process them
        if (messageMap == null || messageMap.isEmpty())
        {
            return;
        }

        // The message IDs that have dequeues to be applied to our model.
        Set<Long> messageIDs = messageMap.keySet();

        //Create a new Asynchronous Context.
        StoreContext removeContext = new StoreContext(true);

        //Batch Process the Dequeues on the delegate
        _delegate.beginTran(removeContext);
        removeContext.beginTransaction();

        try
        {
            //For each Message ID Decrement the reference for each of the queues it was on.

            if (_logger.isInfoEnabled())
            {
                _logger.info("Processing Dequeue for:" + messageIDs);
            }

            Iterator<Long> messageIDIterator = messageIDs.iterator();

            while (messageIDIterator.hasNext())
            {
                Long messageID = messageIDIterator.next();
                List<AMQQueue> queueList = messageMap.get(messageID);

                //Remove this message from our DequeueMap as we are processing it.
                messageIDIterator.remove();

                // For each of the queues decrement the reference
                for (AMQQueue queue : queueList)
                {
                    // Looked up per queue: the record may have been removed while handling an
                    // earlier queue of this same message.
                    List<AMQQueue> enqueuedList = _idToQueues.get(messageID);

                    if (_logger.isInfoEnabled())
                    {
                        _logger.info("Dequeue message:" + messageID + " from :" + queue);
                    }

                    // If we have no mapping then this message was only enqueued on a single queue
                    // This will be the case when we are not in a larger transaction
                    if (enqueuedList == null)
                    {
                        _delegate.removeMessage(removeContext, messageID);
                    }
                    else
                    {
                        //When a message is on more than one queue it is possible that this code section is executed
                        // by one thread per enqueue.
                        // It is however, thread safe because there is only removes being performed and so the
                        // last thread that does the remove will see the empty queue and remove the message
                        // At this stage there is nothing that is going to cause this operation to abort. So we don't
                        // need to worry about any potential adds.
                        // The message will no longer be enqueued as that operation has been committed before now so
                        // this is clean up of the data.

                        //Must synchronize here as this list may have been extracted from _idToQueues in many threads
                        // and we must ensure only one of them update the list at a time.
                        synchronized (enqueuedList)
                        {
                            // Update the enqueued list but if the queue is not in the list then we are trying
                            // to dequeue something that is not there anymore, or was never there.
                            if (!enqueuedList.remove(queue))
                            {
                                throw new UnableToDequeueException(messageID, queue);
                            }

                            // If the list is now empty then remove the message
                            if (enqueuedList.isEmpty())
                            {
                                _delegate.removeMessage(removeContext, messageID);
                                //Remove references list
                                _idToQueues.remove(messageID);
                            }
                        }
                    }
                }
            }
            //Commit the removes on the delegate.
            _delegate.commitTran(removeContext);
            // Mark this context as committed.
            removeContext.commitTransaction();
        }
        finally
        {
            // Still in a transaction means the commit above was never completed, so roll the
            // batch back on the delegate.
            if (removeContext.inTransaction())
            {
                _delegate.abortTran(removeContext);
            }
        }
    }

    /**
     * @return the delegate's view of whether the context is in a transaction
     */
    public boolean inTran(StoreContext context)
    {
        return _delegate.inTran(context);
    }

    /**
     * Forward the storing of a content body chunk to the delegate.
     *
     * @throws AMQException if the delegate fails
     */
    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
    {
        _delegate.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
    }

    /**
     * Forward the storing of message meta data to the delegate.
     *
     * @throws AMQException if the delegate fails
     */
    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
    {
        _delegate.storeMessageMetaData(context, messageId, messageMetaData);
    }

    /**
     * @return whether the delegate reports itself as persistent
     */
    public boolean isPersistent()
    {
        return _delegate.isPersistent();
    }

    /**
     * @return the log that this log forwards to
     */
    public TransactionLog getDelegate()
    {
        return _delegate;
    }

    /**
     * Thrown when a message that has a record of enqueued queues is dequeued from a queue that is
     * not in that record. It is unchecked and so is not part of any method's throws clause.
     *
     * Declared static so that an instance does not hold a reference to the enclosing log.
     */
    private static final class UnableToDequeueException extends RuntimeException
    {
        /** The ID of the message that could not be dequeued. */
        private final Long _messageID;
        /** The queue the message was expected to be enqueued on. */
        private final AMQQueue _queue;

        public UnableToDequeueException(Long messageID, AMQQueue queue)
        {
            super("Unable to dequeue message(" + messageID + ") from queue " +
                  "(" + queue + ") it is not/nolonger enqueued on it.");
            _messageID = messageID;
            _queue = queue;
        }
    }
}