/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H #define QPID_LEGACYSTORE_MESSAGESTOREIMPL_H #include #include "db-inc.h" #include "qpid/legacystore/Cursor.h" #include "qpid/legacystore/IdDbt.h" #include "qpid/legacystore/IdSequence.h" #include "qpid/legacystore/JournalImpl.h" #include "qpid/legacystore/jrnl/jcfg.h" #include "qpid/legacystore/PreparedTransaction.h" #include "qpid/broker/Broker.h" #include "qpid/broker/MessageStore.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/legacystore/Store.h" #include "qpid/legacystore/TxnCtxt.h" // Assume DB_VERSION_MAJOR == 4 #if (DB_VERSION_MINOR == 2) #include #define DB_BUFFER_SMALL ENOMEM #endif namespace qpid { namespace sys { class Timer; }} namespace mrg { namespace msgstore { /** * An implementation of the MessageStore interface based on Berkeley DB */ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::management::Manageable { public: typedef boost::shared_ptr db_ptr; typedef boost::shared_ptr dbEnv_ptr; struct StoreOptions : public qpid::Options { StoreOptions(const std::string& name="Store Options"); std::string clusterName; std::string storeDir; u_int16_t numJrnlFiles; bool autoJrnlExpand; u_int16_t autoJrnlExpandMaxFiles; u_int32_t jrnlFsizePgs; bool truncateFlag; u_int32_t wCachePageSizeKib; u_int16_t tplNumJrnlFiles; u_int32_t tplJrnlFsizePgs; u_int32_t tplWCachePageSizeKib; }; protected: typedef std::map queue_index; typedef std::map exchange_index; typedef std::map message_index; typedef LockedMappings::map txn_lock_map; typedef boost::ptr_list txn_list; // Structs for Transaction Recover List (TPL) recover state struct TplRecoverStruct { u_int64_t rid; // rid of TPL record bool deq_flag; bool commit_flag; bool tpc_flag; TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag); }; typedef TplRecoverStruct TplRecover; typedef std::pair TplRecoverMapPair; typedef std::map TplRecoverMap; typedef TplRecoverMap::const_iterator TplRecoverMapCitr; typedef std::map JournalListMap; typedef JournalListMap::iterator JournalListMapItr; // Default store settings static const u_int16_t defNumJrnlFiles = 8; static const u_int32_t defJrnlFileSizePgs = 24; static const bool defTruncateFlag = false; static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; static const u_int16_t defTplNumJrnlFiles = 8; static const u_int32_t defTplJrnlFileSizePgs = 24; static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8; // TODO: set defAutoJrnlExpand to true and defAutoJrnlExpandMaxFiles to 16 when auto-expand comes on-line static const bool defAutoJrnlExpand = false; static const u_int16_t defAutoJrnlExpandMaxFiles = 0; static const std::string storeTopLevelDir; static qpid::sys::Duration defJournalGetEventsTimeout; static qpid::sys::Duration defJournalFlushTimeout; std::list dbs; dbEnv_ptr dbenv; db_ptr queueDb; db_ptr configDb; db_ptr exchangeDb; db_ptr mappingDb; db_ptr bindingDb; db_ptr generalDb; // Pointer to Transaction Prepared List (TPL) journal instance boost::shared_ptr tplStorePtr; TplRecoverMap tplRecoverMap; qpid::sys::Mutex tplInitLock; JournalListMap journalList; qpid::sys::Mutex journalListLock; qpid::sys::Mutex bdbLock; IdSequence queueIdSequence; IdSequence exchangeIdSequence; IdSequence generalIdSequence; IdSequence messageIdSequence; std::string storeDir; u_int16_t numJrnlFiles; bool autoJrnlExpand; u_int16_t autoJrnlExpandMaxFiles; u_int32_t jrnlFsizeSblks; bool truncateFlag; u_int32_t wCachePgSizeSblks; u_int16_t wCacheNumPages; u_int16_t tplNumJrnlFiles; u_int32_t tplJrnlFsizeSblks; u_int32_t tplWCachePgSizeSblks; u_int16_t tplWCacheNumPages; u_int64_t highestRid; bool isInit; const char* envPath; qpid::broker::Broker* broker; qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; // Parameter validation and calculation static u_int16_t chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName); static u_int32_t chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks = 0); static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs); static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib); void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts, bool& autoJrnlExpand, u_int16_t& autoJrnlExpandMaxFiles, const std::string& autoJrnlExpandMaxFilesParamName, const u_int16_t numJrnlFiles, const std::string& numJrnlFilesParamName); void init(); void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index, txn_list& locked, message_index& messages); void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index, txn_list& locked, message_index& prepared); void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared, long& rcnt, long& idcnt); qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery, uint64_t mId, unsigned& headerSize); void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index); void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues); void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery); int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg, queue_index& index, txn_list& locked, message_index& prepared); void readTplStore(); void recoverTplStore(); void recoverLockedMappings(txn_list& txns); TxnCtxt* check(qpid::broker::TransactionContext* ctxt); u_int64_t msgEncode(std::vector& buff, const boost::intrusive_ptr& message); void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn, const boost::intrusive_ptr& message, bool newId); void async_dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const qpid::broker::PersistableQueue& queue); void destroy(db_ptr db, const qpid::broker::Persistable& p); bool create(db_ptr db, IdSequence& seq, const qpid::broker::Persistable& p); void completed(TxnCtxt& txn, bool commit); void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue); void deleteBinding(const qpid::broker::PersistableExchange& exchange, const qpid::broker::PersistableQueue& queue, const std::string& key); void put(db_ptr db, DbTxn* txn, Dbt& key, Dbt& value); void open(db_ptr db, DbTxn* txn, const char* file, bool dupKey); void closeDbs(); // journal functions void createJrnlQueue(const qpid::broker::PersistableQueue& queue); u_int32_t bHash(const std::string str); std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/ std::string getJrnlHashDir(const std::string& queueName); std::string getJrnlBaseDir(); std::string getBdbBaseDir(); std::string getTplBaseDir(); inline void checkInit() { // TODO: change the default dir to ~/.qpidd if (!isInit) { init("/tmp"); isInit = true; } } void chkTplStoreInit(); // debug aid for printing XIDs that may contain non-printable chars static std::string xid2str(const std::string xid) { std::ostringstream oss; oss << std::hex << std::setfill('0'); for (unsigned i=0; i shared_ptr; MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0); virtual ~MessageStoreImpl(); bool init(const qpid::Options* options); bool init(const std::string& dir, u_int16_t jfiles = defNumJrnlFiles, u_int32_t jfileSizePgs = defJrnlFileSizePgs, const bool truncateFlag = false, u_int32_t wCachePageSize = defWCachePageSize, u_int16_t tplJfiles = defTplNumJrnlFiles, u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs, u_int32_t tplWCachePageSize = defTplWCachePageSize, bool autoJExpand = defAutoJrnlExpand, u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles); void truncateInit(const bool saveStoreContent = false); void initManagement (); void finalize(); void create(qpid::broker::PersistableQueue& queue, const qpid::framing::FieldTable& args); void destroy(qpid::broker::PersistableQueue& queue); void create(const qpid::broker::PersistableExchange& queue, const qpid::framing::FieldTable& args); void destroy(const qpid::broker::PersistableExchange& queue); void bind(const qpid::broker::PersistableExchange& exchange, const qpid::broker::PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args); void unbind(const qpid::broker::PersistableExchange& exchange, const qpid::broker::PersistableQueue& queue, const std::string& key, const qpid::framing::FieldTable& args); void create(const qpid::broker::PersistableConfig& config); void destroy(const qpid::broker::PersistableConfig& config); void recover(qpid::broker::RecoveryManager& queues); void stage(const boost::intrusive_ptr& msg); void destroy(qpid::broker::PersistableMessage& msg); void appendContent(const boost::intrusive_ptr& msg, const std::string& data); void loadContent(const qpid::broker::PersistableQueue& queue, const boost::intrusive_ptr& msg, std::string& data, uint64_t offset, uint32_t length); void enqueue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const qpid::broker::PersistableQueue& queue); void dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr& msg, const qpid::broker::PersistableQueue& queue); void flush(const qpid::broker::PersistableQueue& queue); u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue); void collectPreparedXids(std::set& xids); std::auto_ptr begin(); std::auto_ptr begin(const std::string& xid); void prepare(qpid::broker::TPCTransactionContext& ctxt); void localPrepare(TxnCtxt* ctxt); void commit(qpid::broker::TransactionContext& ctxt); void abort(qpid::broker::TransactionContext& ctxt); qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&) { return qpid::management::Manageable::STATUS_OK; } std::string getStoreDir() const; private: void journalDeleted(JournalImpl&); }; // class MessageStoreImpl } // namespace msgstore } // namespace mrg #endif // ifndef QPID_LEGACYSTORE_MESSAGESTOREIMPL_H