diff options
Diffstat (limited to 'ndb/src/kernel/vm')
-rw-r--r-- | ndb/src/kernel/vm/Configuration.cpp | 439 | ||||
-rw-r--r-- | ndb/src/kernel/vm/Configuration.hpp | 28 | ||||
-rw-r--r-- | ndb/src/kernel/vm/Emulator.cpp | 10 | ||||
-rw-r--r-- | ndb/src/kernel/vm/Emulator.hpp | 19 | ||||
-rw-r--r-- | ndb/src/kernel/vm/FastScheduler.cpp | 14 | ||||
-rw-r--r-- | ndb/src/kernel/vm/FastScheduler.hpp | 14 | ||||
-rw-r--r-- | ndb/src/kernel/vm/Makefile.am | 29 | ||||
-rw-r--r-- | ndb/src/kernel/vm/Makefile_old (renamed from ndb/src/kernel/vm/Makefile) | 5 | ||||
-rw-r--r-- | ndb/src/kernel/vm/SignalCounter.hpp | 2 | ||||
-rw-r--r-- | ndb/src/kernel/vm/SimulatedBlock.cpp | 59 | ||||
-rw-r--r-- | ndb/src/kernel/vm/SimulatedBlock.hpp | 15 | ||||
-rw-r--r-- | ndb/src/kernel/vm/TransporterCallback.cpp | 7 | ||||
-rw-r--r-- | ndb/src/kernel/vm/VMSignal.hpp | 9 | ||||
-rw-r--r-- | ndb/src/kernel/vm/pc.hpp | 21 | ||||
-rw-r--r-- | ndb/src/kernel/vm/testLongSig/testLongSig.cpp | 40 |
15 files changed, 589 insertions, 122 deletions
diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp index 706d75509f2..c97ad951cf3 100644 --- a/ndb/src/kernel/vm/Configuration.cpp +++ b/ndb/src/kernel/vm/Configuration.cpp @@ -27,6 +27,15 @@ #include <getarg.h> +#include <mgmapi_configuration.hpp> +#include <mgmapi_config_parameters_debug.h> +#include <kernel_config_parameters.h> + +#include <kernel_types.h> +#include <ndb_limits.h> +#include "pc.hpp" +#include <LogLevel.hpp> + extern "C" { void ndbSetOwnVersion(); } @@ -122,10 +131,8 @@ Configuration::init(int argc, const char** argv){ return true; } -Configuration::Configuration(): - the_clusterConfigurationData() +Configuration::Configuration() { - m_ownProperties = 0; _programName = 0; _connectString = 0; _fsPath = 0; @@ -134,8 +141,6 @@ Configuration::Configuration(): } Configuration::~Configuration(){ - delete m_ownProperties; - if(_programName != NULL) free(_programName); @@ -143,12 +148,6 @@ Configuration::~Configuration(){ free(_fsPath); } -const -ClusterConfiguration& -Configuration::clusterConfiguration() const { - return the_clusterConfigurationData; -} - void Configuration::setupConfiguration(){ /** @@ -157,7 +156,7 @@ Configuration::setupConfiguration(){ ConfigRetriever cr; cr.setConnectString(_connectString); stopOnError(true); - Properties * p = cr.getConfig("DB", NDB_VERSION); + ndb_mgm_configuration * p = cr.getConfig(NDB_VERSION, NODE_TYPE_DB); if(p == 0){ const char * s = cr.getErrorString(); if(s == 0) @@ -171,56 +170,46 @@ Configuration::setupConfiguration(){ "/invalid configuration", s); } + Uint32 nodeId = globalData.ownId = cr.getOwnNodeId(); + /** * Configure transporters */ { - IPCConfig * theIPC = new IPCConfig(p); - - if(theIPC->init() != 0){ - ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", ""); - } - - if(theIPC->configureTransporters(&globalTransporterRegistry) <= 0){ + int res = IPCConfig::configureTransporters(nodeId, + * p, + globalTransporterRegistry); + if(res <= 0){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "No transporters configured"); } - - globalData.ownId = theIPC->ownId(); - delete theIPC; } /** * Setup cluster configuration data */ - const Properties * db = 0; - if (!p->get("Node", globalData.ownId, &db)) { + ndb_mgm_configuration_iterator iter(* p, CFG_SECTION_NODE); + if (iter.find(CFG_NODE_ID, globalData.ownId)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "DB missing"); } - const char * type; - if(!(db->get("Type", &type) && strcmp(type, "DB") == 0)){ + + unsigned type; + if(!(iter.get(CFG_TYPE_OF_SECTION, &type) == 0 && type == NODE_TYPE_DB)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "I'm wrong type of node"); } - /** - * Save properties object to use in getOwnProperties() - */ - m_ownProperties = new Properties(* db); - - the_clusterConfigurationData.init(* p, * db); - - if(!db->get("MaxNoOfSavedMessages", &_maxErrorLogs)){ + if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "MaxNoOfSavedMessages missing"); } - if(!db->get("LockPagesInMainMemory", &_lockPagesInMainMemory)){ + if(iter.get(CFG_DB_MEMLOCK, &_lockPagesInMainMemory)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "LockPagesInMainMemory missing"); } - if(!db->get("TimeBetweenWatchDogCheck", &_timeBetweenWatchDogCheck)){ + if(iter.get(CFG_DB_WATCHDOG_INTERVAL, &_timeBetweenWatchDogCheck)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "TimeBetweenWatchDogCheck missing"); } @@ -230,16 +219,16 @@ Configuration::setupConfiguration(){ */ { const char* pFileSystemPath = NULL; - if(!db->get("FileSystemPath", &pFileSystemPath)){ + if(iter.get(CFG_DB_FILESYSTEM_PATH, &pFileSystemPath)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "FileSystemPath missing"); } - + if(pFileSystemPath == 0 || strlen(pFileSystemPath) == 0){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "Configuration does not contain valid filesystem path"); } - + if(pFileSystemPath[strlen(pFileSystemPath) - 1] == '/') _fsPath = strdup(pFileSystemPath); else { @@ -248,19 +237,17 @@ Configuration::setupConfiguration(){ strcat(_fsPath, "/"); } } - - if(!db->get("StopOnError", &_stopOnError)){ + + if(iter.get(CFG_DB_STOP_ON_ERROR, &_stopOnError)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "StopOnError missing"); } - - if(!db->get("RestartOnErrorInsert", &m_restartOnErrorInsert)){ + + if(iter.get(CFG_DB_STOP_ON_ERROR_INSERT, &m_restartOnErrorInsert)){ ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", "RestartOnErrorInsert missing"); } - delete p; - /** * Create the watch dog thread */ @@ -269,7 +256,14 @@ Configuration::setupConfiguration(){ t = globalEmulatorData.theWatchDog ->setCheckInterval(t); _timeBetweenWatchDogCheck = t; } + + ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config); + + m_clusterConfig = p; + m_clusterConfigIter = ndb_mgm_create_configuration_iterator + (p, CFG_SECTION_NODE); + calcSizeAlt(cf); } bool @@ -282,12 +276,6 @@ Configuration::timeBetweenWatchDogCheck() const { return _timeBetweenWatchDogCheck; } -const -ClusterConfiguration::ClusterData& -Configuration::clusterConfigurationData() const { - return the_clusterConfigurationData.clusterData(); -} - void Configuration::timeBetweenWatchDogCheck(int value) { _timeBetweenWatchDogCheck = value; @@ -313,11 +301,6 @@ Configuration::stopOnError(bool val){ _stopOnError = val; } -const Properties * -Configuration::getOwnProperties() const { - return m_ownProperties; -} - int Configuration::getRestartOnErrorInsert() const { return m_restartOnErrorInsert; @@ -335,6 +318,350 @@ Configuration::getConnectStringCopy() const { return 0; } +const ndb_mgm_configuration_iterator * +Configuration::getOwnConfigIterator() const { + return m_ownConfigIterator; +} + +ndb_mgm_configuration_iterator * +Configuration::getClusterConfigIterator() const { + return m_clusterConfigIter; +} + +void +Configuration::calcSizeAlt(ConfigValues * ownConfig){ + const char * msg = "Invalid configuration fetched"; + char buf[255]; + + unsigned int noOfTables = 0; + unsigned int noOfIndexes = 0; + unsigned int noOfReplicas = 0; + unsigned int noOfDBNodes = 0; + unsigned int noOfAPINodes = 0; + unsigned int noOfMGMNodes = 0; + unsigned int noOfNodes = 0; + unsigned int noOfAttributes = 0; + unsigned int noOfOperations = 0; + unsigned int noOfTransactions = 0; + unsigned int noOfIndexPages = 0; + unsigned int noOfDataPages = 0; + unsigned int noOfScanRecords = 0; + m_logLevel = new LogLevel(); + + /** + * {"NoOfConcurrentCheckpointsDuringRestart", &cd.ispValues[1][5] }, + * {"NoOfConcurrentCheckpointsAfterRestart", &cd.ispValues[2][4] }, + * {"NoOfConcurrentProcessesHandleTakeover", &cd.ispValues[1][7] }, + * {"TimeToWaitAlive", &cd.ispValues[0][0] }, + */ + struct AttribStorage { int paramId; Uint32 * storage; }; + AttribStorage tmp[] = { + { CFG_DB_NO_SCANS, &noOfScanRecords }, + { CFG_DB_NO_TABLES, &noOfTables }, + { CFG_DB_NO_INDEXES, &noOfIndexes }, + { CFG_DB_NO_REPLICAS, &noOfReplicas }, + { CFG_DB_NO_ATTRIBUTES, &noOfAttributes }, + { CFG_DB_NO_OPS, &noOfOperations }, + { CFG_DB_NO_TRANSACTIONS, &noOfTransactions } +#if 0 + { "NoOfDiskPagesToDiskDuringRestartTUP", &cd.ispValues[3][8] }, + { "NoOfDiskPagesToDiskAfterRestartTUP", &cd.ispValues[3][9] }, + { "NoOfDiskPagesToDiskDuringRestartACC", &cd.ispValues[3][10] }, + { "NoOfDiskPagesToDiskAfterRestartACC", &cd.ispValues[3][11] }, +#endif + }; + + ndb_mgm_configuration_iterator db(*(ndb_mgm_configuration*)ownConfig, 0); + + const int sz = sizeof(tmp)/sizeof(AttribStorage); + for(int i = 0; i<sz; i++){ + if(ndb_mgm_get_int_parameter(&db, tmp[i].paramId, tmp[i].storage)){ + snprintf(buf, sizeof(buf), "ConfigParam: %d not found", tmp[i].paramId); + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf); + } + } + + Uint64 indexMem = 0, dataMem = 0; + ndb_mgm_get_int64_parameter(&db, CFG_DB_DATA_MEM, &dataMem); + ndb_mgm_get_int64_parameter(&db, CFG_DB_INDEX_MEM, &indexMem); + if(dataMem == 0){ + snprintf(buf, sizeof(buf), "ConfigParam: %d not found", CFG_DB_DATA_MEM); + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf); + } + + if(indexMem == 0){ + snprintf(buf, sizeof(buf), "ConfigParam: %d not found", CFG_DB_INDEX_MEM); + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf); + } + + noOfDataPages = (dataMem / 8192); + noOfIndexPages = (indexMem / 8192); + + for(unsigned j = 0; j<LogLevel::LOGLEVEL_CATEGORIES; j++){ + Uint32 tmp; + if(!ndb_mgm_get_int_parameter(&db, LogLevel::MIN_LOGLEVEL_ID+j, &tmp)){ + m_logLevel->setLogLevel((LogLevel::EventCategory)j, tmp); + } + } + + // tmp + ndb_mgm_configuration_iterator * p = m_clusterConfigIter; + + Uint32 nodeNo = noOfNodes = 0; + NodeBitmask nodes; + for(ndb_mgm_first(p); ndb_mgm_valid(p); ndb_mgm_next(p), nodeNo++){ + + Uint32 nodeId; + Uint32 nodeType; + + if(ndb_mgm_get_int_parameter(p, CFG_NODE_ID, &nodeId)){ + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, "Node data (Id) missing"); + } + + if(ndb_mgm_get_int_parameter(p, CFG_TYPE_OF_SECTION, &nodeType)){ + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, "Node data (Type) missing"); + } + + if(nodeId > MAX_NODES || nodeId == 0){ + snprintf(buf, sizeof(buf), + "Invalid node id: %d", nodeId); + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf); + } + + if(nodes.get(nodeId)){ + snprintf(buf, sizeof(buf), "Two node can not have the same node id: %d", + nodeId); + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf); + } + nodes.set(nodeId); + + switch(nodeType){ + case NODE_TYPE_DB: + noOfDBNodes++; // No of NDB processes + + if(nodeId > MAX_NDB_NODES){ + snprintf(buf, sizeof(buf), "Maximum node id for a ndb node is: %d", + MAX_NDB_NODES); + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf); + } + break; + case NODE_TYPE_API: + noOfAPINodes++; // No of API processes + break; + case NODE_TYPE_REP: + break; + case NODE_TYPE_MGM: + noOfMGMNodes++; // No of MGM processes + break; + case NODE_TYPE_EXT_REP: + break; + default: + snprintf(buf, sizeof(buf), "Unknown node type: %d", nodeType); + ERROR_SET(fatal, ERR_INVALID_CONFIG, msg, buf); + } + } + noOfNodes = nodeNo; + + /** + * Do size calculations + */ + ConfigValuesFactory cfg(ownConfig); + + noOfTables++; // Remove impact of system table + noOfTables += noOfIndexes; // Indexes are tables too + noOfAttributes += 2; // ---"---- + noOfTables *= 2; // Remove impact of Dict need 2 ids for each table + + if (noOfDBNodes > 15) { + noOfDBNodes = 15; + }//if + Uint32 noOfLocalScanRecords = (noOfDBNodes * noOfScanRecords) + 1; + Uint32 noOfTCScanRecords = noOfScanRecords; + + { + /** + * Acc Size Alt values + */ + // Can keep 65536 pages (= 0.5 GByte) + cfg.put(CFG_ACC_DIR_RANGE, + 4 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); + + cfg.put(CFG_ACC_DIR_ARRAY, + (noOfIndexPages >> 8) + + 4 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); + + cfg.put(CFG_ACC_FRAGMENT, + 2 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); + + /*-----------------------------------------------------------------------*/ + // The extra operation records added are used by the scan and node + // recovery process. + // Node recovery process will have its operations dedicated to ensure + // that they never have a problem with allocation of the operation record. + // The remainder are allowed for use by the scan processes. + /*-----------------------------------------------------------------------*/ + cfg.put(CFG_ACC_OP_RECS, + noOfReplicas*((16 * noOfOperations) / 10 + 50) + + (noOfLocalScanRecords * MAX_PARALLEL_SCANS_PER_FRAG) + + NODE_RECOVERY_SCAN_OP_RECORDS); + + cfg.put(CFG_ACC_OVERFLOW_RECS, + noOfIndexPages + + 2 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); + + cfg.put(CFG_ACC_PAGE8, + noOfIndexPages + 32); + + cfg.put(CFG_ACC_ROOT_FRAG, + NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); + + cfg.put(CFG_ACC_TABLE, noOfTables); + + cfg.put(CFG_ACC_SCAN, noOfLocalScanRecords); + } + + { + /** + * Dict Size Alt values + */ + cfg.put(CFG_DICT_ATTRIBUTE, + noOfAttributes); + + cfg.put(CFG_DICT_CONNECT, + noOfOperations + 32); + + cfg.put(CFG_DICT_FRAG_CONNECT, + NO_OF_FRAG_PER_NODE * noOfDBNodes * noOfReplicas); + + cfg.put(CFG_DICT_TABLE, + noOfTables); + + cfg.put(CFG_DICT_TC_CONNECT, + 2* noOfOperations); + } + + { + /** + * Dih Size Alt values + */ + cfg.put(CFG_DIH_API_CONNECT, + 2 * noOfTransactions); + + cfg.put(CFG_DIH_CONNECT, + noOfOperations + 46); + + cfg.put(CFG_DIH_FRAG_CONNECT, + NO_OF_FRAG_PER_NODE * noOfTables * noOfDBNodes); + + int temp; + temp = noOfReplicas - 2; + if (temp < 0) + temp = 1; + else + temp++; + cfg.put(CFG_DIH_MORE_NODES, + temp * NO_OF_FRAG_PER_NODE * + noOfTables * noOfDBNodes); + + cfg.put(CFG_DIH_REPLICAS, + NO_OF_FRAG_PER_NODE * noOfTables * + noOfDBNodes * noOfReplicas); + + cfg.put(CFG_DIH_TABLE, + noOfTables); + } + + { + /** + * Lqh Size Alt values + */ + cfg.put(CFG_LQH_FRAG, + NO_OF_FRAG_PER_NODE * noOfTables * noOfReplicas); + + cfg.put(CFG_LQH_CONNECT, + noOfReplicas*((11 * noOfOperations) / 10 + 50)); + + cfg.put(CFG_LQH_TABLE, + noOfTables); + + cfg.put(CFG_LQH_TC_CONNECT, + noOfReplicas*((16 * noOfOperations) / 10 + 50)); + + cfg.put(CFG_LQH_REPLICAS, + noOfReplicas); + + cfg.put(CFG_LQH_SCAN, + noOfLocalScanRecords); + } + + { + /** + * Tc Size Alt values + */ + cfg.put(CFG_TC_API_CONNECT, + 3 * noOfTransactions); + + cfg.put(CFG_TC_TC_CONNECT, + noOfOperations + 16 + noOfTransactions); + + cfg.put(CFG_TC_TABLE, + noOfTables); + + cfg.put(CFG_TC_LOCAL_SCAN, + noOfLocalScanRecords); + + cfg.put(CFG_TC_SCAN, + noOfTCScanRecords); + } + + { + /** + * Tup Size Alt values + */ + cfg.put(CFG_TUP_FRAG, + 2 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); + + cfg.put(CFG_TUP_OP_RECS, + noOfReplicas*((16 * noOfOperations) / 10 + 50)); + + cfg.put(CFG_TUP_PAGE, + noOfDataPages); + + cfg.put(CFG_TUP_PAGE_RANGE, + 4 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas); + + cfg.put(CFG_TUP_TABLE, + noOfTables); + + cfg.put(CFG_TUP_TABLE_DESC, + 4 * NO_OF_FRAG_PER_NODE * noOfAttributes* noOfReplicas + + 12 * NO_OF_FRAG_PER_NODE * noOfTables* noOfReplicas ); + + cfg.put(CFG_TUP_STORED_PROC, + noOfLocalScanRecords); + } + + { + /** + * Tux Size Alt values + */ + cfg.put(CFG_TUX_INDEX, + noOfTables); + + cfg.put(CFG_TUX_FRAGMENT, + 2 * NO_OF_FRAG_PER_NODE * noOfTables * noOfReplicas); + + cfg.put(CFG_TUX_ATTRIBUTE, + noOfIndexes * 4); + + cfg.put(CFG_TUX_SCAN_OP, noOfLocalScanRecords); + } + + m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues(); + m_ownConfigIterator = ndb_mgm_create_configuration_iterator + (m_ownConfig, 0); +} + void Configuration::setInitialStart(bool val){ _initialStart = val; diff --git a/ndb/src/kernel/vm/Configuration.hpp b/ndb/src/kernel/vm/Configuration.hpp index 3f96bb454c5..1706ad05867 100644 --- a/ndb/src/kernel/vm/Configuration.hpp +++ b/ndb/src/kernel/vm/Configuration.hpp @@ -17,7 +17,8 @@ #ifndef Configuration_H #define Configuration_H -#include "ClusterConfiguration.hpp" +#include <mgmapi.h> +#include <ndb_types.h> class Configuration { public: @@ -46,35 +47,36 @@ public: void setRestartOnErrorInsert(int); // Cluster configuration - const ClusterConfiguration::ClusterData& clusterConfigurationData() const; - const ClusterConfiguration& clusterConfiguration() const; - const char * programName() const; const char * fileSystemPath() const; char * getConnectStringCopy() const; /** - * Return Properties for own node - */ - const Properties * getOwnProperties() const; - - /** * */ bool getInitialStart() const; void setInitialStart(bool val); bool getDaemonMode() const; - + + const ndb_mgm_configuration_iterator * getOwnConfigIterator() const; + + class LogLevel * m_logLevel; private: + friend class Cmvmi; + friend class Qmgr; + ndb_mgm_configuration_iterator * getClusterConfigIterator() const; + Uint32 _stopOnError; Uint32 m_restartOnErrorInsert; Uint32 _maxErrorLogs; Uint32 _lockPagesInMainMemory; Uint32 _timeBetweenWatchDogCheck; + ndb_mgm_configuration * m_ownConfig; + ndb_mgm_configuration * m_clusterConfig; - ClusterConfiguration the_clusterConfigurationData; - const Properties * m_ownProperties; + ndb_mgm_configuration_iterator * m_clusterConfigIter; + ndb_mgm_configuration_iterator * m_ownConfigIterator; /** * arguments to NDB process @@ -84,6 +86,8 @@ private: bool _initialStart; char * _connectString; bool _daemonMode; + + void calcSizeAlt(class ConfigValues * ); }; inline diff --git a/ndb/src/kernel/vm/Emulator.cpp b/ndb/src/kernel/vm/Emulator.cpp index 0d6d3f55acb..b615e41eb65 100644 --- a/ndb/src/kernel/vm/Emulator.cpp +++ b/ndb/src/kernel/vm/Emulator.cpp @@ -43,11 +43,11 @@ extern "C" { * Declare the global variables */ -#ifdef USE_EMULATED_JAM - Uint8 theEmulatedJam[EMULATED_JAM_SIZE * 4]; - Uint32 theEmulatedJamIndex = 0; - Uint32 theEmulatedJamBlockNumber = 0; -#endif // USE_EMULATED_JAM +#ifndef NO_EMULATED_JAM +Uint8 theEmulatedJam[EMULATED_JAM_SIZE * 4]; +Uint32 theEmulatedJamIndex = 0; +Uint32 theEmulatedJamBlockNumber = 0; +#endif GlobalData globalData; diff --git a/ndb/src/kernel/vm/Emulator.hpp b/ndb/src/kernel/vm/Emulator.hpp index ba533eb873d..8c4504b9ba7 100644 --- a/ndb/src/kernel/vm/Emulator.hpp +++ b/ndb/src/kernel/vm/Emulator.hpp @@ -36,15 +36,18 @@ extern struct GlobalData globalData; extern class SignalLoggerManager globalSignalLoggers; #endif -#ifdef USE_EMULATED_JAM -#define EMULATED_JAM_SIZE 1024 -#define JAM_MASK ((EMULATED_JAM_SIZE * 4) - 1) +#ifndef NO_EMULATED_JAM + #define EMULATED_JAM_SIZE 1024 + #define JAM_MASK ((EMULATED_JAM_SIZE * 4) - 1) -extern Uint8 theEmulatedJam[]; -extern Uint32 theEmulatedJamIndex; -// last block entry, used in dumpJam() if jam contains no block entries -extern Uint32 theEmulatedJamBlockNumber; -#endif // USE_EMULATED_JAM + extern Uint8 theEmulatedJam[]; + extern Uint32 theEmulatedJamIndex; + // last block entry, used in dumpJam() if jam contains no block entries + extern Uint32 theEmulatedJamBlockNumber; +#else + const Uint8 theEmulatedJam[]=0; + const Uint32 theEmulatedJamIndex=0; +#endif struct EmulatorData { class Configuration * theConfiguration; diff --git a/ndb/src/kernel/vm/FastScheduler.cpp b/ndb/src/kernel/vm/FastScheduler.cpp index e9ca4834562..eca456d26dd 100644 --- a/ndb/src/kernel/vm/FastScheduler.cpp +++ b/ndb/src/kernel/vm/FastScheduler.cpp @@ -316,14 +316,14 @@ APZJobBuffer::signal2buffer(Signal* signal, void APZJobBuffer::insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]){ - Uint32 tOccupancy = theOccupancy; + Uint32 tOccupancy = theOccupancy + 1; Uint32 myWPtr = wPtr; register BufferEntry& buf = buffer[myWPtr]; if (tOccupancy < bufSize) { Uint32 cond = (++myWPtr == bufSize) - 1; wPtr = myWPtr & cond; - theOccupancy = tOccupancy + 1; + theOccupancy = tOccupancy; buf.header = * sh; const Uint32 len = buf.header.theLength; @@ -342,8 +342,9 @@ APZJobBuffer::insert(const SignalHeader * const sh, }//if } APZJobBuffer::APZJobBuffer() - : rPtr(0), wPtr(0), theOccupancy(0), bufSize(0), buffer(NULL), memRef(NULL) + : bufSize(0), buffer(NULL), memRef(NULL) { + clear(); } APZJobBuffer::~APZJobBuffer() @@ -354,9 +355,11 @@ APZJobBuffer::~APZJobBuffer() void APZJobBuffer::newBuffer(int size) { - buffer = new BufferEntry[size]; + buffer = new BufferEntry[size + 1]; // +1 to support "overrrun" if(buffer){ +#ifndef NDB_PURIFY ::memset(buffer, 0, (size * sizeof(BufferEntry))); +#endif bufSize = size; } else bufSize = 0; @@ -474,10 +477,11 @@ FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) { signal.theData[0] = EventReport::JobStatistic; signal.theData[1] = tMeanLoopCount; + memset(&signal.header, 0, sizeof(SignalHeader)); signal.header.theLength = 2; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, 0); - + execute(&signal, JBA, CMVMI, GSN_EVENT_REP); } diff --git a/ndb/src/kernel/vm/FastScheduler.hpp b/ndb/src/kernel/vm/FastScheduler.hpp index 586a7ea27ad..9749dab5d85 100644 --- a/ndb/src/kernel/vm/FastScheduler.hpp +++ b/ndb/src/kernel/vm/FastScheduler.hpp @@ -43,7 +43,7 @@ class BufferEntry { public: SignalHeader header; - Uint32 theDataRegister[28]; + Uint32 theDataRegister[25]; }; class APZJobBuffer @@ -68,7 +68,6 @@ public: void retrieveDump(Signal *signal, Uint32 myRptr); void clear(); - bool isEmpty() const; Uint32 getOccupancy() const; Uint32 getReadPtr() const; @@ -313,13 +312,13 @@ void APZJobBuffer::insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn) { - Uint32 tOccupancy = theOccupancy; + Uint32 tOccupancy = theOccupancy + 1; Uint32 myWPtr = wPtr; if (tOccupancy < bufSize) { register BufferEntry& buf = buffer[myWPtr]; Uint32 cond = (++myWPtr == bufSize) - 1; wPtr = myWPtr & cond; - theOccupancy = tOccupancy + 1; + theOccupancy = tOccupancy; signal2buffer(signal, bnr, gsn, buf); //--------------------------------------------------------- // Prefetch of buffer[wPtr] is done here. We prefetch for @@ -343,11 +342,4 @@ APZJobBuffer::insert(Signal* signal, BlockNumber bnr, signal2buffer(signal, bnr, gsn, buf); } -inline -bool -APZJobBuffer::isEmpty() const -{ - return (theOccupancy == 0); -} - #endif diff --git a/ndb/src/kernel/vm/Makefile.am b/ndb/src/kernel/vm/Makefile.am new file mode 100644 index 00000000000..4e9dbe36c78 --- /dev/null +++ b/ndb/src/kernel/vm/Makefile.am @@ -0,0 +1,29 @@ +#SUBDIRS = testCopy testDataBuffer testSimplePropertiesSection +#ifneq ($(USE_EDITLINE), N) +#DIRS += testLongSig +#endif + +noinst_LIBRARIES = libkernel.a + +libkernel_a_SOURCES = \ + SimulatedBlock.cpp \ + FastScheduler.cpp \ + TimeQueue.cpp \ + VMSignal.cpp \ + ThreadConfig.cpp \ + TransporterCallback.cpp \ + Emulator.cpp \ + Configuration.cpp \ + WatchDog.cpp \ + SimplePropertiesSection.cpp \ + SectionReader.cpp \ + MetaData.cpp \ + Mutex.cpp SafeCounter.cpp + +INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi + +include $(top_srcdir)/ndb/config/common.mk.am +include $(top_srcdir)/ndb/config/type_kernel.mk.am + +# Don't update the files from bitkeeper +%::SCCS/s.% diff --git a/ndb/src/kernel/vm/Makefile b/ndb/src/kernel/vm/Makefile_old index 3f448b77b17..a162f3672ce 100644 --- a/ndb/src/kernel/vm/Makefile +++ b/ndb/src/kernel/vm/Makefile_old @@ -13,17 +13,18 @@ SOURCES = \ TransporterCallback.cpp \ Emulator.cpp \ Configuration.cpp \ - ClusterConfiguration.cpp \ WatchDog.cpp \ SimplePropertiesSection.cpp \ SectionReader.cpp \ MetaData.cpp \ Mutex.cpp SafeCounter.cpp +CFLAGS_Configuration.cpp := -I$(call fixpath,$(NDB_TOP)/src/mgmapi) + DIRS := testCopy testDataBuffer testSimplePropertiesSection + ifneq ($(USE_EDITLINE), N) DIRS += testLongSig endif - include $(NDB_TOP)/Epilogue.mk diff --git a/ndb/src/kernel/vm/SignalCounter.hpp b/ndb/src/kernel/vm/SignalCounter.hpp index d572551ea92..ea770324aa6 100644 --- a/ndb/src/kernel/vm/SignalCounter.hpp +++ b/ndb/src/kernel/vm/SignalCounter.hpp @@ -21,6 +21,8 @@ #include <ErrorReporter.hpp> class SignalCounter { + friend struct NodeReceiverGroup; + private: Uint32 m_count; NdbNodeBitmask m_nodes; diff --git a/ndb/src/kernel/vm/SimulatedBlock.cpp b/ndb/src/kernel/vm/SimulatedBlock.cpp index e3f087d7d74..a6a8a6242cd 100644 --- a/ndb/src/kernel/vm/SimulatedBlock.cpp +++ b/ndb/src/kernel/vm/SimulatedBlock.cpp @@ -60,7 +60,8 @@ SimulatedBlock::SimulatedBlock(BlockNumber blockNumber, c_fragmentIdCounter = 1; c_fragSenderRunning = false; - const Properties * p = conf.getOwnProperties(); + Properties tmp; + const Properties * p = &tmp; ndbrequire(p != 0); Uint32 count = 10; @@ -98,7 +99,9 @@ SimulatedBlock::SimulatedBlock(BlockNumber blockNumber, for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++) theExecArray[i] = 0; + installSimulatedBlockFunctions(); + UpgradeStartup::installEXEC(this); CLEAR_ERROR_INSERT_VALUE; } @@ -127,6 +130,7 @@ SimulatedBlock::installSimulatedBlockFunctions(){ a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF; a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF; a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF; + a[GSN_READ_CONFIG_REQ] = &SimulatedBlock::execREAD_CONFIG_REQ; } void @@ -182,7 +186,7 @@ SimulatedBlock::sendSignal(BlockReference ref, Uint32 tSignalId = signal->header.theSignalId; - if ((length == 0) || (length > 25) || (recBlock == 0)) { + if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if @@ -263,7 +267,7 @@ SimulatedBlock::sendSignal(NodeReceiverGroup rg, signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = reference(); - if ((length == 0) || (length > 25) || (recBlock == 0)) { + if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if @@ -371,7 +375,7 @@ SimulatedBlock::sendSignal(BlockReference ref, Uint32 tSignalId = signal->header.theSignalId; Uint32 tFragInfo = signal->header.m_fragmentInfo; - if ((length == 0) || (length > 25) || (recBlock == 0)) { + if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if @@ -464,7 +468,7 @@ SimulatedBlock::sendSignal(NodeReceiverGroup rg, signal->header.theSendersBlockRef = reference(); signal->header.m_noOfSections = noOfSections; - if ((length == 0) || (length > 25) || (recBlock == 0)) { + if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if @@ -1338,7 +1342,7 @@ SimulatedBlock::sendFirstFragment(FragmentSendInfo & info, */ return true; } - + /** * Setup info object */ @@ -1724,9 +1728,52 @@ void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){ c_mutexMgr.execUTIL_UNLOCK_CONF(signal); } +void +SimulatedBlock::execREAD_CONFIG_REQ(Signal* signal){ + const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); + + Uint32 ref = req->senderRef; + Uint32 senderData = req->senderData; + + ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); + conf->senderRef = reference(); + conf->senderData = senderData; + sendSignal(ref, GSN_READ_CONFIG_CONF, signal, + ReadConfigConf::SignalLength, JBB); +} + void SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal, Uint32 ptrI, Uint32 retVal){ c_mutexMgr.release(ptrI); } +void +UpgradeStartup::installEXEC(SimulatedBlock* block){ + SimulatedBlock::ExecFunction * a = block->theExecArray; + switch(block->number()){ + case QMGR: + a[UpgradeStartup::GSN_CM_APPCHG] = &SimulatedBlock::execUPGRADE; + break; + case CNTR: + a[UpgradeStartup::GSN_CNTR_MASTERREF] = &SimulatedBlock::execUPGRADE; + a[UpgradeStartup::GSN_CNTR_MASTERCONF] = &SimulatedBlock::execUPGRADE; + break; + } +} + +void +SimulatedBlock::execUPGRADE(Signal* signal){ + Uint32 gsn = signal->header.theVerId_signalNumber; + switch(gsn){ + case UpgradeStartup::GSN_CM_APPCHG: + UpgradeStartup::execCM_APPCHG(* this, signal); + break; + case UpgradeStartup::GSN_CNTR_MASTERREF: + UpgradeStartup::execCNTR_MASTER_REPLY(* this, signal); + break; + case UpgradeStartup::GSN_CNTR_MASTERCONF: + UpgradeStartup::execCNTR_MASTER_REPLY(* this, signal); + break; + } +} diff --git a/ndb/src/kernel/vm/SimulatedBlock.hpp b/ndb/src/kernel/vm/SimulatedBlock.hpp index 42b8a3034f5..491d432625e 100644 --- a/ndb/src/kernel/vm/SimulatedBlock.hpp +++ b/ndb/src/kernel/vm/SimulatedBlock.hpp @@ -45,6 +45,16 @@ #include "SafeCounter.hpp" #include "MetaData.hpp" +#include <mgmapi.h> +#include <mgmapi_config_parameters.h> +#include <mgmapi_config_parameters_debug.h> +#include <kernel_config_parameters.h> +#include <Configuration.hpp> + +#include <signaldata/ReadConfig.hpp> +#include <signaldata/UpgradeStartup.hpp> + + /** * Something for filesystem access */ @@ -70,6 +80,7 @@ class SimulatedBlock { friend class MutexManager; friend class SafeCounter; friend class SafeCounterManager; + friend struct UpgradeStartup; public: friend class BlockComponent; virtual ~SimulatedBlock(); @@ -378,7 +389,7 @@ private: void execSIGNAL_DROPPED_REP(Signal* signal); void execCONTINUE_FRAGMENTED(Signal* signal); - + Uint32 c_fragmentIdCounter; ArrayPool<FragmentInfo> c_fragmentInfoPool; DLHashTable<FragmentInfo> c_fragmentInfoHash; @@ -404,7 +415,9 @@ private: void execUTIL_UNLOCK_REF(Signal* signal); void execUTIL_UNLOCK_CONF(Signal* signal); + void execREAD_CONFIG_REQ(Signal* signal); protected: + void execUPGRADE(Signal* signal); // Variable for storing inserted errors, see pc.H ERROR_INSERT_VARIABLE; diff --git a/ndb/src/kernel/vm/TransporterCallback.cpp b/ndb/src/kernel/vm/TransporterCallback.cpp index 3798e4040c8..eb7d138895c 100644 --- a/ndb/src/kernel/vm/TransporterCallback.cpp +++ b/ndb/src/kernel/vm/TransporterCallback.cpp @@ -206,6 +206,7 @@ execute(void * callbackObj, LinearSectionPtr ptr[3]){ const Uint32 secCount = header->m_noOfSections; + const Uint32 length = header->theLength; #ifdef TRACE_DISTRIBUTED ndbout_c("recv: %s(%d) from (%s, %d)", @@ -225,6 +226,11 @@ execute(void * callbackObj, case 1: ok &= import(secPtr[0], ptr[0].p, ptr[0].sz); } + + /** + * Check that we haven't received a too long signal + */ + ok &= (length + secCount <= 25); Uint32 secPtrI[3]; if(ok){ @@ -234,6 +240,7 @@ execute(void * callbackObj, secPtrI[0] = secPtr[0].i; secPtrI[1] = secPtr[1].i; secPtrI[2] = secPtr[2].i; + globalScheduler.execute(header, prio, theData, secPtrI); return; } diff --git a/ndb/src/kernel/vm/VMSignal.hpp b/ndb/src/kernel/vm/VMSignal.hpp index 45e731f2079..9111ee7949c 100644 --- a/ndb/src/kernel/vm/VMSignal.hpp +++ b/ndb/src/kernel/vm/VMSignal.hpp @@ -34,6 +34,7 @@ struct NodeReceiverGroup { NodeReceiverGroup(); NodeReceiverGroup(Uint32 blockRef); NodeReceiverGroup(Uint32 blockNo, const NodeBitmask &); + NodeReceiverGroup(Uint32 blockNo, const class SignalCounter &); NodeReceiverGroup& operator=(BlockReference ref); @@ -171,6 +172,14 @@ NodeReceiverGroup::NodeReceiverGroup(Uint32 blockNo, const NodeBitmask & nodes){ m_nodes = nodes; } +#include "SignalCounter.hpp" + +inline +NodeReceiverGroup::NodeReceiverGroup(Uint32 blockNo, const SignalCounter & nodes){ + m_block = blockNo; + m_nodes = nodes.m_nodes; +} + inline NodeReceiverGroup& NodeReceiverGroup::operator=(BlockReference blockRef){ diff --git a/ndb/src/kernel/vm/pc.hpp b/ndb/src/kernel/vm/pc.hpp index 873a986bc35..bc74adfc8f6 100644 --- a/ndb/src/kernel/vm/pc.hpp +++ b/ndb/src/kernel/vm/pc.hpp @@ -22,8 +22,14 @@ #include <NdbOut.hpp> #include <ndb_limits.h> -#ifdef USE_EMULATED_JAM +#ifdef NO_EMULATED_JAM +#define jam() +#define jamLine(line) +#define jamEntry() +#define jamEntryLine(line) + +#else #ifdef NDB_WIN32 #define jam() { \ @@ -72,11 +78,6 @@ #endif -#else -#define jam() -#define jamLine(line) -#define jamEntry() -#define jamEntryLine(line) #endif #ifndef NDB_OPT #define ptrCheck(ptr, limit, rec) if (ptr.i < (limit)) ptr.p = &rec[ptr.i]; else ptr.p = NULL @@ -116,12 +117,6 @@ #define arrGuard(ind, size) #endif -// ------- EVENT STATES OF A NODE ----------------------------- -#define ZADD 0 /* New application added */ -#define ZREMOVE 1 /* An application has been removed */ -#define ZSTART 2 /* An application is ready to start */ -#define ZRUN 3 /* An application has started to run */ - // -------- ERROR INSERT MACROS ------- #ifdef ERROR_INSERT #define ERROR_INSERT_VARIABLE UintR cerrorInsert @@ -190,7 +185,7 @@ * * NOTE these may only be used within blocks */ -#if defined VM_TRACE || defined NDB_DEBUG +#if defined VM_TRACE #define ndbassert(check) \ if((check)){ \ } else { \ diff --git a/ndb/src/kernel/vm/testLongSig/testLongSig.cpp b/ndb/src/kernel/vm/testLongSig/testLongSig.cpp index af4e2ca6e24..1d1fb8ebc82 100644 --- a/ndb/src/kernel/vm/testLongSig/testLongSig.cpp +++ b/ndb/src/kernel/vm/testLongSig/testLongSig.cpp @@ -39,6 +39,7 @@ print_help(){ ndbout << "11 - Sending of CONTINUEB fragmented signals w/ linear sections" << endl; ndbout << "12 - As but using receiver group" << endl; + ndbout << "13 - Send 100 * 1000 25 len signals wo/ sections" << endl; ndbout << "r - Recive signal from anyone" << endl; ndbout << "a - Run tests 1 - 12 with variable sizes - 10 loops" << endl; ndbout << "b - Run tests 1 - 12 with variable sizes - 100 loops" << endl; @@ -103,7 +104,7 @@ main(void){ data[5] = 70; data[6] = 123; data[7] = 10; - const Uint32 theDataLen = 8; + const Uint32 theDataLen = 18; for(Uint32 i = 0; i<70; i++) sec0[i] = i; @@ -198,6 +199,38 @@ main(void){ delete ret1; count--; } + } else if (data[1] == 13) { + const Uint32 count = 3500; + const Uint32 loop = 1000; + + signal1.set(ss, 0, CMVMI, GSN_TESTSIG, 25); + signal1.header.m_fragmentInfo = 0; + signal1.header.m_noOfSections = 0; + signal1.theData[1] = 14; + signal1.theData[3] = 0; // Print + signal1.theData[8] = count; + signal1.theData[9] = loop; + Uint32 nodeId = ss.getAliveNode(); + ndbout_c("Sending 25 len signal to node %d", nodeId); + ss.sendSignal(nodeId, &signal1); + + Uint32 total; + { + SimpleSignal * ret1 = ss.waitFor((Uint16)nodeId); + ndbout_c("received from node %d", + refToNode(ret1->header.theSendersBlockRef)); + total = ret1->theData[10] - 1; + delete ret1; + } + + do { + ndbout << "Waiting for " << total << " signals... " << flush; + SimpleSignal * ret1 = ss.waitFor((Uint16)nodeId); + ndbout_c("received from node %d", + refToNode(ret1->header.theSendersBlockRef)); + delete ret1; + total --; + } while(total > 0); } else { print_help(); } @@ -218,7 +251,6 @@ runTest(SignalSender & ss, Uint32 count, bool verbose){ sec2[i] = i * i; } - sig.set(ss, 0, CMVMI, GSN_TESTSIG, 8); sig.theData[0] = ss.getOwnRef(); sig.theData[1] = 1; // TestType sig.theData[2] = 128; // FragSize @@ -236,6 +268,8 @@ runTest(SignalSender & ss, Uint32 count, bool verbose){ sig.ptr[1].sz = randRange(1, 256); sig.ptr[2].sz = randRange(1, 256); sig.header.m_noOfSections = secs; + const Uint32 len = 5 + (secs > 0 ? 1 : 0) * (25 - 5 - 7); + sig.set(ss, 0, CMVMI, GSN_TESTSIG, len); ndbout << "Loop " << loop << " #secs = " << secs << " sizes = [ "; unsigned min = 256; unsigned max = 0; @@ -248,7 +282,7 @@ runTest(SignalSender & ss, Uint32 count, bool verbose){ sum += sz; sig.theData[5+i] = sz; } - ndbout_c("]"); + ndbout_c("] len = %d", len); for(int test = 1; test <= 12; test++){ sig.theData[1] = test; Uint32 nodeId = ss.getAliveNode(); |