summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/legacystore/jrnl/lpmgr.h
blob: be5c4494ccd94ea737463f7c72f804762f1e4166 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/**
 * \file lpmgr.h
 *
 * Qpid asynchronous store plugin library
 *
 * Class mrg::journal::lpmgr. See class documentation for details.
 *
 * \author Kim van der Riet
 */

#ifndef QPID_LEGACYSTORE_JRNL_LPMGR_H
#define QPID_LEGACYSTORE_JRNL_LPMGR_H

namespace mrg
{
namespace journal
{
    class jcntl;
    class lpmgr;
}
}

#include "qpid/legacystore/jrnl/fcntl.h"
#include <vector>

namespace mrg
{
namespace journal
{

    /**
    * \brief LFID-PFID manager. This class maps the logical file id (lfid) to the physical file id (pfid) so that files
    * may be inserted into the file ring buffer in (nearly) arbitrary logical locations while the physical ids continue
    * to be appended. NOTE: NOT THREAD SAFE.
    *
    * The entire functionality of the LFID-PFID manager is to maintain an array of pointers to fcntl objects which have
    * a one-to-one relationship to the physical %journal files. The logical file id (lfid) is used as an index to the
    * array to read the mapped physical file id (pfid). By altering the order of these pointers within the array, the
    * mapping of logical to physical files may be altered. This can be used to allow for the logical insertion of
    * %journal files into a ring buffer, even though the physical file ids must be appended to those that preceded them.
    *
    * Since the insert() operation uses after-lfid as its position parameter, it is not possible to insert before lfid
    * 0 - i.e. It is only possible to insert after an existing lfid. Consequently, lfid 0 and pfid 0 are always
    * coincident in a %journal. Note, however, that inserting before lfid 0 is logically equivilent to inserting after
    * the last lfid.
    *
    * When one or more files are inserted after a particular lfid, the lfids of the following files are incremented. The
    * pfids of the inserted files follow those of all existing files, thus leading to a lfid-pfid discreppancy (ie no
    * longer a one-to-one mapping):
    *
    * Example: Before insertion, %journal file headers would look as follows:
    * <pre>
    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
    * pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
    * lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
    * </pre>
    *
    * After insertion of 2 files after lid 2 (marked with *s):
    * <pre>
    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
    * pfid --> | 0 | 1 | 2 |*6*|*7*| 3 | 4 | 5 |   pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*6*|*7*|
    * lfid --> | 0 | 1 | 2 |*3*|*4*| 5 | 6 | 7 |   lfid --> | 0 | 1 | 2 | 5 | 6 | 7 |*3*|*4*|
    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
    * </pre>
    *
    * The insert() function updates the internal map immediately, but the physical files (which have both the pfid and
    * lfid written into the file header) are only updated as they are overwritten in the normal course of enqueueing
    * and dequeueing messages. If the %journal should fail after insertion but before the files following those inserted
    * are overwritten, then duplicate lfids will be present (though no duplicate pfids are possible). The overwrite
    * indicator (owi) flag and the pfid numbers may be used to resolve the ambiguity and determine the logically earlier
    * lfid in this case.
    *
    * Example: Before insertion, the current active write file being lfid/pfid 2 as determined by the owi flag, %journal
    * file headers would look as follows:
    * <pre>
    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
    * pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
    * lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |           lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |
    *  owi --> | t | t | t | f | f | f |            owi --> | t | t | t | f | f | f |
    *          +---+---+---+---+---+---+                    +---+---+---+---+---+---+
    * </pre>
    *
    * After inserting 2 files after lfid 2 and then 3 (the newly inserted file) - marked with *s:
    * <pre>
    *          Logical view (sorted by lfid):               Physical view (sorted by pfid):
    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
    * pfid --> | 0 | 1 | 2 |*6*|*7*| 3 | 4 | 5 |   pfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*3*|*4*|
    * lfid --> | 0 | 1 | 2 |*3*|*4*| 3 | 4 | 5 |   lfid --> | 0 | 1 | 2 | 3 | 4 | 5 |*3*|*4*|
    *  owi --> | t | t | t | t | t | f | f | f |    owi --> | t | t | t | f | f | f | t | t |
    *          +---+---+---+---+---+---+---+---+            +---+---+---+---+---+---+---+---+
    * </pre>
    *
    * If a broker failure occurs at this point, then there are two independent tests that may be made to resolve
    * duplicate lfids during recovery in such cases:
    * <ol>
    *   <li>The correct lfid has owi flag that matches that of pfid/lfid 0</li>
    *   <li>The most recently inserted (hence correct) lfid has pfids that are higher than the duplicate that was not
    *       overwritten</li>
    * </ol>
    *
    * NOTE: NOT THREAD SAFE. Provide external thread protection if used in multi-threaded environments.
    */
    class lpmgr
    {
    public:
        /**
        * \brief Function pointer to function that will create a new fcntl object and return its pointer.
        *
        * \param jcp        Pointer to jcntl instance from which journal file details will be obtained.
        * \param lfid       Logical file ID for new fcntl instance.
        * \param pfid       Physical file ID for file associated with new fcntl instance.
        * \param rdp        Pointer to rcvdat instance which conatins recovery information for new fcntl instance when
        *                   recovering an existing file, or null if a new file is to be created.
        */
        typedef fcntl* (new_obj_fn_ptr)(jcntl* const jcp,
                                        const u_int16_t lfid,
                                        const u_int16_t pfid,
                                        const rcvdat* const rdp);

    private:
        bool _ae;                       ///< Auto-expand mode
        u_int16_t _ae_max_jfiles;       ///< Max file count for auto-expansion; 0 = no limit
        std::vector<fcntl*> _fcntl_arr; ///< Array of pointers to fcntl objects

    public:
        lpmgr();
        virtual ~lpmgr();

        /**
        * \brief Initialize from scratch for a known number of %journal files. All lfid values are identical to pfid
        * values (which is normal before any inserts have occurred).
        *
        * \param num_jfiles Number of files to be created, and consequently the number of fcntl objects in array
        *                   _fcntl_arr.
        * \param ae         If true, allows auto-expansion; if false, disables auto-expansion.
        * \param ae_max_jfiles The maximum number of files allowed for auto-expansion. Cannot be lower than the current
        *                   number of files. However, a zero value disables the limit checks, and allows unlimited
        *                   expansion.
        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
        *                   new files may be created.
        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
        *                   causes a new %journal file to be created).
        */
        void initialize(const u_int16_t num_jfiles,
                        const bool ae,
                        const u_int16_t ae_max_jfiles,
                        jcntl* const jcp,
                        new_obj_fn_ptr fp);

        /**
        * \brief Initialize from a known lfid-pfid map pfid_list (within rcvdat param rd), which is usually obtained
        * from a recover. The index of pfid_list is the logical file id (lfid); the value contained in the vector is
        * the physical file id (pfid).
        *
        * \param rd         Ref to rcvdat struct which contains recovery data and the pfid_list.
        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
        *                   new files may be created.
        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
        *                   causes a new %journal file to be created).
        */
        void recover(const rcvdat& rd,
                     jcntl* const jcp,
                     new_obj_fn_ptr fp);

        /**
        * \brief Insert num_jfiles files after lfid index after_lfid. This causes all lfids after after_lfid to be
        * increased by num_jfiles.
        *
        * Note that it is not possible to insert <i>before</i> lfid 0, and thus lfid 0 should always point to pfid 0.
        * Inserting before lfid 0 is logically equivilent to inserting after the last lfid in a circular buffer.
        *
        * \param after_lfid Lid index after which to insert file(s).
        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
        *                   new files may be created.
        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
        *                   causes a new %journal file to be created).
        * \param num_jfiles The number of files by which to increase.
        */
        void insert(const u_int16_t after_lfid,
                    jcntl* const jcp,
                    new_obj_fn_ptr fp,
                    const u_int16_t num_jfiles = 1);

        /**
        * \brief Clears _fcntl_arr and deletes all fcntl instances.
        */
        void finalize();

        /**
        * \brief Returns true if initialized; false otherwise. After construction, will return false until initialize()
        * is called; thereafter true until finalize() is called, whereupon it will return false again.
        *
        * \return True if initialized; false otherwise.
        */
        inline bool is_init() const { return _fcntl_arr.size() > 0; }

        /**
        * \brief Returns true if auto-expand mode is enabled; false if not.
        *
        * \return True if auto-expand mode is enabled; false if not.
        */
        inline bool is_ae() const { return _ae; }

        /**
        * \brief Sets the auto-expand mode to enabled if ae is true, to disabled otherwise. The value of _ae_max_jfiles
        * must be valid to succeed (i.e. _ae_max_jfiles must be greater than the current number of files or be zero).
        *
        * \param ae         If true will enable auto-expand mode; if false will disable it.
        */
        void set_ae(const bool ae);

        /**
        * \brief Returns the number of %journal files, including any that were appended or inserted since
        * initialization.
        *
        * \return Number of %journal files if initialized; 0 otherwise.
        */
        inline u_int16_t num_jfiles() const { return static_cast<u_int16_t>(_fcntl_arr.size()); }

        /**
        * \brief Returns the maximum number of files allowed for auto-expansion.
        *
        * \return Maximum number of files allowed for auto-expansion. A zero value represents a disabled limit
        *   - i.e. unlimited expansion.
        */
        inline u_int16_t ae_max_jfiles() const { return _ae_max_jfiles; }

        /**
        * \brief Sets the maximum number of files allowed for auto-expansion. A zero value disables the limit.
        *
        * \param ae_max_jfiles The maximum number of files allowed for auto-expansion. Cannot be lower than the current
        *                   number of files. However, a zero value disables the limit checks, and allows unlimited
        *                   expansion.
        */
        void set_ae_max_jfiles(const u_int16_t ae_max_jfiles);

        /**
        * \brief Calculates the number of future files available for auto-expansion.
        *
        * \return The number of future files available for auto-expansion.
        */
        u_int16_t ae_jfiles_rem() const;

        /**
        * \brief Get a pointer to fcntl instance for a given lfid.
        *
        * \return Pointer to fcntl object corresponding to logical file id lfid, or 0 if lfid is out of range
        *   (greater than number of files in use).
        */
        inline fcntl* get_fcntlp(const u_int16_t lfid) const
                { if (lfid >= _fcntl_arr.size()) return 0; return _fcntl_arr[lfid]; }

        // Testing functions
        void get_pfid_list(std::vector<u_int16_t>& pfid_list) const;
        void get_lfid_list(std::vector<u_int16_t>& lfid_list) const;

    protected:

        /**
        * \brief Append num_jfiles files to the end of the logical and file id sequence. This is similar to extending
        * the from-scratch initialization.
        *
        * \param jcp        Pointer to jcntl instance. This is used to find the file path and base filename so that
        *                   new files may be created.
        * \param fp         Pointer to function which creates and returns a pointer to a new fcntl object (and hence
        *                   causes a new %journal file to be created).
        * \param num_jfiles The number of files by which to increase.
        */
        void append(jcntl* const jcp,
                    new_obj_fn_ptr fp,
                    const u_int16_t num_jfiles = 1);

    };

} // namespace journal
} // namespace mrg

#endif // ifndef QPID_LEGACYSTORE_JRNL_LPMGR_H