summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h
blob: f6fa68e481ea59c64b106070e6ae54cfe0b3ccc8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H
#define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H

#include "qpid/broker/MessageStore.h"

#include "qpid/Options.h"
#include "qpid/broker/Broker.h"
#include "qpid/legacystore/Cursor.h"
#include "qpid/legacystore/IdDbt.h"
#include "qpid/legacystore/IdSequence.h"
#include "qpid/legacystore/JournalImpl.h"
#include "qpid/legacystore/jrnl/jcfg.h"
#include "qpid/legacystore/PreparedTransaction.h"
#include "qpid/legacystore/TxnCtxt.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/legacystore/Store.h"

#include <string>

#include "db-inc.h"

// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
#include <errno.h>
#define DB_BUFFER_SMALL ENOMEM
#endif

namespace qpid { namespace sys {
class Timer;
}}

namespace mrg {
namespace msgstore {

/**
 * An implementation of the MessageStore interface based on Berkeley DB
 */
class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::management::Manageable
{
  public:
    typedef boost::shared_ptr<Db> db_ptr;
    typedef boost::shared_ptr<DbEnv> dbEnv_ptr;

    struct StoreOptions : public qpid::Options {
        StoreOptions(const std::string& name="Store Options");
        std::string clusterName;
        std::string storeDir;
        u_int16_t numJrnlFiles;
        bool      autoJrnlExpand;
        u_int16_t autoJrnlExpandMaxFiles;
        u_int32_t jrnlFsizePgs;
        bool      truncateFlag;
        u_int32_t wCachePageSizeKib;
        u_int16_t tplNumJrnlFiles;
        u_int32_t tplJrnlFsizePgs;
        u_int32_t tplWCachePageSizeKib;
    };

  protected:
    typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
    typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
    typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;

    typedef LockedMappings::map txn_lock_map;
    typedef boost::ptr_list<PreparedTransaction> txn_list;

    // Structs for Transaction Recover List (TPL) recover state
    struct TplRecoverStruct {
        u_int64_t rid; // rid of TPL record
        bool deq_flag;
        bool commit_flag;
        bool tpc_flag;
        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
    };
    typedef TplRecoverStruct TplRecover;
    typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
    typedef std::map<std::string, TplRecover> TplRecoverMap;
    typedef TplRecoverMap::const_iterator TplRecoverMapCitr;

    typedef std::map<std::string, JournalImpl*> JournalListMap;
    typedef JournalListMap::iterator JournalListMapItr;

    // Default store settings
    static const u_int16_t defNumJrnlFiles = 8;
    static const u_int32_t defJrnlFileSizePgs = 24;
    static const bool      defTruncateFlag = false;
    static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
    static const u_int16_t defTplNumJrnlFiles = 8;
    static const u_int32_t defTplJrnlFileSizePgs = 24;
    static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8;
    // TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line
    static const bool      defAutoJrnlExpand = false;
    static const u_int16_t defAutoJrnlExpandMaxFiles = 0;

    static const std::string storeTopLevelDir;
    static qpid::sys::Duration defJournalGetEventsTimeout;
    static qpid::sys::Duration defJournalFlushTimeout;

    std::list<db_ptr> dbs;
    dbEnv_ptr dbenv;
    db_ptr queueDb;
    db_ptr configDb;
    db_ptr exchangeDb;
    db_ptr mappingDb;
    db_ptr bindingDb;
    db_ptr generalDb;

    // Pointer to Transaction Prepared List (TPL) journal instance
    boost::shared_ptr<TplJournalImpl> tplStorePtr;
    TplRecoverMap tplRecoverMap;
    qpid::sys::Mutex tplInitLock;
    JournalListMap journalList;
    qpid::sys::Mutex journalListLock;
    qpid::sys::Mutex bdbLock;

    IdSequence queueIdSequence;
    IdSequence exchangeIdSequence;
    IdSequence generalIdSequence;
    IdSequence messageIdSequence;
    std::string storeDir;
    u_int16_t numJrnlFiles;
    bool      autoJrnlExpand;
    u_int16_t autoJrnlExpandMaxFiles;
    u_int32_t jrnlFsizeSblks;
    bool      truncateFlag;
    u_int32_t wCachePgSizeSblks;
    u_int16_t wCacheNumPages;
    u_int16_t tplNumJrnlFiles;
    u_int32_t tplJrnlFsizeSblks;
    u_int32_t tplWCachePgSizeSblks;
    u_int16_t tplWCacheNumPages;
    u_int64_t highestRid;
    bool isInit;
    const char* envPath;
    qpid::broker::Broker* broker;

    qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject;
    qpid::management::ManagementAgent* agent;


    // Parameter validation and calculation
    static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
                                          const std::string paramName);
    static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
                                          const std::string paramName,
                                          const u_int32_t wCachePgSizeSblks = 0);
    static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param,
                                            const std::string paramName,
                                            const u_int16_t jrnlFsizePgs);
    static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib);
    void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts,
                                  bool& autoJrnlExpand,
                                  u_int16_t& autoJrnlExpandMaxFiles,
                                  const std::string& autoJrnlExpandMaxFilesParamName,
                                  const u_int16_t numJrnlFiles,
                                  const std::string& numJrnlFilesParamName);

    void init();

    void recoverQueues(TxnCtxt& txn,
                       qpid::broker::RecoveryManager& recovery,
                       queue_index& index,
                       txn_list& locked,
                       message_index& messages);
    void recoverMessages(TxnCtxt& txn,
                         qpid::broker::RecoveryManager& recovery,
                         queue_index& index,
                         txn_list& locked,
                         message_index& prepared);
    void recoverMessages(TxnCtxt& txn,
                         qpid::broker::RecoveryManager& recovery,
                         qpid::broker::RecoverableQueue::shared_ptr& queue,
                         txn_list& locked,
                         message_index& prepared,
                         long& rcnt,
                         long& idcnt);
    qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
                                                                  uint64_t mId,
                                                                  unsigned& headerSize);
    void recoverExchanges(TxnCtxt& txn,
                          qpid::broker::RecoveryManager& recovery,
                          exchange_index& index);
    void recoverBindings(TxnCtxt& txn,
                         exchange_index& exchanges,
                         queue_index& queues);
    void recoverGeneral(TxnCtxt& txn,
                        qpid::broker::RecoveryManager& recovery);
    int enqueueMessage(TxnCtxt& txn,
                       IdDbt& msgId,
                       qpid::broker::RecoverableMessage::shared_ptr& msg,
                       queue_index& index,
                       txn_list& locked,
                       message_index& prepared);
    void readTplStore();
    void recoverTplStore();
    void recoverLockedMappings(txn_list& txns);
    TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
    u_int64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
    void store(const qpid::broker::PersistableQueue* queue,
               TxnCtxt* txn,
               const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
               bool newId);
    void async_dequeue(qpid::broker::TransactionContext* ctxt,
                       const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
                       const qpid::broker::PersistableQueue& queue);
    void destroy(db_ptr db,
                 const qpid::broker::Persistable& p);
    bool create(db_ptr db,
                IdSequence& seq,
                const qpid::broker::Persistable& p);
    void completed(TxnCtxt& txn,
                   bool commit);
    void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
    void deleteBinding(const qpid::broker::PersistableExchange& exchange,
                       const qpid::broker::PersistableQueue& queue,
                       const std::string& key);

    void put(db_ptr db,
             DbTxn* txn,
             Dbt& key,
             Dbt& value);
    void open(db_ptr db,
              DbTxn* txn,
              const char* file,
              bool dupKey);
    void closeDbs();

    // journal functions
    void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
    u_int32_t bHash(const std::string str);
    std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
    std::string getJrnlHashDir(const std::string& queueName);
    std::string getJrnlBaseDir();
    std::string getBdbBaseDir();
    std::string getTplBaseDir();
    inline void checkInit() {
        // TODO: change the default dir to ~/.qpidd
        if (!isInit) { init("/tmp"); isInit = true; }
    }
    void chkTplStoreInit();

    // debug aid for printing XIDs that may contain non-printable chars
    static std::string xid2str(const std::string xid) {
        std::ostringstream oss;
        oss << std::hex << std::setfill('0');
        for (unsigned i=0; i<xid.size(); i++) {
            if (isprint(xid[i]))
                oss << xid[i];
            else
                oss << "/" << std::setw(2) << (int)((char)xid[i]);
        }
        return oss.str();
    }

  public:
    typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;

    MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0);

    virtual ~MessageStoreImpl();

    bool init(const qpid::Options* options);

    bool init(const std::string& dir,
              u_int16_t jfiles = defNumJrnlFiles,
              u_int32_t jfileSizePgs = defJrnlFileSizePgs,
              const bool truncateFlag = false,
              u_int32_t wCachePageSize = defWCachePageSize,
              u_int16_t tplJfiles = defTplNumJrnlFiles,
              u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,
              u_int32_t tplWCachePageSize = defTplWCachePageSize,
              bool      autoJExpand = defAutoJrnlExpand,
              u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles);

    void truncateInit(const bool saveStoreContent = false);

    void initManagement ();

    void finalize();

    void create(qpid::broker::PersistableQueue& queue,
                const qpid::framing::FieldTable& args);

    void destroy(qpid::broker::PersistableQueue& queue);

    void create(const qpid::broker::PersistableExchange& queue,
                const qpid::framing::FieldTable& args);

    void destroy(const qpid::broker::PersistableExchange& queue);

    void bind(const qpid::broker::PersistableExchange& exchange,
              const qpid::broker::PersistableQueue& queue,
              const std::string& key,
              const qpid::framing::FieldTable& args);

    void unbind(const qpid::broker::PersistableExchange& exchange,
                const qpid::broker::PersistableQueue& queue,
                const std::string& key,
                const qpid::framing::FieldTable& args);

    void create(const qpid::broker::PersistableConfig& config);

    void destroy(const qpid::broker::PersistableConfig& config);

    void recover(qpid::broker::RecoveryManager& queues);

    void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);

    void destroy(qpid::broker::PersistableMessage& msg);

    void appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
                       const std::string& data);

    void loadContent(const qpid::broker::PersistableQueue& queue,
                     const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
                     std::string& data,
                     uint64_t offset,
                     uint32_t length);

    void enqueue(qpid::broker::TransactionContext* ctxt,
                 const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
                 const qpid::broker::PersistableQueue& queue);

    void dequeue(qpid::broker::TransactionContext* ctxt,
                 const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
                 const qpid::broker::PersistableQueue& queue);

    void flush(const qpid::broker::PersistableQueue& queue);

    u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);

    void collectPreparedXids(std::set<std::string>& xids);

    std::auto_ptr<qpid::broker::TransactionContext> begin();

    std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);

    void prepare(qpid::broker::TPCTransactionContext& ctxt);

    void localPrepare(TxnCtxt* ctxt);

    void commit(qpid::broker::TransactionContext& ctxt);

    void abort(qpid::broker::TransactionContext& ctxt);

    qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
        { return mgmtObject; }

    inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&)
        { return qpid::management::Manageable::STATUS_OK; }

    std::string getStoreDir() const;

  private:
    void journalDeleted(JournalImpl&);

}; // class MessageStoreImpl

} // namespace msgstore
} // namespace mrg

#endif // ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H