summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/jrnlPerf/Journal.h
blob: 4803e4ecf6d76dcf0184a9def8f7cf1e8a801671 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * \file Journal.h
 */

#ifndef tests_storePerftools_jrnlPerf_Journal_h_
#define tests_storePerftools_jrnlPerf_Journal_h_

#ifdef JOURNAL2
#   include "qpid/asyncStore/jrnl2/AioCallback.h"
#   include "qpid/asyncStore/jrnl2/AsyncJournal.h"
#   include "qpid/asyncStore/jrnl2/ScopedLock.h"
#else
#   include "jrnl/aio_callback.hpp"
#   include "jrnl/smutex.hpp"
#endif

#include <stdint.h> // uint16_t, uint32_t

#ifdef JOURNAL2
#   define X_AIO_CALLBACK  qpid::asyncStore::jrnl2::AioCallback
#   define X_AIO_RD_CALLBACK readAioCompleteCallback
#   define X_AIO_WR_CALLBACK writeAioCompleteCallback
#   define X_ASYNC_JOURNAL qpid::asyncStore::jrnl2::AsyncJournal
#   define X_DATA_TOKEN qpid::asyncStore::jrnl2::DataToken
#   define X_SCOPED_MUTEX qpid::asyncStore::jrnl2::ScopedMutex
#else
#   define X_AIO_CALLBACK  mrg::journal::aio_callback
#   define X_AIO_RD_CALLBACK rd_aio_cb
#   define X_AIO_WR_CALLBACK wr_aio_cb
#   define X_ASYNC_JOURNAL mrg::journal::jcntl
#   define X_DATA_TOKEN mrg::journal::data_tok
#   define X_SCOPED_MUTEX mrg::journal::smutex
#endif

#ifndef JOURNAL2
namespace mrg {
namespace journal {
class jcntl;
}} // namespace mrg::journal
namespace qpid {
namespace asyncStore {
namespace jrnl2 {
class AsyncJournal;
}}} // namespace qpid::asyncStore::jrnl2
#endif

namespace tests {
namespace storePerftools {
namespace jrnlPerf {

/**
 * \brief Test journal instance. Each queue to be tested results in one instance of this class.
 *
 * Journal test harness which contains the journal to be tested. Each queue to be tested in the test parameters
 * results in one instance of this class being instantiated, and consequently one set of journals on disk. The
 * journal instance is provided as a pointer to the constructor.
 */
class Journal : public X_AIO_CALLBACK
{
public:
    /**
     * \brief Constructor
     *
     * \param numMsgs Number of messages per thread to be enqueued then dequeued (ie both ways through broker)
     * \param msgSize Size of each message being enqueued
     * \param msgData Pointer to message content (all messages have identical content)
     * \param jrnlPtr Pinter to journal instance which is to be tested
     */
    Journal(const uint32_t numMsgs,
            const uint32_t msgSize,
            const char* msgData,
            X_ASYNC_JOURNAL* const jrnlPtr);

    /**
     * \brief virtual destructor
     */
    virtual ~Journal();

    /**
     * \brief Worker thread enqueue task
     *
     * This function is the worker thread enqueue task entry point. It enqueues _numMsgs onto the journal instance.
     * A data tokens is created for each record, this is the start of the data token life cycle. All possible
     * returns from the journal are handled appropriately. Since the enqueue threads also perform
     * callbacks on completed AIO operations, the data tokens from completed enqueues are placed onto the
     * unprocessed callback list (_unprocCallbackList) for dequeueing by the dequeue worker thread(s).
     *
     * This function must be thread safe.
     */
    void* runEnqueues();

    /**
     * \brief Worker thread dequeue task
     *
     * This function is the worker thread dequeue task entry point. It dequeues messages which are on the
     * unprocessed callback list (_unprocCallbackList).
     *
     * This function must be thread safe.
     */
    void* runDequeues();

    /**
     * \brief Helper function to launch the run() function when starting a thread.
     */
    static void* startEnqueues(void* ptr);

    /**
     * \brief Helper function to launch the run() function when starting a thread.
     */
    static void* startDequeues(void* ptr);

    /**
     * \brief Write callback function. When AIO operations return, this function is called.
     *
     * When AIO operations return, this function will sort the enqueue ops from the rest and place the data tokens
     * of these records onto the unprocessed callback list (_unprocCallbackList) for dequeueing by another thread.
     *
     * Returning dequeue ops have their data tokens destroyed, as this is the end of the life cycle of the data
     * tokens.
     *
     * Required by all subclasses of mrg::journal::aio_callback.
     *
     * \param dataTokenList A vector of data tokens for those messages which have completed their AIO write
     *  operations
     */
    void X_AIO_WR_CALLBACK(std::vector<X_DATA_TOKEN*>& dataTokenList);

    /**
     * \brief Read callback function. When read AIO operations return, this function is called.
     *
     * Not used in this test, but required by all subclasses of mrg::journal::aio_callback.
     *
     * \param buffPageCtrlBlkIndexList A vector of indices to the buffer page control blocks for completed reads
     */
    void X_AIO_RD_CALLBACK(std::vector<uint16_t>& buffPageCtrlBlkIndexList);

private:
    const uint32_t m_numMsgs;                       ///< Number of messages to be processed by this journal instance
    const uint32_t m_msgSize;                       ///< Size of each message (in bytes)
    const char* m_msgData;                          ///< Pointer to message content to be used for each message.
    X_ASYNC_JOURNAL* const m_jrnlPtr;               ///< Journal instance pointer
    std::vector<X_DATA_TOKEN*> m_unprocCallbacks;   ///< List of unprocessed callbacks to be dequeued
    X_SCOPED_MUTEX m_unprocCallbacksMutex;          ///< Mutex which protects the unprocessed callback queue


};

}}} // namespace tests::storePerftools::jrnlPerf

#endif // tests_storePerftools_jrnlPerf_Journal_h_