summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/legacystore/jrnl/data_tok.h
blob: e35f069399590303404243fb286356bd0db82a52 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/**
 * \file data_tok.h
 *
 * Qpid asynchronous store plugin library
 *
 * File containing code for class mrg::journal::data_tok (data block token).
 * See class documentation for details.
 *
 * \author Kim van der Riet
 */

#ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H
#define QPID_LEGACYSTORE_JRNL_DATA_TOK_H

// Forward declaration of mrg::journal::data_tok, placed ahead of the #include
// lines that follow. NOTE(review): presumably this lets the included headers
// refer to data_tok before its full definition is seen -- confirm.
namespace mrg { namespace journal {
    class data_tok;
} }

#include <cassert>
#include <cstddef>
#include "qpid/legacystore/jrnl/smutex.h"
#include <pthread.h>
#include <string>
#include <sys/types.h>

namespace mrg
{

namespace journal
{

    /**
    * \class data_tok
    * \brief Data block token (data_tok) used to track the write state (wstate) and read state
    *     (rstate) of a data block through the asynchronous I/O process
    *
    * Besides the two state values, a token carries the data size, counters of data blocks
    * written and read, a page counter, the file id (FID) and record id (RID) set by the enqueue
    * operation, an optional transaction id (XID), and the RID set by the dequeue operation.
    */
    class data_tok
    {
    public:
        // TODO: Fix this, separate write state from operation
        // ie: wstate = NONE, CACHED, PART, SUBM, COMPL
        //     op = ENQUEUE, DEQUEUE, ABORT, COMMIT
        enum write_state
        {
            NONE,       ///< Data block not sent to journal
            ENQ_CACHED, ///< Data block enqueue written to page cache
            ENQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
            ENQ_SUBM,   ///< Data block enqueue submitted to AIO
            ENQ,        ///< Data block enqueue AIO write complete (enqueue complete)
            DEQ_CACHED, ///< Data block dequeue written to page cache
            DEQ_PART,   ///< Data block part-submitted to AIO, waiting for page buffer to free up
            DEQ_SUBM,   ///< Data block dequeue submitted to AIO
            DEQ,        ///< Data block dequeue AIO write complete (dequeue complete)
            // NOTE(review): the ABORT_* and COMMIT_* values are not documented in this header.
            // Judging by their names they presumably follow the same
            // cached -> part-submitted -> submitted -> complete progression as the ENQ_* and
            // DEQ_* values, for abort and commit operations respectively -- confirm with callers.
            ABORT_CACHED,
            ABORT_PART,
            ABORT_SUBM,
            ABORTED,
            COMMIT_CACHED,
            COMMIT_PART,
            COMMIT_SUBM,
            COMMITTED
        };

        enum read_state
        {
            UNREAD,     ///< Data block not read
            READ_PART,  ///< Data block is part-read; waiting for page buffer to fill
            SKIP_PART,  ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
            READ        ///< Data block is fully read
        };

    protected:
        static smutex _mutex;       ///< Class-wide mutex (presumably guards _cnt - TODO confirm)
        static u_int64_t _cnt;      ///< Class-wide counter (presumably source of _icnt - TODO confirm)
        u_int64_t   _icnt;          ///< Per-instance id value returned by id()
        write_state _wstate;        ///< Enqueued / dequeued state of data
        read_state  _rstate;        ///< Read state of data
        std::size_t _dsize;         ///< Data size in bytes
        u_int32_t   _dblks_written; ///< Data blocks written
        u_int32_t   _dblks_read;    ///< Data blocks read
        u_int32_t   _pg_cnt;        ///< Page counter - incr for each page containing part of data
        u_int16_t   _fid;           ///< FID containing header of enqueue record
        u_int64_t   _rid;           ///< RID of data set by enqueue operation
        std::string _xid;           ///< XID set by enqueue operation
        u_int64_t   _dequeue_rid;   ///< RID of data set by dequeue operation
        bool        _external_rid;  ///< Flag to indicate external setting of rid

    public:
        data_tok();
        virtual ~data_tok();

        /// Per-instance id of this token.
        inline u_int64_t id() const { return _icnt; }

        // --- Write / read state ---
        inline write_state wstate() const { return _wstate; }
        /// String form of this token's current write state.
        const char* wstate_str() const;
        /// String form of the supplied write state.
        static const char* wstate_str(write_state wstate);
        inline read_state rstate() const { return _rstate; }
        /// String form of this token's current read state.
        const char* rstate_str() const;
        /// String form of the supplied read state.
        static const char* rstate_str(read_state rstate);
        /// True when the write state is NONE or ENQ_PART.
        inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
        /// True when the write state is ENQ.
        inline bool is_enqueued() const { return _wstate == ENQ; }
        /// True when the write state is ENQ (same condition as is_enqueued()).
        inline bool is_readable() const { return _wstate == ENQ; }
        /// True when the read state is READ.
        inline bool is_read() const { return _rstate == READ; }
        /// True when the write state is ENQ or DEQ_PART.
        inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
        inline void set_wstate(const write_state wstate) { _wstate = wstate; }
        /// Sets the read state; defined out of line, unlike set_wstate().
        void set_rstate(const read_state rstate);

        // --- Data size ---
        inline std::size_t dsize() const { return _dsize; }
        inline void set_dsize(std::size_t dsize) { _dsize = dsize; }

        // --- Data blocks written counter ---
        inline u_int32_t dblocks_written() const { return _dblks_written; }
        /// Adds dblks_written to the running count of data blocks written.
        inline void incr_dblocks_written(u_int32_t dblks_written)
                { _dblks_written += dblks_written; }
        inline void set_dblocks_written(u_int32_t dblks_written) { _dblks_written = dblks_written; }

        // --- Data blocks read counter ---
        inline u_int32_t dblocks_read() const { return _dblks_read; }
        /// Adds dblks_read to the running count of data blocks read.
        inline void incr_dblocks_read(u_int32_t dblks_read) { _dblks_read += dblks_read; }
        inline void set_dblocks_read(u_int32_t dblks_read) { _dblks_read = dblks_read; }

        // --- Page counter ---
        inline u_int32_t pg_cnt() const { return _pg_cnt; }
        /// Increments the page counter and returns the new value.
        inline u_int32_t incr_pg_cnt() { return ++_pg_cnt; }
        /// Decrements the page counter and returns the new value. The non-zero precondition
        /// is checked only by assert(), so it is not enforced when NDEBUG is defined.
        inline u_int32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }

        // --- File id / record ids ---
        inline u_int16_t fid() const { return _fid; }
        inline void set_fid(const u_int16_t fid) { _fid = fid; }
        inline u_int64_t rid() const { return _rid; }
        inline void set_rid(const u_int64_t rid) { _rid = rid; }
        inline u_int64_t dequeue_rid() const {return _dequeue_rid; }
        inline void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
        inline bool external_rid() const { return _external_rid; }
        inline void set_external_rid(const bool external_rid) { _external_rid = external_rid; }

        // --- Transaction id (XID) ---
        /// True when a non-empty XID is stored.
        inline bool has_xid() const { return !_xid.empty(); }
        inline const std::string& xid() const { return _xid; }
        inline void clear_xid() { _xid.clear(); }
        inline void set_xid(const std::string& xid) { _xid.assign(xid); }
        /// Copies xid_len bytes starting at xidp into the stored XID. The bytes are treated
        /// as raw data, so embedded NUL characters are preserved.
        inline void set_xid(const void* xidp, const std::size_t xid_len)
                { _xid.assign(static_cast<const char*>(xidp), xid_len); }

        /// Resets the token; defined out of line (exact fields reset are not visible here).
        void reset();

        // debug aid
        std::string status_str() const;
    };

} // namespace journal
} // namespace mrg

#endif // ifndef QPID_LEGACYSTORE_JRNL_DATA_TOK_H