diff options
Diffstat (limited to 'ndb/src/mgmsrv/MgmtSrvr.cpp')
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.cpp | 927 |
1 files changed, 296 insertions, 631 deletions
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 8380f3fd86a..92a8025295f 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -15,7 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <ndb_global.h> -#include <pthread.h> +#include <my_pthread.h> #include "MgmtSrvr.hpp" #include "MgmtErrorReporter.hpp" @@ -45,7 +45,6 @@ #include <ndb_version.h> #include <SocketServer.hpp> -#include "NodeLogLevel.hpp" #include <NdbConfig.h> #include <NdbAutoPtr.hpp> @@ -62,71 +61,16 @@ #endif extern int global_flag_send_heartbeat_now; - -static -void -CmdBackupCallback(const MgmtSrvr::BackupEvent & event) -{ - char str[255]; - - ndbout << endl; - - bool ok = false; - switch(event.Event){ - case MgmtSrvr::BackupEvent::BackupStarted: - ok = true; - snprintf(str, sizeof(str), - "Backup %d started", event.Started.BackupId); - break; - case MgmtSrvr::BackupEvent::BackupFailedToStart: - ok = true; - snprintf(str, sizeof(str), - "Backup failed to start (Error %d)", - event.FailedToStart.ErrorCode); - break; - case MgmtSrvr::BackupEvent::BackupCompleted: - ok = true; - snprintf(str, sizeof(str), - "Backup %d completed", - event.Completed.BackupId); - ndbout << str << endl; - - snprintf(str, sizeof(str), - " StartGCP: %d StopGCP: %d", - event.Completed.startGCP, event.Completed.stopGCP); - ndbout << str << endl; - - snprintf(str, sizeof(str), - " #Records: %d #LogRecords: %d", - event.Completed.NoOfRecords, event.Completed.NoOfLogRecords); - ndbout << str << endl; - - snprintf(str, sizeof(str), - " Data: %d bytes Log: %d bytes", - event.Completed.NoOfBytes, event.Completed.NoOfLogBytes); - break; - case MgmtSrvr::BackupEvent::BackupAborted: - ok = true; - snprintf(str, sizeof(str), - "Backup %d has been aborted reason %d", - event.Aborted.BackupId, - event.Aborted.Reason); - break; - } - if(!ok){ - snprintf(str, sizeof(str), "Unknown backup event: %d", event.Event); - } - ndbout << str << endl; -} - +extern int g_no_nodeid_checks; void * MgmtSrvr::logLevelThread_C(void* m) { MgmtSrvr *mgm = (MgmtSrvr*)m; - + my_thread_init(); mgm->logLevelThreadRun(); + my_thread_end(); NdbThread_Exit(0); /* NOTREACHED */ return 0; @@ -136,9 +80,10 @@ void * MgmtSrvr::signalRecvThread_C(void *m) { MgmtSrvr *mgm = (MgmtSrvr*)m; - + my_thread_init(); mgm->signalRecvThreadRun(); + my_thread_end(); NdbThread_Exit(0); /* NOTREACHED */ return 0; @@ -188,44 +133,65 @@ MgmtSrvr::signalRecvThreadRun() EventLogger g_EventLogger; +static NdbOut& +operator<<(NdbOut& out, const LogLevel & ll) +{ + out << "[LogLevel: "; + for(size_t i = 0; i<LogLevel::LOGLEVEL_CATEGORIES; i++) + out << ll.getLogLevel((LogLevel::EventCategory)i) << " "; + out << "]"; + return out; +} + void MgmtSrvr::logLevelThreadRun() { - NdbMutex* threadMutex = NdbMutex_Create(); - while (!_isStopThread) { - if (_startedNodeId != 0) { - NdbMutex_Lock(threadMutex); - - // Local node - NodeLogLevel* n = NULL; - while ((n = _nodeLogLevelList->next()) != NULL) { - if (n->getNodeId() == _startedNodeId) { - setNodeLogLevel(_startedNodeId, n->getLogLevelOrd(), true); - } - } - // Cluster log - while ((n = _clusterLogLevelList->next()) != NULL) { - if (n->getNodeId() == _startedNodeId) { - setEventReportingLevel(_startedNodeId, n->getLogLevelOrd(), true); - } - } - _startedNodeId = 0; - - NdbMutex_Unlock(threadMutex); + /** + * Handle started nodes + */ + EventSubscribeReq req; + req = m_statisticsListner.m_clients[0].m_logLevel; + req.blockRef = _ownReference; - } // if (_startedNodeId != 0) { + SetLogLevelOrd ord; + + m_started_nodes.lock(); + while(m_started_nodes.size() > 0){ + Uint32 node = m_started_nodes[0]; + m_started_nodes.erase(0, false); + m_started_nodes.unlock(); + setEventReportingLevelImpl(node, req); + + ord = m_nodeLogLevel[node]; + setNodeLogLevelImpl(node, ord); + + m_started_nodes.lock(); + } + m_started_nodes.unlock(); + + m_log_level_requests.lock(); + while(m_log_level_requests.size() > 0){ + req = m_log_level_requests[0]; + m_log_level_requests.erase(0, false); + m_log_level_requests.unlock(); + + LogLevel tmp; + tmp = req; + + if(req.blockRef == 0){ + req.blockRef = _ownReference; + setEventReportingLevelImpl(0, req); + } else { + ord = req; + setNodeLogLevelImpl(req.blockRef, ord); + } + m_log_level_requests.lock(); + } + m_log_level_requests.unlock(); NdbSleep_MilliSleep(_logLevelThreadSleep); - } // while (!_isStopThread) - - NdbMutex_Destroy(threadMutex); -} - -void -MgmtSrvr::setStatisticsListner(StatisticsListner* listner) -{ - m_statisticsListner = listner; + } } void @@ -272,7 +238,7 @@ class ErrorItem { public: int _errorCode; - const BaseString _errorText; + const char * _errorText; }; bool @@ -429,110 +395,37 @@ MgmtSrvr::getPort() const { ndb_mgm_destroy_iterator(iter); - /***************** - * Set Stat Port * - *****************/ -#if 0 - if (!mgmProps->get("PortNumberStats", &tmp)){ - ndbout << "Could not find PortNumberStats in the configuration file." - << endl; - return false; - } - glob.port_stats = tmp; -#endif - -#if 0 - const char * host; - if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){ - ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - - const char * hostname; - { - const Properties * p; - char buf[255]; - snprintf(buf, sizeof(buf), "Computer_%s", host.c_str()); - if(!glob.cluster_config->get(buf, &p)){ - ndbout << "Failed to find computer " << host << " in config" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - if(!p->get("HostName", &hostname)){ - ndbout << "Failed to find \"HostName\" for computer " << host - << " in config" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - if(NdbHost_GetHostName(buf) != 0){ - ndbout << "Unable to get own hostname" << endl; - ndbout << "Unable to verify own hostname" << endl; - return false; - } - } - - const char * ip_address; - if(mgmProps->get("IpAddress", &ip_address)){ - glob.use_specific_ip = true; - glob.interface_name = strdup(ip_address); - return true; - } - - glob.interface_name = strdup(hostname); -#endif - return port; } -int -MgmtSrvr::getStatPort() const { -#if 0 - const Properties *mgmProps; - if(!getConfig()->get("Node", _ownNodeId, &mgmProps)) - return -1; - - int tmp = -1; - if(!mgmProps->get("PortNumberStats", (Uint32 *)&tmp)) - return -1; - - return tmp; -#else - return -1; -#endif -} - /* Constructor */ MgmtSrvr::MgmtSrvr(NodeId nodeId, const BaseString &configFilename, - const BaseString &ndb_config_filename, + LocalConfig &local_config, Config * config): _blockNumber(1), // Hard coded block number since it makes it easy to send // signals to other management servers. _ownReference(0), + m_allocated_resources(*this), theSignalIdleList(NULL), theWaitState(WAIT_SUBSCRIBE_CONF), - theConfCount(0), - m_allocated_resources(*this) { - + m_statisticsListner(this), + m_local_config(local_config) +{ + DBUG_ENTER("MgmtSrvr::MgmtSrvr"); _config = NULL; - _isStatPortActive = false; - _isClusterLogStatActive = false; _isStopThread = false; _logLevelThread = NULL; _logLevelThreadSleep = 500; m_signalRecvThread = NULL; - _startedNodeId = 0; theFacade = 0; m_newConfig = NULL; m_configFilename = configFilename; - setCallback(CmdBackupCallback); - m_localNdbConfigFilename = ndb_config_filename; m_nextConfigGenerationNumber = 0; @@ -583,33 +476,64 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, ndb_mgm_destroy_iterator(iter); } - m_statisticsListner = NULL; - - _nodeLogLevelList = new NodeLogLevelList(); - _clusterLogLevelList = new NodeLogLevelList(); - _props = NULL; - _ownNodeId= 0; NodeId tmp= nodeId; BaseString error_string; - if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, 0, 0, error_string)){ +#if 0 + char my_hostname[256]; + struct sockaddr_in tmp_addr; + SOCKET_SIZE_TYPE addrlen= sizeof(tmp_addr); + if (!g_no_nodeid_checks) { + if (gethostname(my_hostname, sizeof(my_hostname))) { + ndbout << "error: gethostname() - " << strerror(errno) << endl; + exit(-1); + } + if (Ndb_getInAddr(&(((sockaddr_in*)&tmp_addr)->sin_addr),my_hostname)) { + ndbout << "error: Ndb_getInAddr(" << my_hostname << ") - " + << strerror(errno) << endl; + exit(-1); + } + } + if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, + (struct sockaddr *)&tmp_addr, + &addrlen, error_string)){ ndbout << "Unable to obtain requested nodeid: " << error_string.c_str() << endl; exit(-1); } +#else + if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, + 0, 0, error_string)){ + ndbout << "Unable to obtain requested nodeid: " + << error_string.c_str() << endl; + exit(-1); + } +#endif _ownNodeId = tmp; { DBUG_PRINT("info", ("verifyConfig")); - ConfigRetriever cr(NDB_VERSION, NDB_MGM_NODE_TYPE_MGM); + ConfigRetriever cr(m_local_config, NDB_VERSION, NDB_MGM_NODE_TYPE_MGM); if (!cr.verifyConfig(config->m_configValues, _ownNodeId)) { ndbout << cr.getErrorString() << endl; exit(-1); } } + { + MgmStatService::StatListener se; + se.m_socket = -1; + for(size_t t = 0; t<LogLevel::LOGLEVEL_CATEGORIES; t++){ + se.m_logLevel.setLogLevel((LogLevel::EventCategory)t, 7); + } + se.m_logLevel.setLogLevel(LogLevel::llError, 15); + se.m_logLevel.setLogLevel(LogLevel::llBackup, 15); + m_statisticsListner.m_clients.push_back(se); + m_statisticsListner.m_logLevel = se.m_logLevel; + } + DBUG_VOID_RETURN; } @@ -671,8 +595,6 @@ MgmtSrvr::start(BaseString &error_string) // Set the initial confirmation count for subscribe requests confirm // from NDB nodes in the cluster. // - theConfCount = getNodeCount(NDB_MGM_NODE_TYPE_NDB); - // Loglevel thread _logLevelThread = NdbThread_Create(logLevelThread_C, (void**)this, @@ -713,9 +635,6 @@ MgmtSrvr::~MgmtSrvr() if(_config != NULL) delete _config; - delete _nodeLogLevelList; - delete _clusterLogLevelList; - // End set log level thread void* res = 0; _isStopThread = true; @@ -736,6 +655,9 @@ MgmtSrvr::~MgmtSrvr() int MgmtSrvr::okToSendTo(NodeId processId, bool unCond) { + if(processId == 0) + return 0; + if (getNodeType(processId) != NDB_MGM_NODE_TYPE_NDB) return WRONG_PROCESS_TYPE; @@ -1020,36 +942,38 @@ int MgmtSrvr::versionNode(int processId, bool abort, VersionCallback callback, void * anyData) { + int version; + if(m_versionRec.inUse) return OPERATION_IN_PROGRESS; m_versionRec.callback = callback; m_versionRec.inUse = true ; - ClusterMgr::Node node; - int version; - if (getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM) { - if(m_versionRec.callback != 0) - m_versionRec.callback(processId, NDB_VERSION, this,0); - } - if (getNodeType(processId) == NDB_MGM_NODE_TYPE_NDB) { - node = theFacade->theClusterMgr->getNodeInfo(processId); - version = node.m_info.m_version; - if(theFacade->theClusterMgr->getNodeInfo(processId).connected) - if(m_versionRec.callback != 0) - m_versionRec.callback(processId, version, this,0); - else - if(m_versionRec.callback != 0) - m_versionRec.callback(processId, 0, this,0); - + if (getOwnNodeId() == processId) + { + version= NDB_VERSION; } - - if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API) { + else if (getNodeType(processId) == NDB_MGM_NODE_TYPE_NDB) + { + ClusterMgr::Node node= theFacade->theClusterMgr->getNodeInfo(processId); + if(node.connected) + version= node.m_info.m_version; + else + version= 0; + } + else if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API || + getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM) + { return sendVersionReq(processId); } + if(m_versionRec.callback != 0) + m_versionRec.callback(processId, version, this,0); m_versionRec.inUse = false ; - return 0; + m_versionRec.version[processId]= version; + + return 0; } int @@ -1460,17 +1384,14 @@ MgmtSrvr::status(int processId, Uint32 * nodegroup, Uint32 * connectCount) { - if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API) { + if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API || + getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM) { if(versionNode(processId, false,0,0) ==0) * version = m_versionRec.version[processId]; else * version = 0; } - if (getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM) { - * version = NDB_VERSION; - } - const ClusterMgr::Node node = theFacade->theClusterMgr->getNodeInfo(processId); @@ -1540,175 +1461,72 @@ MgmtSrvr::status(int processId, return -1; } - - -//**************************************************************************** -//**************************************************************************** -int -MgmtSrvr::startStatisticEventReporting(int level) -{ - SetLogLevelOrd ll; - NodeId nodeId = 0; - - ll.clear(); - ll.setLogLevel(LogLevel::llStatistic, level); - - if (level > 0) { - _isStatPortActive = true; - } else { - _isStatPortActive = false; - - if (_isClusterLogStatActive) { - return 0; - } - } - - while (getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) { - setEventReportingLevelImpl(nodeId, ll); - } - - return 0; -} - -int -MgmtSrvr::setEventReportingLevel(int processId, const SetLogLevelOrd & ll, - bool isResend) -{ - for (Uint32 i = 0; i < ll.noOfEntries; i++) { - if (ll.theCategories[i] == LogLevel::llStatistic) { - if (ll.theLevels[i] > 0) { - _isClusterLogStatActive = true; - break; - } else { - _isClusterLogStatActive = false; - - if (_isStatPortActive) { - return 0; - } - break; - } - } // if (ll.theCategories - } // for (int i = 0 - - return setEventReportingLevelImpl(processId, ll, isResend); -} int MgmtSrvr::setEventReportingLevelImpl(int processId, - const SetLogLevelOrd & ll, - bool isResend) + const EventSubscribeReq& ll) { - Uint32 i; - for(i = 0; i<ll.noOfEntries; i++){ - // Save log level for the cluster log - if (!isResend) { - NodeLogLevel* n = NULL; - bool found = false; - while ((n = _clusterLogLevelList->next()) != NULL) { - if (n->getNodeId() == processId && - n->getCategory() == ll.theCategories[i]) { - - n->setLevel(ll.theLevels[i]); - found = true; - } - } - if (!found) { - _clusterLogLevelList->add(new NodeLogLevel(processId, ll)); - } - } - } - + int result = okToSendTo(processId, true); if (result != 0) { return result; } - NdbApiSignal* signal = getSignal(); - if (signal == NULL) { - return COULD_NOT_ALLOCATE_MEMORY; - } + NdbApiSignal signal(_ownReference); EventSubscribeReq * dst = - CAST_PTR(EventSubscribeReq, signal->getDataPtrSend()); - for(i = 0; i<ll.noOfEntries; i++){ - dst->theCategories[i] = ll.theCategories[i]; - dst->theLevels[i] = ll.theLevels[i]; - } - - dst->noOfEntries = ll.noOfEntries; - dst->blockRef = _ownReference; + CAST_PTR(EventSubscribeReq, signal.getDataPtrSend()); - signal->set(TestOrd::TraceAPI, CMVMI, GSN_EVENT_SUBSCRIBE_REQ, - EventSubscribeReq::SignalLength); + * dst = ll; + + signal.set(TestOrd::TraceAPI, CMVMI, GSN_EVENT_SUBSCRIBE_REQ, + EventSubscribeReq::SignalLength); + + theFacade->lock_mutex(); + send(&signal, processId, NODE_TYPE_DB); + theFacade->unlock_mutex(); - result = sendSignal(processId, WAIT_SUBSCRIBE_CONF, signal, true); - if (result == -1) { - return SEND_OR_RECEIVE_FAILED; - } - else { - // Increment the conf counter - theConfCount++; - } - return 0; } //**************************************************************************** //**************************************************************************** int -MgmtSrvr::setNodeLogLevel(int processId, const SetLogLevelOrd & ll, - bool isResend) +MgmtSrvr::setNodeLogLevelImpl(int processId, const SetLogLevelOrd & ll) { - Uint32 i; - for(i = 0; i<ll.noOfEntries; i++){ - // Save log level for the cluster log - if (!isResend) { - NodeLogLevel* n = NULL; - bool found = false; - while ((n = _clusterLogLevelList->next()) != NULL) { - if (n->getNodeId() == processId && - n->getCategory() == ll.theCategories[i]) { - - n->setLevel(ll.theLevels[i]); - found = true; - } - } - if (!found) { - _clusterLogLevelList->add(new NodeLogLevel(processId, ll)); - } - } - } - int result = okToSendTo(processId, true); if (result != 0) { return result; } - NdbApiSignal* signal = getSignal(); - if (signal == NULL) { - return COULD_NOT_ALLOCATE_MEMORY; - } - - SetLogLevelOrd * dst = CAST_PTR(SetLogLevelOrd, signal->getDataPtrSend()); - - for(i = 0; i<ll.noOfEntries; i++){ - dst->theCategories[i] = ll.theCategories[i]; - dst->theLevels[i] = ll.theLevels[i]; - } + NdbApiSignal signal(_ownReference); - dst->noOfEntries = ll.noOfEntries; + SetLogLevelOrd * dst = CAST_PTR(SetLogLevelOrd, signal.getDataPtrSend()); - signal->set(TestOrd::TraceAPI, CMVMI, GSN_SET_LOGLEVELORD, - SetLogLevelOrd::SignalLength); - - result = sendSignal(processId, NO_WAIT, signal, true); - if (result == -1) { - return SEND_OR_RECEIVE_FAILED; - } + * dst = ll; + + signal.set(TestOrd::TraceAPI, CMVMI, GSN_SET_LOGLEVELORD, + SetLogLevelOrd::SignalLength); + + theFacade->lock_mutex(); + theFacade->sendSignalUnCond(&signal, processId); + theFacade->unlock_mutex(); return 0; } +int +MgmtSrvr::send(NdbApiSignal* signal, Uint32 node, Uint32 node_type){ + Uint32 max = (node == 0) ? MAX_NODES : node + 1; + + for(; node < max; node++){ + while(nodeTypes[node] != (int)node_type && node < max) node++; + if(nodeTypes[node] != (int)node_type) + break; + theFacade->sendSignalUnCond(signal, node); + } + return 0; +} //**************************************************************************** //**************************************************************************** @@ -2003,7 +1821,7 @@ const char* MgmtSrvr::getErrorText(int errorCode) for (int i = 0; i < noOfErrorCodes; ++i) { if (errorCode == errorTable[i]._errorCode) { - return errorTable[i]._errorText.c_str(); + return errorTable[i]._errorText; } } @@ -2011,21 +1829,6 @@ const char* MgmtSrvr::getErrorText(int errorCode) return text; } -/***************************************************************************** - * Handle reception of various signals - *****************************************************************************/ - -int -MgmtSrvr::handleSTATISTICS_CONF(NdbApiSignal* signal) -{ - //ndbout << "MgmtSrvr::handleSTATISTICS_CONF" << endl; - - int x = signal->readData(1); - //ndbout << "MgmtSrvr::handleSTATISTICS_CONF, x: " << x << endl; - _statistics._test1 = x; - return 0; -} - void MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal) { @@ -2049,51 +1852,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal) } break; - case GSN_STATISTICS_CONF: - if (theWaitState != WAIT_STATISTICS) { - g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected " - "signal received, gsn %d, theWaitState = %d", - gsn, theWaitState); - - return; - } - returnCode = handleSTATISTICS_CONF(signal); - if (returnCode != -1) { - theWaitState = NO_WAIT; - } - break; - - - case GSN_SET_VAR_CONF: - if (theWaitState != WAIT_SET_VAR) { - g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected " - "signal received, gsn %d, theWaitState = %d", - gsn, theWaitState); - return; - } - theWaitState = NO_WAIT; - _setVarReqResult = 0; - break; - - case GSN_SET_VAR_REF: - if (theWaitState != WAIT_SET_VAR) { - g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected " - "signal received, gsn %d, theWaitState = %d", - gsn, theWaitState); - return; - } - theWaitState = NO_WAIT; - _setVarReqResult = -1; - break; - case GSN_EVENT_SUBSCRIBE_CONF: - theConfCount--; // OK, we've received a conf message - if (theConfCount < 0) { - g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected " - "signal received, gsn %d, theWaitState = %d", - gsn, theWaitState); - theConfCount = 0; - } break; case GSN_EVENT_REP: @@ -2173,7 +1932,6 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal) event.Completed.NoOfLogBytes = rep->noOfLogBytes; event.Completed.NoOfRecords = rep->noOfRecords; event.Completed.NoOfLogRecords = rep->noOfLogRecords; - event.Completed.stopGCP = rep->stopGCP; event.Completed.startGCP = rep->startGCP; event.Nodes = rep->nodes; @@ -2276,20 +2034,19 @@ void MgmtSrvr::handleStatus(NodeId nodeId, bool alive) { if (alive) { - _startedNodeId = nodeId; // Used by logLevelThreadRun() + m_started_nodes.push_back(nodeId); Uint32 theData[25]; theData[0] = EventReport::Connected; theData[1] = nodeId; + eventReport(_ownNodeId, theData); } else { handleStopReply(nodeId, 0); - theConfCount++; // Increment the event subscr conf count because - + Uint32 theData[25]; theData[0] = EventReport::Disconnected; theData[1] = nodeId; - + eventReport(_ownNodeId, theData); - g_EventLogger.info("Lost connection to node %d", nodeId); } } @@ -2337,32 +2094,42 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, SOCKET_SIZE_TYPE *client_addr_len, BaseString &error_string) { + DBUG_ENTER("MgmtSrvr::alloc_node_id"); + DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d", + *nodeId, type, client_addr)); + if (g_no_nodeid_checks) { + if (*nodeId == 0) { + error_string.appfmt("no-nodeid-ckecks set in manegment server.\n" + "node id must be set explicitly in connectstring"); + DBUG_RETURN(false); + } + DBUG_RETURN(true); + } Guard g(&f_node_id_mutex); -#if 0 - ndbout << "MgmtSrvr::getFreeNodeId type=" << type - << " *nodeid=" << *nodeId << endl; -#endif - + int no_mgm= 0; NodeBitmask connected_nodes(m_reserved_nodes); - if (theFacade && theFacade->theClusterMgr) { - for(Uint32 i = 0; i < MAX_NODES; i++) - if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) { - const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); - if (node.connected) - connected_nodes.bitOR(node.m_state.m_connected_nodes); + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB && + theFacade && theFacade->theClusterMgr) { + const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); + if (node.connected) { + connected_nodes.bitOR(node.m_state.m_connected_nodes); } + } else if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM) + no_mgm++; } - bool found_matching_id= false; bool found_matching_type= false; bool found_free_node= false; - const char *config_hostname = 0; + unsigned id_found= 0; + const char *config_hostname= 0; struct in_addr config_addr= {0}; int r_config_addr= -1; unsigned type_c= 0; - ndb_mgm_configuration_iterator iter(*(ndb_mgm_configuration *)_config->m_configValues, - CFG_SECTION_NODE); + ndb_mgm_configuration_iterator + iter(*(ndb_mgm_configuration *)_config->m_configValues, CFG_SECTION_NODE); for(iter.first(); iter.valid(); iter.next()) { unsigned tmp= 0; if(iter.get(CFG_NODE_ID, &tmp)) abort(); @@ -2370,15 +2137,16 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, continue; found_matching_id= true; if(iter.get(CFG_TYPE_OF_SECTION, &type_c)) abort(); - if(type_c != type) + if(type_c != (unsigned)type) continue; found_matching_type= true; if (connected_nodes.get(tmp)) continue; found_free_node= true; if(iter.get(CFG_NODE_HOST, &config_hostname)) abort(); - - if (config_hostname && config_hostname[0] != 0 && client_addr) { + if (config_hostname && config_hostname[0] == 0) + config_hostname= 0; + else if (client_addr) { // check hostname compatability const void *tmp_in= &(((sockaddr_in*)client_addr)->sin_addr); if((r_config_addr= Ndb_getInAddr(&config_addr, config_hostname)) != 0 @@ -2388,39 +2156,76 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, || memcmp(&tmp_addr, tmp_in, sizeof(config_addr)) != 0) { // not localhost #if 0 - ndbout << "MgmtSrvr::getFreeNodeId compare failed for \"" << config_hostname - << "\" id=" << tmp << endl; + ndbout << "MgmtSrvr::getFreeNodeId compare failed for \"" + << config_hostname + << "\" id=" << tmp << endl; #endif continue; } // connecting through localhost - // check if config_hostname match hostname - char my_hostname[256]; - if (gethostname(my_hostname, sizeof(my_hostname)) != 0) - continue; - if(Ndb_getInAddr(&tmp_addr, my_hostname) != 0 - || memcmp(&tmp_addr, &config_addr, sizeof(config_addr)) != 0) { - // no match + // check if config_hostname is local + if (!SocketServer::tryBind(0,config_hostname)) { continue; } } + } else { // client_addr == 0 + if (!SocketServer::tryBind(0,config_hostname)) { + continue; + } } - *nodeId= tmp; - if (client_addr) - m_connect_address[tmp]= ((struct sockaddr_in *)client_addr)->sin_addr; - else - Ndb_getInAddr(&(m_connect_address[tmp]), "localhost"); - m_reserved_nodes.set(tmp); -#if 0 - ndbout << "MgmtSrvr::getFreeNodeId found type=" << type - << " *nodeid=" << *nodeId << endl; -#endif - return true; + if (*nodeId != 0 || + type != NDB_MGM_NODE_TYPE_MGM || + no_mgm == 1) { // any match is ok + id_found= tmp; + break; + } + if (id_found) { // mgmt server may only have one match + error_string.appfmt("Ambiguous node id's %d and %d.\n" + "Suggest specifying node id in connectstring,\n" + "or specifying unique host names in config file.", + id_found, tmp); + DBUG_RETURN(false); + } + if (config_hostname == 0) { + error_string.appfmt("Ambiguity for node id %d.\n" + "Suggest specifying node id in connectstring,\n" + "or specifying unique host names in config file,\n" + "or specifying just one mgmt server in config file.", + tmp); + DBUG_RETURN(false); + } + id_found= tmp; // mgmt server matched, check for more matches + } + + if (id_found) + { + *nodeId= id_found; + DBUG_PRINT("info", ("allocating node id %d",*nodeId)); + { + int r= 0; + if (client_addr) + m_connect_address[id_found]= + ((struct sockaddr_in *)client_addr)->sin_addr; + else if (config_hostname) + r= Ndb_getInAddr(&(m_connect_address[id_found]), config_hostname); + else { + char name[256]; + r= gethostname(name, sizeof(name)); + if (r == 0) { + name[sizeof(name)-1]= 0; + r= Ndb_getInAddr(&(m_connect_address[id_found]), name); + } + } + if (r) + m_connect_address[id_found].s_addr= 0; + } + m_reserved_nodes.set(id_found); + DBUG_RETURN(true); } if (found_matching_type && !found_free_node) { - // we have a temporary error which might be due to that we have got the latest - // connect status from db-nodes. Force update. + // we have a temporary error which might be due to that + // we have got the latest connect status from db-nodes. Force update. global_flag_send_heartbeat_now= 1; } @@ -2429,7 +2234,8 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, const char *alias, *str; alias= ndb_mgm_get_node_type_alias_string(type, &str); type_string.assfmt("%s(%s)", alias, str); - alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)type_c, &str); + alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)type_c, + &str); type_c_string.assfmt("%s(%s)", alias, str); } @@ -2438,11 +2244,14 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, if (found_matching_type) if (found_free_node) error_string.appfmt("Connection done from wrong host ip %s.", - inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr)); + inet_ntoa(((struct sockaddr_in *) + (client_addr))->sin_addr)); else - error_string.appfmt("No free node id found for %s.", type_string.c_str()); + error_string.appfmt("No free node id found for %s.", + type_string.c_str()); else - error_string.appfmt("No %s node defined in config file.", type_string.c_str()); + error_string.appfmt("No %s node defined in config file.", + type_string.c_str()); else error_string.append("No nodes defined in config file."); } else { @@ -2451,19 +2260,23 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, if (found_free_node) { // have to split these into two since inet_ntoa overwrites itself error_string.appfmt("Connection with id %d done from wrong host ip %s,", - *nodeId, inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr)); + *nodeId, inet_ntoa(((struct sockaddr_in *) + (client_addr))->sin_addr)); error_string.appfmt(" expected %s(%s).", config_hostname, - r_config_addr ? "lookup failed" : inet_ntoa(config_addr)); + r_config_addr ? + "lookup failed" : inet_ntoa(config_addr)); } else - error_string.appfmt("Id %d already allocated by another node.", *nodeId); + error_string.appfmt("Id %d already allocated by another node.", + *nodeId); else error_string.appfmt("Id %d configured as %s, connect attempted as %s.", - *nodeId, type_c_string.c_str(), type_string.c_str()); + *nodeId, type_c_string.c_str(), + type_string.c_str()); else - error_string.appfmt("No node defined with id=%d in config file.", *nodeId); + error_string.appfmt("No node defined with id=%d in config file.", + *nodeId); } - - return false; + DBUG_RETURN(false); } bool @@ -2483,91 +2296,23 @@ MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const return true; } +#include "Services.hpp" + void MgmtSrvr::eventReport(NodeId nodeId, const Uint32 * theData) { const EventReport * const eventReport = (EventReport *)&theData[0]; - + EventReport::EventType type = eventReport->getEventType(); - - if (type == EventReport::TransReportCounters || - type == EventReport::OperationReportCounters) { - - if (_isClusterLogStatActive) { - g_EventLogger.log(type, theData, nodeId); - } - - if (_isStatPortActive) { - char theTime[128]; - struct tm* tm_now; - time_t now; - now = time((time_t*)NULL); -#ifdef NDB_WIN32 - tm_now = localtime(&now); -#else - tm_now = gmtime(&now); -#endif - - snprintf(theTime, sizeof(theTime), - STATISTIC_DATE, - tm_now->tm_year + 1900, - tm_now->tm_mon, - tm_now->tm_mday, - tm_now->tm_hour, - tm_now->tm_min, - tm_now->tm_sec); - - char str[255]; - - if (type == EventReport::TransReportCounters) { - snprintf(str, sizeof(str), - STATISTIC_LINE, - theTime, - (int)now, - nodeId, - theData[1], - theData[2], - theData[3], - // theData[4], simple reads - theData[5], - theData[6], - theData[7], - theData[8]); - } else if (type == EventReport::OperationReportCounters) { - snprintf(str, sizeof(str), - OP_STATISTIC_LINE, - theTime, - (int)now, - nodeId, - theData[1]); - } - - if(m_statisticsListner != 0){ - m_statisticsListner->println_statistics(str); - } - } - - return; - - } // if (type == - // Log event - g_EventLogger.log(type, theData, nodeId); - + g_EventLogger.log(type, theData, nodeId, + &m_statisticsListner.m_clients[0].m_logLevel); + m_statisticsListner.log(type, theData, nodeId); } /*************************************************************************** * Backup ***************************************************************************/ - -MgmtSrvr::BackupCallback -MgmtSrvr::setCallback(BackupCallback aCall) -{ - BackupCallback ret = m_backupCallback; - m_backupCallback = aCall; - return ret; -} - int MgmtSrvr::startBackup(Uint32& backupId, bool waitCompleted) { @@ -2674,102 +2419,18 @@ MgmtSrvr::abortBackup(Uint32 backupId) void MgmtSrvr::backupCallback(BackupEvent & event) { - char str[255]; - - bool ok = false; + m_lastBackupEvent = event; switch(event.Event){ - case BackupEvent::BackupStarted: - ok = true; - snprintf(str, sizeof(str), - "Backup %d started", event.Started.BackupId); - break; case BackupEvent::BackupFailedToStart: - ok = true; - snprintf(str, sizeof(str), - "Backup failed to start (Backup error %d)", - event.FailedToStart.ErrorCode); - break; - case BackupEvent::BackupCompleted: - ok = true; - snprintf(str, sizeof(str), - "Backup %d completed", - event.Completed.BackupId); - g_EventLogger.info(str); - - snprintf(str, sizeof(str), - " StartGCP: %d StopGCP: %d", - event.Completed.startGCP, event.Completed.stopGCP); - g_EventLogger.info(str); - - snprintf(str, sizeof(str), - " #Records: %d #LogRecords: %d", - event.Completed.NoOfRecords, event.Completed.NoOfLogRecords); - g_EventLogger.info(str); - - snprintf(str, sizeof(str), - " Data: %d bytes Log: %d bytes", - event.Completed.NoOfBytes, event.Completed.NoOfLogBytes); - break; case BackupEvent::BackupAborted: - ok = true; - snprintf(str, sizeof(str), - "Backup %d has been aborted reason %d", - event.Aborted.BackupId, - event.Aborted.Reason); - break; - } - if(!ok){ - snprintf(str, sizeof(str), - "Unknown backup event: %d", - event.Event); - - } - g_EventLogger.info(str); - - switch (theWaitState){ - case WAIT_BACKUP_STARTED: - switch(event.Event){ - case BackupEvent::BackupStarted: - case BackupEvent::BackupFailedToStart: - m_lastBackupEvent = event; - theWaitState = NO_WAIT; - break; - default: - snprintf(str, sizeof(str), - "Received event %d in unexpected state WAIT_BACKUP_STARTED", - event.Event); - g_EventLogger.info(str); - return; - } - + case BackupEvent::BackupCompleted: + theWaitState = NO_WAIT; break; - case WAIT_BACKUP_COMPLETED: - switch(event.Event){ - case BackupEvent::BackupCompleted: - case BackupEvent::BackupAborted: - case BackupEvent::BackupFailedToStart: - m_lastBackupEvent = event; + case BackupEvent::BackupStarted: + if(theWaitState == WAIT_BACKUP_STARTED) theWaitState = NO_WAIT; - break; - default: - snprintf(str, sizeof(str), - "Received event %d in unexpected state WAIT_BACKUP_COMPLETED", - event.Event); - g_EventLogger.info(str); - return; - } - break; - default: - snprintf(str, sizeof(str), "Received event %d in unexpected state = %d", - event.Event, theWaitState); - g_EventLogger.info(str); - return; - - } - - if(m_backupCallback != 0){ - (* m_backupCallback)(event); } + return; } @@ -2957,15 +2618,15 @@ MgmtSrvr::setDbParameter(int node, int param, const char * value, switch(p_type){ case 0: res = i2.set(param, val_32); - ndbout_c("Updateing node %d param: %d to %d", node, param, val_32); + ndbout_c("Updating node %d param: %d to %d", node, param, val_32); break; case 1: res = i2.set(param, val_64); - ndbout_c("Updateing node %d param: %d to %Ld", node, param, val_32); + ndbout_c("Updating node %d param: %d to %Ld", node, param, val_32); break; case 2: res = i2.set(param, val_char); - ndbout_c("Updateing node %d param: %d to %s", node, param, val_char); + ndbout_c("Updating node %d param: %d to %s", node, param, val_char); break; default: abort(); @@ -2981,3 +2642,7 @@ template class Vector<SigMatch>; #if __SUNPRO_CC != 0x560 template bool SignalQueue::waitFor<SigMatch>(Vector<SigMatch>&, SigMatch*&, NdbApiSignal*&, unsigned); #endif + +template class MutexVector<unsigned short>; +template class MutexVector<MgmStatService::StatListener>; +template class MutexVector<EventSubscribeReq>; |