diff options
24 files changed, 428 insertions, 148 deletions
diff --git a/mysql-test/ndb/ndb_config_2_node.ini b/mysql-test/ndb/ndb_config_2_node.ini index 302998bc79e..c0065bdb25a 100644 --- a/mysql-test/ndb/ndb_config_2_node.ini +++ b/mysql-test/ndb/ndb_config_2_node.ini @@ -13,6 +13,7 @@ TimeBetweenGlobalCheckpoints= 500 NoOfFragmentLogFiles= 4 FragmentLogFileSize=12M DiskPageBufferMemory= CHOOSE_DiskPageBufferMemory +ODirect= 1 # the following parametes just function as a small regression # test that the parameter exists InitialNoOfOpenFiles= 27 diff --git a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h index ed34a372db6..d0bd8be16a3 100644 --- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h +++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h @@ -82,6 +82,8 @@ #define CFG_DB_BACKUP_WRITE_SIZE 136 #define CFG_DB_BACKUP_MAX_WRITE_SIZE 139 +#define CFG_DB_WATCHDOG_INTERVAL_INITIAL 141 + #define CFG_LOG_DESTINATION 147 #define CFG_DB_DISCLESS 148 @@ -114,6 +116,8 @@ #define CFG_DB_MEMREPORT_FREQUENCY 166 +#define CFG_DB_O_DIRECT 168 + #define CFG_DB_SGA 198 /* super pool mem */ #define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */ diff --git a/storage/ndb/include/ndb_global.h.in b/storage/ndb/include/ndb_global.h.in index 60d32f62ee3..c3ea909ba2e 100644 --- a/storage/ndb/include/ndb_global.h.in +++ b/storage/ndb/include/ndb_global.h.in @@ -146,4 +146,6 @@ extern "C" { #define MAX(x,y) (((x)>(y))?(x):(y)) #endif +#define NDB_O_DIRECT_WRITE_ALIGNMENT 512 + #endif diff --git a/storage/ndb/include/portlib/NdbTick.h b/storage/ndb/include/portlib/NdbTick.h index 59f580de38e..70c36fdfd1e 100644 --- a/storage/ndb/include/portlib/NdbTick.h +++ b/storage/ndb/include/portlib/NdbTick.h @@ -37,9 +37,6 @@ NDB_TICKS NdbTick_CurrentMillisecond(void); */ int NdbTick_CurrentMicrosecond(NDB_TICKS * secs, Uint32 * micros); - /*#define TIME_MEASUREMENT*/ -#ifdef TIME_MEASUREMENT - struct MicroSecondTimer { NDB_TICKS seconds; NDB_TICKS micro_seconds; @@ -54,7 +51,6 @@ struct MicroSecondTimer { NDB_TICKS NdbTick_getMicrosPassed(struct MicroSecondTimer start, struct MicroSecondTimer stop); int NdbTick_getMicroTimer(struct MicroSecondTimer* time_now); -#endif #ifdef __cplusplus } diff --git a/storage/ndb/src/common/portlib/NdbTick.c b/storage/ndb/src/common/portlib/NdbTick.c index eff6b28b7eb..f69c42c0ca0 100644 --- a/storage/ndb/src/common/portlib/NdbTick.c +++ b/storage/ndb/src/common/portlib/NdbTick.c @@ -15,7 +15,7 @@ #include <ndb_global.h> -#include "NdbTick.h" +#include <NdbTick.h> #define NANOSEC_PER_SEC 1000000000 #define MICROSEC_PER_SEC 1000000 @@ -71,7 +71,6 @@ NdbTick_CurrentMicrosecond(NDB_TICKS * secs, Uint32 * micros){ } #endif -#ifdef TIME_MEASUREMENT int NdbTick_getMicroTimer(struct MicroSecondTimer* input_timer) { @@ -102,4 +101,3 @@ NdbTick_getMicrosPassed(struct MicroSecondTimer start, } return ret_value; } -#endif diff --git a/storage/ndb/src/kernel/blocks/backup/Backup.cpp b/storage/ndb/src/kernel/blocks/backup/Backup.cpp index 57082eaccc8..645eb590ae3 100644 --- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp +++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp @@ -2761,6 +2761,8 @@ Backup::openFiles(Signal* signal, BackupRecordPtr ptr) c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr); filePtr.p->m_flags |= BackupFile::BF_OPENING; + if (c_defaults.m_o_direct) + req->fileFlags |= FsOpenReq::OM_DIRECT; req->userPointer = filePtr.i; FsOpenReq::setVersion(req->fileNumber, 2); FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA); @@ -3735,12 +3737,31 @@ Backup::OperationRecord::newFragment(Uint32 tableId, Uint32 fragNo) } bool -Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo) +Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo, bool fill_record) { Uint32 * tmp; const Uint32 footSz = sizeof(BackupFormat::DataFile::FragmentFooter) >> 2; + Uint32 sz = footSz + 1; - if(dataBuffer.getWritePtr(&tmp, footSz + 1)) { + if (fill_record) + { + Uint32 * new_tmp; + if (!dataBuffer.getWritePtr(&tmp, sz)) + return false; + new_tmp = tmp + sz; + + if ((UintPtr)new_tmp & (sizeof(Page32)-1)) + { + /* padding is needed to get full write */ + new_tmp += 2 /* to fit empty header minimum 2 words*/; + new_tmp = (Uint32 *)(((UintPtr)new_tmp + sizeof(Page32)-1) & + ~(UintPtr)(sizeof(Page32)-1)); + /* new write sz */ + sz = new_tmp - tmp; + } + } + + if(dataBuffer.getWritePtr(&tmp, sz)) { jam(); * tmp = 0; // Finish record stream tmp++; @@ -3752,7 +3773,17 @@ Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo) foot->FragmentNo = htonl(fragNo); foot->NoOfRecords = htonl(noOfRecords); foot->Checksum = htonl(0); - dataBuffer.updateWritePtr(footSz + 1); + + if (sz != footSz + 1) + { + tmp += footSz; + memset(tmp, 0, (sz - footSz - 1) * 4); + *tmp = htonl(BackupFormat::EMPTY_ENTRY); + tmp++; + *tmp = htonl(sz - footSz - 1); + } + + dataBuffer.updateWritePtr(sz); return true; }//if return false; @@ -3854,8 +3885,13 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr) return; }//if + BackupRecordPtr ptr LINT_SET_PTR; + c_backupPool.getPtr(ptr, filePtr.p->backupPtr); + OperationRecord & op = filePtr.p->operation; - if(!op.fragComplete(filePtr.p->tableId, filePtr.p->fragmentNo)) { + if(!op.fragComplete(filePtr.p->tableId, filePtr.p->fragmentNo, + c_defaults.m_o_direct)) + { jam(); signal->theData[0] = BackupContinueB::BUFFER_FULL_FRAG_COMPLETE; signal->theData[1] = filePtr.i; @@ -3865,9 +3901,6 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr) filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD; - BackupRecordPtr ptr LINT_SET_PTR; - c_backupPool.getPtr(ptr, filePtr.p->backupPtr); - if (ptr.p->is_lcp()) { ptr.p->slaveState.setState(STOPPING); @@ -4905,6 +4938,8 @@ Backup::lcp_open_file(Signal* signal, BackupRecordPtr ptr) FsOpenReq::OM_CREATE | FsOpenReq::OM_APPEND | FsOpenReq::OM_AUTOSYNC; + if (c_defaults.m_o_direct) + req->fileFlags |= FsOpenReq::OM_DIRECT; FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF); req->auto_sync_size = c_defaults.m_disk_synch_size; diff --git a/storage/ndb/src/kernel/blocks/backup/Backup.hpp b/storage/ndb/src/kernel/blocks/backup/Backup.hpp index 32f2e14ac92..3fd9b2967fd 100644 --- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp +++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp @@ -240,7 +240,7 @@ public: * Once per fragment */ bool newFragment(Uint32 tableId, Uint32 fragNo); - bool fragComplete(Uint32 tableId, Uint32 fragNo); + bool fragComplete(Uint32 tableId, Uint32 fragNo, bool fill_record); /** * Once per scan frag (next) req/conf @@ -534,6 +534,7 @@ public: Uint32 m_disk_write_speed; Uint32 m_disk_synch_size; Uint32 m_diskless; + Uint32 m_o_direct; }; /** diff --git a/storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp b/storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp index ace9dfe5c79..20f8f6650be 100644 --- a/storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp +++ b/storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp @@ -32,7 +32,8 @@ struct BackupFormat { TABLE_LIST = 4, TABLE_DESCRIPTION = 5, GCP_ENTRY = 6, - FRAGMENT_INFO = 7 + FRAGMENT_INFO = 7, + EMPTY_ENTRY = 8 }; struct FileHeader { @@ -93,6 +94,13 @@ struct BackupFormat { Uint32 NoOfRecords; Uint32 Checksum; }; + + /* optional padding for O_DIRECT */ + struct EmptyEntry { + Uint32 SectionType; + Uint32 SectionLength; + /* not used data */ + }; }; /** diff --git a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp index 4faa02e494f..2cd2a8a2bee 100644 --- a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp +++ b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp @@ -148,10 +148,13 @@ Backup::execREAD_CONFIG_REQ(Signal* signal) c_defaults.m_disk_write_speed = 10 * (1024 * 1024); c_defaults.m_disk_write_speed_sr = 100 * (1024 * 1024); c_defaults.m_disk_synch_size = 4 * (1024 * 1024); - + c_defaults.m_o_direct = true; + Uint32 noBackups = 0, noTables = 0, noAttribs = 0, noFrags = 0; ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &c_defaults.m_diskless)); + ndb_mgm_get_int_parameter(p, CFG_DB_O_DIRECT, + &c_defaults.m_o_direct); ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED_SR, &c_defaults.m_disk_write_speed_sr); ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED, @@ -204,7 +207,7 @@ Backup::execREAD_CONFIG_REQ(Signal* signal) / sizeof(Page32); // We need to allocate an additional of 2 pages. 1 page because of a bug in // ArrayPool and another one for DICTTAINFO. - c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2); + c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2, true); { // Init all tables SLList<Table> tables(c_tablePool); diff --git a/storage/ndb/src/kernel/blocks/backup/FsBuffer.hpp b/storage/ndb/src/kernel/blocks/backup/FsBuffer.hpp index d26f36ccf40..bb0bbd6d770 100644 --- a/storage/ndb/src/kernel/blocks/backup/FsBuffer.hpp +++ b/storage/ndb/src/kernel/blocks/backup/FsBuffer.hpp @@ -270,8 +270,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){ * ptr = &Tp[Tr]; - DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> %d", - Tr, Tw, Ts, Tm, sz1, * sz)); + DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d", + Tr, Tmw, Ts, Tm, sz1, * sz)); return true; } @@ -279,8 +279,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){ if(!m_eof){ * _eof = false; - DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> false", - Tr, Tw, Ts, Tm, sz1)); + DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> false", + Tr, Tmw, Ts, Tm, sz1)); return false; } @@ -289,8 +289,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){ * _eof = true; * ptr = &Tp[Tr]; - DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> %d eof", - Tr, Tw, Ts, Tm, sz1, * sz)); + DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d eof", + Tr, Tmw, Ts, Tm, sz1, * sz)); return false; } @@ -316,13 +316,13 @@ FsBuffer::getWritePtr(Uint32 ** ptr, Uint32 sz){ if(sz1 > sz){ // Note at least 1 word of slack * ptr = &Tp[Tw]; - DEBUG(ndbout_c("getWritePtr(%d) Tr: %d Tw: %d Ts: %d sz1: %d -> true", - sz, Tr, Tw, Ts, sz1)); + DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> true", + sz, Tw, sz1)); return true; } - DEBUG(ndbout_c("getWritePtr(%d) Tr: %d Tw: %d Ts: %d sz1: %d -> false", - sz, Tr, Tw, Ts, sz1)); + DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> false", + sz, Tw, sz1)); return false; } @@ -339,11 +339,15 @@ FsBuffer::updateWritePtr(Uint32 sz){ m_free -= sz; if(Tnew < Ts){ m_writeIndex = Tnew; + DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d", + sz, m_writeIndex)); return; } memcpy(Tp, &Tp[Ts], (Tnew - Ts) << 2); m_writeIndex = Tnew - Ts; + DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d", + sz, m_writeIndex)); } inline diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 18fc7417623..4e4c553ea6c 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -114,9 +114,6 @@ class Dbtup; /* ------------------------------------------------------------------------- */ /* VARIOUS CONSTANTS USED AS FLAGS TO THE FILE MANAGER. */ /* ------------------------------------------------------------------------- */ -#define ZOPEN_READ 0 -#define ZOPEN_WRITE 1 -#define ZOPEN_READ_WRITE 2 #define ZVAR_NO_LOG_PAGE_WORD 1 #define ZLIST_OF_PAIRS 0 #define ZLIST_OF_PAIRS_SYNCH 16 @@ -2687,6 +2684,7 @@ private: UintR clfoFileSize; LogPageRecord *logPageRecord; + void *logPageRecordUnaligned; LogPageRecordPtr logPagePtr; UintR cfirstfreeLogPage; UintR clogPageFileSize; @@ -2890,6 +2888,7 @@ private: UintR ctransidHash[1024]; Uint32 c_diskless; + Uint32 c_o_direct; Uint32 c_error_insert_table_id; public: diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp index 05ea2047fc0..d4d026985fd 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp @@ -49,6 +49,7 @@ void Dblqh::initData() logFileRecord = 0; logFileOperationRecord = 0; logPageRecord = 0; + logPageRecordUnaligned= 0; pageRefRecord = 0; tablerec = 0; tcConnectionrec = 0; @@ -107,10 +108,13 @@ void Dblqh::initRecords() sizeof(LogFileOperationRecord), clfoFileSize); - logPageRecord = (LogPageRecord*)allocRecord("LogPageRecord", - sizeof(LogPageRecord), - clogPageFileSize, - false); + logPageRecord = + (LogPageRecord*)allocRecordAligned("LogPageRecord", + sizeof(LogPageRecord), + clogPageFileSize, + &logPageRecordUnaligned, + NDB_O_DIRECT_WRITE_ALIGNMENT, + false); pageRefRecord = (PageRefRecord*)allocRecord("PageRefRecord", sizeof(PageRefRecord), @@ -381,7 +385,7 @@ Dblqh::~Dblqh() sizeof(LogFileOperationRecord), clfoFileSize); - deallocRecord((void**)&logPageRecord, + deallocRecord((void**)&logPageRecordUnaligned, "LogPageRecord", sizeof(LogPageRecord), clogPageFileSize); diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 4a7d38c293c..073af6eacb1 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -1016,6 +1016,8 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal) cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN; ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &c_diskless)); + c_o_direct = true; + ndb_mgm_get_int_parameter(p, CFG_DB_O_DIRECT, &c_o_direct); Uint32 tmp= 0; ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &tmp)); @@ -13292,7 +13294,9 @@ void Dblqh::openFileRw(Signal* signal, LogFileRecordPtr olfLogFilePtr) signal->theData[3] = olfLogFilePtr.p->fileName[1]; signal->theData[4] = olfLogFilePtr.p->fileName[2]; signal->theData[5] = olfLogFilePtr.p->fileName[3]; - signal->theData[6] = ZOPEN_READ_WRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE; + signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE; + if (c_o_direct) + signal->theData[6] |= FsOpenReq::OM_DIRECT; req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord); Uint64 sz = clogFileSize; sz *= 1024; sz *= 1024; @@ -13316,7 +13320,9 @@ void Dblqh::openLogfileInit(Signal* signal) signal->theData[3] = logFilePtr.p->fileName[1]; signal->theData[4] = logFilePtr.p->fileName[2]; signal->theData[5] = logFilePtr.p->fileName[3]; - signal->theData[6] = 0x302 | FsOpenReq::OM_AUTOSYNC; + signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC; + if (c_o_direct) + signal->theData[6] |= FsOpenReq::OM_DIRECT; req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord); sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA); }//Dblqh::openLogfileInit() @@ -13352,7 +13358,9 @@ void Dblqh::openNextLogfile(Signal* signal) signal->theData[3] = onlLogFilePtr.p->fileName[1]; signal->theData[4] = onlLogFilePtr.p->fileName[2]; signal->theData[5] = onlLogFilePtr.p->fileName[3]; - signal->theData[6] = 2 | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE; + signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE; + if (c_o_direct) + signal->theData[6] |= FsOpenReq::OM_DIRECT; req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord); Uint64 sz = clogFileSize; sz *= 1024; sz *= 1024; diff --git a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp index 6fc88c5061f..d3924a586b0 100644 --- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp +++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp @@ -277,6 +277,14 @@ void Ndbcntr::execSTTOR(Signal* signal) break; case ZSTART_PHASE_1: jam(); + { + Uint32 db_watchdog_interval = 0; + const ndb_mgm_configuration_iterator * p = + m_ctx.m_config.getOwnConfigIterator(); + ndb_mgm_get_int_parameter(p, CFG_DB_WATCHDOG_INTERVAL, &db_watchdog_interval); + ndbrequire(db_watchdog_interval); + update_watch_dog_timer(db_watchdog_interval); + } startPhase1Lab(signal); break; case ZSTART_PHASE_2: diff --git a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp index 5f93ee31bc7..cf18bf34040 100644 --- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp +++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp @@ -163,7 +163,12 @@ AsyncFile::run() theStartFlag = true; // Create write buffer for bigger writes theWriteBufferSize = WRITEBUFFERSIZE; - theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize); + theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize + + NDB_O_DIRECT_WRITE_ALIGNMENT-1); + theWriteBuffer = (char *) + (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) & + ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1)); + NdbMutex_Unlock(theStartMutexPtr); NdbCondition_Signal(theStartConditionPtr); @@ -247,6 +252,78 @@ AsyncFile::run() static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1]; #endif +int +AsyncFile::check_odirect_write(Uint32 flags, int& new_flags, int mode) +{ + assert(new_flags & (O_CREAT | O_TRUNC)); +#ifdef O_DIRECT + int ret; + char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1)); + while (((ret = ::write(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) && + (errno == EINTR)); + if (ret == -1) + { + new_flags &= ~O_DIRECT; + ndbout_c("%s Failed to write using O_DIRECT, disabling", + theFileName.c_str()); + } + + close(theFd); + theFd = ::open(theFileName.c_str(), new_flags, mode); + if (theFd == -1) + return errno; +#endif + + return 0; +} + +int +AsyncFile::check_odirect_read(Uint32 flags, int &new_flags, int mode) +{ +#ifdef O_DIRECT + int ret; + char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1)); + while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) && + (errno == EINTR)); + if (ret == -1) + { + ndbout_c("%s Failed to read using O_DIRECT, disabling", + theFileName.c_str()); + goto reopen; + } + + if(lseek(theFd, 0, SEEK_SET) != 0) + { + return errno; + } + + if ((flags & FsOpenReq::OM_CHECK_SIZE) == 0) + { + struct stat buf; + if ((fstat(theFd, &buf) == -1)) + { + return errno; + } + else if ((buf.st_size % GLOBAL_PAGE_SIZE) != 0) + { + ndbout_c("%s filesize not a multiple of %d, disabling O_DIRECT", + theFileName.c_str(), GLOBAL_PAGE_SIZE); + goto reopen; + } + } + + return 0; + +reopen: + close(theFd); + new_flags &= ~O_DIRECT; + theFd = ::open(theFileName.c_str(), new_flags, mode); + if (theFd == -1) + return errno; +#endif + return 0; +} + void AsyncFile::openReq(Request* request) { m_auto_sync_freq = 0; @@ -312,7 +389,7 @@ void AsyncFile::openReq(Request* request) } #else Uint32 flags = request->par.open.flags; - Uint32 new_flags = 0; + int new_flags = 0; // Convert file open flags from Solaris to Liux if (flags & FsOpenReq::OM_CREATE) @@ -343,10 +420,6 @@ void AsyncFile::openReq(Request* request) { new_flags |= O_DIRECT; } -#elif defined O_SYNC - { - flags |= FsOpenReq::OM_SYNC; - } #endif if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT)) @@ -355,15 +428,19 @@ void AsyncFile::openReq(Request* request) new_flags |= O_SYNC; #endif } - + + const char * rw = ""; switch(flags & 0x3){ case FsOpenReq::OM_READONLY: + rw = "r"; new_flags |= O_RDONLY; break; case FsOpenReq::OM_WRITEONLY: + rw = "w"; new_flags |= O_WRONLY; break; case FsOpenReq::OM_READWRITE: + rw = "rw"; new_flags |= O_RDWR; break; default: @@ -404,11 +481,6 @@ no_odirect: if (new_flags & O_DIRECT) { new_flags &= ~O_DIRECT; - flags |= FsOpenReq::OM_SYNC; -#ifdef O_SYNC - if (! (flags & FsOpenReq::OM_INIT)) - new_flags |= O_SYNC; -#endif goto no_odirect; } #endif @@ -421,11 +493,6 @@ no_odirect: else if (new_flags & O_DIRECT) { new_flags &= ~O_DIRECT; - flags |= FsOpenReq::OM_SYNC; -#ifdef O_SYNC - if (! (flags & FsOpenReq::OM_INIT)) - new_flags |= O_SYNC; -#endif goto no_odirect; } #endif @@ -512,7 +579,6 @@ no_odirect: { ndbout_c("error on first write(%d), disable O_DIRECT", err); new_flags &= ~O_DIRECT; - flags |= FsOpenReq::OM_SYNC; close(theFd); theFd = ::open(theFileName.c_str(), new_flags, mode); if (theFd != -1) @@ -532,26 +598,32 @@ no_odirect: else if (flags & FsOpenReq::OM_DIRECT) { #ifdef O_DIRECT - do { - int ret; - char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1)); - while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) && (errno == EINTR)); - if (ret == -1) - { - ndbout_c("%s Failed to read using O_DIRECT, disabling", theFileName.c_str()); - flags |= FsOpenReq::OM_SYNC; - flags |= FsOpenReq::OM_INIT; - break; - } - if(lseek(theFd, 0, SEEK_SET) != 0) - { - request->error = errno; - return; - } - } while (0); + if (flags & (FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE)) + { + request->error = check_odirect_write(flags, new_flags, mode); + } + else + { + request->error = check_odirect_read(flags, new_flags, mode); + } + + if (request->error) + return; #endif } - +#ifdef VM_TRACE + if (flags & FsOpenReq::OM_DIRECT) + { +#ifdef O_DIRECT + ndbout_c("%s %s O_DIRECT: %d", + theFileName.c_str(), rw, + !!(new_flags & O_DIRECT)); +#else + ndbout_c("%s %s O_DIRECT: 0", + theFileName.c_str(), rw); +#endif + } +#endif if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT)) { #ifdef O_SYNC @@ -562,6 +634,10 @@ no_odirect: new_flags &= ~(O_CREAT | O_TRUNC); new_flags |= O_SYNC; theFd = ::open(theFileName.c_str(), new_flags, mode); + if (theFd == -1) + { + request->error = errno; + } #endif } #endif @@ -1079,7 +1155,8 @@ AsyncFile::rmrfReq(Request * request, char * path, bool removePath){ void AsyncFile::endReq() { // Thread is ended with return - if (theWriteBuffer) ndbd_free(theWriteBuffer, theWriteBufferSize); + if (theWriteBufferUnaligned) + ndbd_free(theWriteBufferUnaligned, theWriteBufferSize); } diff --git a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp index e8f2deb016c..64567dd2bb8 100644 --- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp +++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp @@ -232,9 +232,13 @@ private: bool theStartFlag; int theWriteBufferSize; char* theWriteBuffer; + void* theWriteBufferUnaligned; size_t m_write_wo_sync; // Writes wo/ sync size_t m_auto_sync_freq; // Auto sync freq in bytes + + int check_odirect_read(Uint32 flags, int&new_flags, int mode); + int check_odirect_write(Uint32 flags, int&new_flags, int mode); public: SimulatedBlock& m_fs; Ptr<GlobalPage> m_page_ptr; diff --git a/storage/ndb/src/kernel/blocks/restore.cpp b/storage/ndb/src/kernel/blocks/restore.cpp index 51644ef0712..2c204b912b1 100644 --- a/storage/ndb/src/kernel/blocks/restore.cpp +++ b/storage/ndb/src/kernel/blocks/restore.cpp @@ -559,6 +559,9 @@ Restore::restore_next(Signal* signal, FilePtr file_ptr) case BackupFormat::GCP_ENTRY: parse_gcp_entry(signal, file_ptr, data, len); break; + case BackupFormat::EMPTY_ENTRY: + // skip + break; case 0x4e444242: // 'NDBB' if (check_file_version(signal, ntohl(* (data+2))) == 0) { diff --git a/storage/ndb/src/kernel/vm/Configuration.cpp b/storage/ndb/src/kernel/vm/Configuration.cpp index e0b485eda59..fbda9873fd8 100644 --- a/storage/ndb/src/kernel/vm/Configuration.cpp +++ b/storage/ndb/src/kernel/vm/Configuration.cpp @@ -443,6 +443,11 @@ Configuration::setupConfiguration(){ "TimeBetweenWatchDogCheck missing"); } + if(iter.get(CFG_DB_WATCHDOG_INTERVAL_INITIAL, &_timeBetweenWatchDogCheckInitial)){ + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", + "TimeBetweenWatchDogCheckInitial missing"); + } + /** * Get paths */ @@ -462,9 +467,12 @@ Configuration::setupConfiguration(){ * Create the watch dog thread */ { - Uint32 t = _timeBetweenWatchDogCheck; + if (_timeBetweenWatchDogCheckInitial < _timeBetweenWatchDogCheck) + _timeBetweenWatchDogCheckInitial = _timeBetweenWatchDogCheck; + + Uint32 t = _timeBetweenWatchDogCheckInitial; t = globalEmulatorData.theWatchDog ->setCheckInterval(t); - _timeBetweenWatchDogCheck = t; + _timeBetweenWatchDogCheckInitial = t; } ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config); diff --git a/storage/ndb/src/kernel/vm/Configuration.hpp b/storage/ndb/src/kernel/vm/Configuration.hpp index 934261e40af..918a889a171 100644 --- a/storage/ndb/src/kernel/vm/Configuration.hpp +++ b/storage/ndb/src/kernel/vm/Configuration.hpp @@ -84,6 +84,7 @@ private: Uint32 _maxErrorLogs; Uint32 _lockPagesInMainMemory; Uint32 _timeBetweenWatchDogCheck; + Uint32 _timeBetweenWatchDogCheckInitial; ndb_mgm_configuration * m_ownConfig; ndb_mgm_configuration * m_clusterConfig; diff --git a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp index 3125fc33258..7ad1d486a02 100644 --- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp @@ -19,6 +19,7 @@ #include <NdbOut.hpp> #include <GlobalData.hpp> #include <Emulator.hpp> +#include <WatchDog.hpp> #include <ErrorHandlingMacros.hpp> #include <TimeQueue.hpp> #include <TransporterRegistry.hpp> @@ -38,6 +39,9 @@ #include <AttributeDescriptor.hpp> #include <NdbSqlUtil.hpp> +#include <EventLogger.hpp> +extern EventLogger g_eventLogger; + #define ljamEntry() jamEntryLine(30000 + __LINE__) #define ljam() jamLine(30000 + __LINE__) @@ -655,14 +659,20 @@ SimulatedBlock::getBatSize(Uint16 blockNo){ return sb->theBATSize; } +void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId) +{ + return allocRecordAligned(type, s, n, 0, 0, clear, paramId); +} + void* -SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId) +SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId) { void * p = NULL; - size_t size = n*s; - Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s); - refresh_watch_dog(); + Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0; + size_t size = n*s + over_alloc; + Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc; + refresh_watch_dog(9); if (real_size > 0){ #ifdef VM_TRACE_MEM ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes", @@ -696,14 +706,24 @@ SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, U char * ptr = (char*)p; const Uint32 chunk = 128 * 1024; while(size > chunk){ - refresh_watch_dog(); + refresh_watch_dog(9); memset(ptr, 0, chunk); ptr += chunk; size -= chunk; } - refresh_watch_dog(); + refresh_watch_dog(9); memset(ptr, 0, size); } + if (unaligned_buffer) + { + *unaligned_buffer = p; + p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc)); +#ifdef VM_TRACE + g_eventLogger.info("'%s' (%u) %llu %llu, alignment correction %u bytes", + type, align, (Uint64)p, (Uint64)p+n*s, + (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer)); +#endif + } } return p; } @@ -720,9 +740,16 @@ SimulatedBlock::deallocRecord(void ** ptr, } void -SimulatedBlock::refresh_watch_dog() +SimulatedBlock::refresh_watch_dog(Uint32 place) +{ + globalData.incrementWatchDogCounter(place); +} + +void +SimulatedBlock::update_watch_dog_timer(Uint32 interval) { - globalData.incrementWatchDogCounter(1); + extern EmulatorData globalEmulatorData; + globalEmulatorData.theWatchDog->setCheckInterval(interval); } void diff --git a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp index 37a8dde5956..86e26986f93 100644 --- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp @@ -334,7 +334,8 @@ protected: * Refresh Watch Dog in initialising code * */ - void refresh_watch_dog(); + void refresh_watch_dog(Uint32 place = 1); + void update_watch_dog_timer(Uint32 interval); /** * Prog error @@ -377,6 +378,7 @@ protected: * */ void* allocRecord(const char * type, size_t s, size_t n, bool clear = true, Uint32 paramId = 0); + void* allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align = NDB_O_DIRECT_WRITE_ALIGNMENT, bool clear = true, Uint32 paramId = 0); /** * Deallocate record diff --git a/storage/ndb/src/kernel/vm/WatchDog.cpp b/storage/ndb/src/kernel/vm/WatchDog.cpp index 2e24a5eaa6c..a7f5e8f5c2b 100644 --- a/storage/ndb/src/kernel/vm/WatchDog.cpp +++ b/storage/ndb/src/kernel/vm/WatchDog.cpp @@ -25,6 +25,8 @@ #include <ErrorHandlingMacros.hpp> #include <EventLogger.hpp> +#include <NdbTick.h> + extern EventLogger g_eventLogger; extern "C" @@ -72,73 +74,115 @@ WatchDog::doStop(){ } } +const char *get_action(Uint32 IPValue) +{ + const char *action; + switch (IPValue) { + case 1: + action = "Job Handling"; + break; + case 2: + action = "Scanning Timers"; + break; + case 3: + action = "External I/O"; + break; + case 4: + action = "Print Job Buffers at crash"; + break; + case 5: + action = "Checking connections"; + break; + case 6: + action = "Performing Send"; + break; + case 7: + action = "Polling for Receive"; + break; + case 8: + action = "Performing Receive"; + break; + case 9: + action = "Allocating memory"; + break; + default: + action = "Unknown place"; + break; + }//switch + return action; +} + void -WatchDog::run(){ - unsigned int anIPValue; - unsigned int alerts = 0; +WatchDog::run() +{ + unsigned int anIPValue, sleep_time; unsigned int oldIPValue = 0; - + unsigned int theIntervalCheck = theInterval; + struct MicroSecondTimer start_time, last_time, now; + NdbTick_getMicroTimer(&start_time); + last_time = start_time; + // WatchDog for the single threaded NDB - while(!theStop){ - Uint32 tmp = theInterval / 500; - tmp= (tmp ? tmp : 1); - - while(!theStop && tmp > 0){ - NdbSleep_MilliSleep(500); - tmp--; - } - + while (!theStop) + { + sleep_time= 100; + + NdbSleep_MilliSleep(sleep_time); if(theStop) break; + NdbTick_getMicroTimer(&now); + if (NdbTick_getMicrosPassed(last_time, now)/1000 > sleep_time*2) + { + struct tms my_tms; + times(&my_tms); + g_eventLogger.info("Watchdog: User time: %llu System time: %llu", + (Uint64)my_tms.tms_utime, + (Uint64)my_tms.tms_stime); + g_eventLogger.warning("Watchdog: Warning overslept %u ms, expected %u ms.", + NdbTick_getMicrosPassed(last_time, now)/1000, + sleep_time); + } + last_time = now; + // Verify that the IP thread is not stuck in a loop anIPValue = *theIPValue; - if(anIPValue != 0) { + if (anIPValue != 0) + { oldIPValue = anIPValue; globalData.incrementWatchDogCounter(0); - alerts = 0; - } else { - const char *last_stuck_action; - alerts++; - switch (oldIPValue) { - case 1: - last_stuck_action = "Job Handling"; - break; - case 2: - last_stuck_action = "Scanning Timers"; - break; - case 3: - last_stuck_action = "External I/O"; - break; - case 4: - last_stuck_action = "Print Job Buffers at crash"; - break; - case 5: - last_stuck_action = "Checking connections"; - break; - case 6: - last_stuck_action = "Performing Send"; - break; - case 7: - last_stuck_action = "Polling for Receive"; - break; - case 8: - last_stuck_action = "Performing Receive"; - break; - default: - last_stuck_action = "Unknown place"; - break; - }//switch - g_eventLogger.warning("Ndb kernel is stuck in: %s", last_stuck_action); + NdbTick_getMicroTimer(&start_time); + theIntervalCheck = theInterval; + } + else + { + int warn = 1; + Uint32 elapsed = NdbTick_getMicrosPassed(start_time, now)/1000; + /* + oldIPValue == 9 indicates malloc going on, this can take some time + so only warn if we pass the watchdog interval + */ + if (oldIPValue == 9) + if (elapsed < theIntervalCheck) + warn = 0; + else + theIntervalCheck += theInterval; + + if (warn) { - struct tms my_tms; - times(&my_tms); - g_eventLogger.info("User time: %llu System time: %llu", - (Uint64)my_tms.tms_utime, - (Uint64)my_tms.tms_stime); - } - if(alerts == 3){ - shutdownSystem(last_stuck_action); + const char *last_stuck_action = get_action(oldIPValue); + g_eventLogger.warning("Ndb kernel is stuck in: %s", last_stuck_action); + { + struct tms my_tms; + times(&my_tms); + g_eventLogger.info("Watchdog: User time: %llu System time: %llu", + (Uint64)my_tms.tms_utime, + (Uint64)my_tms.tms_stime); + } + if (elapsed > 3 * theInterval) + { + shutdownSystem(last_stuck_action); + } } } } diff --git a/storage/ndb/src/mgmsrv/ConfigInfo.cpp b/storage/ndb/src/mgmsrv/ConfigInfo.cpp index 92ca24e8e0a..56aacda214d 100644 --- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp +++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp @@ -572,6 +572,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { STR_VALUE(MAX_INT_RNIL) }, { + CFG_DB_WATCHDOG_INTERVAL_INITIAL, + "TimeBetweenWatchDogCheckInitial", + DB_TOKEN, + "Time between execution checks inside a database node in the early start phases when memory is allocated", + ConfigInfo::CI_USED, + true, + ConfigInfo::CI_INT, + "6000", + "70", + STR_VALUE(MAX_INT_RNIL) }, + + { CFG_DB_STOP_ON_ERROR, "StopOnError", DB_TOKEN, @@ -1313,6 +1325,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { "0", STR_VALUE(MAX_INT_RNIL) }, + { + CFG_DB_O_DIRECT, + "ODirect", + DB_TOKEN, + "Use O_DIRECT file write/read when possible", + ConfigInfo::CI_USED, + true, + ConfigInfo::CI_BOOL, + "false", + "false", + "true"}, + /*************************************************************************** * API ***************************************************************************/ diff --git a/storage/ndb/tools/restore/Restore.cpp b/storage/ndb/tools/restore/Restore.cpp index 3d466384782..15e442a4f35 100644 --- a/storage/ndb/tools/restore/Restore.cpp +++ b/storage/ndb/tools/restore/Restore.cpp @@ -867,13 +867,32 @@ bool RestoreDataIterator::readFragmentHeader(int & ret, Uint32 *fragmentId) debug << "RestoreDataIterator::getNextFragment" << endl; - if (buffer_read(&Header, sizeof(Header), 1) != 1){ + while (1) + { + /* read first part of header */ + if (buffer_read(&Header, 8, 1) != 1) + { + ret = 0; + return false; + } // if + + /* skip if EMPTY_ENTRY */ + Header.SectionType = ntohl(Header.SectionType); + Header.SectionLength = ntohl(Header.SectionLength); + if (Header.SectionType == BackupFormat::EMPTY_ENTRY) + { + void *tmp; + buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1); + continue; + } + break; + } + /* read rest of header */ + if (buffer_read(((char*)&Header)+8, sizeof(Header)-8, 1) != 1) + { ret = 0; return false; - } // if - - Header.SectionType = ntohl(Header.SectionType); - Header.SectionLength = ntohl(Header.SectionLength); + } Header.TableId = ntohl(Header.TableId); Header.FragmentNo = ntohl(Header.FragmentNo); Header.ChecksumType = ntohl(Header.ChecksumType); |