summaryrefslogtreecommitdiff
path: root/ndb/src/mgmsrv/MgmtSrvr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/mgmsrv/MgmtSrvr.cpp')
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.cpp927
1 files changed, 296 insertions, 631 deletions
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index 8380f3fd86a..92a8025295f 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -15,7 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
-#include <pthread.h>
+#include <my_pthread.h>
#include "MgmtSrvr.hpp"
#include "MgmtErrorReporter.hpp"
@@ -45,7 +45,6 @@
#include <ndb_version.h>
#include <SocketServer.hpp>
-#include "NodeLogLevel.hpp"
#include <NdbConfig.h>
#include <NdbAutoPtr.hpp>
@@ -62,71 +61,16 @@
#endif
extern int global_flag_send_heartbeat_now;
-
-static
-void
-CmdBackupCallback(const MgmtSrvr::BackupEvent & event)
-{
- char str[255];
-
- ndbout << endl;
-
- bool ok = false;
- switch(event.Event){
- case MgmtSrvr::BackupEvent::BackupStarted:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup %d started", event.Started.BackupId);
- break;
- case MgmtSrvr::BackupEvent::BackupFailedToStart:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup failed to start (Error %d)",
- event.FailedToStart.ErrorCode);
- break;
- case MgmtSrvr::BackupEvent::BackupCompleted:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup %d completed",
- event.Completed.BackupId);
- ndbout << str << endl;
-
- snprintf(str, sizeof(str),
- " StartGCP: %d StopGCP: %d",
- event.Completed.startGCP, event.Completed.stopGCP);
- ndbout << str << endl;
-
- snprintf(str, sizeof(str),
- " #Records: %d #LogRecords: %d",
- event.Completed.NoOfRecords, event.Completed.NoOfLogRecords);
- ndbout << str << endl;
-
- snprintf(str, sizeof(str),
- " Data: %d bytes Log: %d bytes",
- event.Completed.NoOfBytes, event.Completed.NoOfLogBytes);
- break;
- case MgmtSrvr::BackupEvent::BackupAborted:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup %d has been aborted reason %d",
- event.Aborted.BackupId,
- event.Aborted.Reason);
- break;
- }
- if(!ok){
- snprintf(str, sizeof(str), "Unknown backup event: %d", event.Event);
- }
- ndbout << str << endl;
-}
-
+extern int g_no_nodeid_checks;
void *
MgmtSrvr::logLevelThread_C(void* m)
{
MgmtSrvr *mgm = (MgmtSrvr*)m;
-
+ my_thread_init();
mgm->logLevelThreadRun();
+ my_thread_end();
NdbThread_Exit(0);
/* NOTREACHED */
return 0;
@@ -136,9 +80,10 @@ void *
MgmtSrvr::signalRecvThread_C(void *m)
{
MgmtSrvr *mgm = (MgmtSrvr*)m;
-
+ my_thread_init();
mgm->signalRecvThreadRun();
+ my_thread_end();
NdbThread_Exit(0);
/* NOTREACHED */
return 0;
@@ -188,44 +133,65 @@ MgmtSrvr::signalRecvThreadRun()
EventLogger g_EventLogger;
+static NdbOut&
+operator<<(NdbOut& out, const LogLevel & ll)
+{
+ out << "[LogLevel: ";
+ for(size_t i = 0; i<LogLevel::LOGLEVEL_CATEGORIES; i++)
+ out << ll.getLogLevel((LogLevel::EventCategory)i) << " ";
+ out << "]";
+ return out;
+}
+
void
MgmtSrvr::logLevelThreadRun()
{
- NdbMutex* threadMutex = NdbMutex_Create();
-
while (!_isStopThread) {
- if (_startedNodeId != 0) {
- NdbMutex_Lock(threadMutex);
-
- // Local node
- NodeLogLevel* n = NULL;
- while ((n = _nodeLogLevelList->next()) != NULL) {
- if (n->getNodeId() == _startedNodeId) {
- setNodeLogLevel(_startedNodeId, n->getLogLevelOrd(), true);
- }
- }
- // Cluster log
- while ((n = _clusterLogLevelList->next()) != NULL) {
- if (n->getNodeId() == _startedNodeId) {
- setEventReportingLevel(_startedNodeId, n->getLogLevelOrd(), true);
- }
- }
- _startedNodeId = 0;
-
- NdbMutex_Unlock(threadMutex);
+ /**
+ * Handle started nodes
+ */
+ EventSubscribeReq req;
+ req = m_statisticsListner.m_clients[0].m_logLevel;
+ req.blockRef = _ownReference;
- } // if (_startedNodeId != 0) {
+ SetLogLevelOrd ord;
+
+ m_started_nodes.lock();
+ while(m_started_nodes.size() > 0){
+ Uint32 node = m_started_nodes[0];
+ m_started_nodes.erase(0, false);
+ m_started_nodes.unlock();
+ setEventReportingLevelImpl(node, req);
+
+ ord = m_nodeLogLevel[node];
+ setNodeLogLevelImpl(node, ord);
+
+ m_started_nodes.lock();
+ }
+ m_started_nodes.unlock();
+
+ m_log_level_requests.lock();
+ while(m_log_level_requests.size() > 0){
+ req = m_log_level_requests[0];
+ m_log_level_requests.erase(0, false);
+ m_log_level_requests.unlock();
+
+ LogLevel tmp;
+ tmp = req;
+
+ if(req.blockRef == 0){
+ req.blockRef = _ownReference;
+ setEventReportingLevelImpl(0, req);
+ } else {
+ ord = req;
+ setNodeLogLevelImpl(req.blockRef, ord);
+ }
+ m_log_level_requests.lock();
+ }
+ m_log_level_requests.unlock();
NdbSleep_MilliSleep(_logLevelThreadSleep);
- } // while (!_isStopThread)
-
- NdbMutex_Destroy(threadMutex);
-}
-
-void
-MgmtSrvr::setStatisticsListner(StatisticsListner* listner)
-{
- m_statisticsListner = listner;
+ }
}
void
@@ -272,7 +238,7 @@ class ErrorItem
{
public:
int _errorCode;
- const BaseString _errorText;
+ const char * _errorText;
};
bool
@@ -429,110 +395,37 @@ MgmtSrvr::getPort() const {
ndb_mgm_destroy_iterator(iter);
- /*****************
- * Set Stat Port *
- *****************/
-#if 0
- if (!mgmProps->get("PortNumberStats", &tmp)){
- ndbout << "Could not find PortNumberStats in the configuration file."
- << endl;
- return false;
- }
- glob.port_stats = tmp;
-#endif
-
-#if 0
- const char * host;
- if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){
- ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl;
- ndbout << "Unable to verify own hostname" << endl;
- return false;
- }
-
- const char * hostname;
- {
- const Properties * p;
- char buf[255];
- snprintf(buf, sizeof(buf), "Computer_%s", host.c_str());
- if(!glob.cluster_config->get(buf, &p)){
- ndbout << "Failed to find computer " << host << " in config" << endl;
- ndbout << "Unable to verify own hostname" << endl;
- return false;
- }
- if(!p->get("HostName", &hostname)){
- ndbout << "Failed to find \"HostName\" for computer " << host
- << " in config" << endl;
- ndbout << "Unable to verify own hostname" << endl;
- return false;
- }
- if(NdbHost_GetHostName(buf) != 0){
- ndbout << "Unable to get own hostname" << endl;
- ndbout << "Unable to verify own hostname" << endl;
- return false;
- }
- }
-
- const char * ip_address;
- if(mgmProps->get("IpAddress", &ip_address)){
- glob.use_specific_ip = true;
- glob.interface_name = strdup(ip_address);
- return true;
- }
-
- glob.interface_name = strdup(hostname);
-#endif
-
return port;
}
-int
-MgmtSrvr::getStatPort() const {
-#if 0
- const Properties *mgmProps;
- if(!getConfig()->get("Node", _ownNodeId, &mgmProps))
- return -1;
-
- int tmp = -1;
- if(!mgmProps->get("PortNumberStats", (Uint32 *)&tmp))
- return -1;
-
- return tmp;
-#else
- return -1;
-#endif
-}
-
/* Constructor */
MgmtSrvr::MgmtSrvr(NodeId nodeId,
const BaseString &configFilename,
- const BaseString &ndb_config_filename,
+ LocalConfig &local_config,
Config * config):
_blockNumber(1), // Hard coded block number since it makes it easy to send
// signals to other management servers.
_ownReference(0),
+ m_allocated_resources(*this),
theSignalIdleList(NULL),
theWaitState(WAIT_SUBSCRIBE_CONF),
- theConfCount(0),
- m_allocated_resources(*this) {
-
+ m_statisticsListner(this),
+ m_local_config(local_config)
+{
+
DBUG_ENTER("MgmtSrvr::MgmtSrvr");
_config = NULL;
- _isStatPortActive = false;
- _isClusterLogStatActive = false;
_isStopThread = false;
_logLevelThread = NULL;
_logLevelThreadSleep = 500;
m_signalRecvThread = NULL;
- _startedNodeId = 0;
theFacade = 0;
m_newConfig = NULL;
m_configFilename = configFilename;
- setCallback(CmdBackupCallback);
- m_localNdbConfigFilename = ndb_config_filename;
m_nextConfigGenerationNumber = 0;
@@ -583,33 +476,64 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
ndb_mgm_destroy_iterator(iter);
}
- m_statisticsListner = NULL;
-
- _nodeLogLevelList = new NodeLogLevelList();
- _clusterLogLevelList = new NodeLogLevelList();
-
_props = NULL;
-
_ownNodeId= 0;
NodeId tmp= nodeId;
BaseString error_string;
- if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, 0, 0, error_string)){
+#if 0
+ char my_hostname[256];
+ struct sockaddr_in tmp_addr;
+ SOCKET_SIZE_TYPE addrlen= sizeof(tmp_addr);
+ if (!g_no_nodeid_checks) {
+ if (gethostname(my_hostname, sizeof(my_hostname))) {
+ ndbout << "error: gethostname() - " << strerror(errno) << endl;
+ exit(-1);
+ }
+ if (Ndb_getInAddr(&(((sockaddr_in*)&tmp_addr)->sin_addr),my_hostname)) {
+ ndbout << "error: Ndb_getInAddr(" << my_hostname << ") - "
+ << strerror(errno) << endl;
+ exit(-1);
+ }
+ }
+ if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
+ (struct sockaddr *)&tmp_addr,
+ &addrlen, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
exit(-1);
}
+#else
+ if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
+ 0, 0, error_string)){
+ ndbout << "Unable to obtain requested nodeid: "
+ << error_string.c_str() << endl;
+ exit(-1);
+ }
+#endif
_ownNodeId = tmp;
{
DBUG_PRINT("info", ("verifyConfig"));
- ConfigRetriever cr(NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
+ ConfigRetriever cr(m_local_config, NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
if (!cr.verifyConfig(config->m_configValues, _ownNodeId)) {
ndbout << cr.getErrorString() << endl;
exit(-1);
}
}
+ {
+ MgmStatService::StatListener se;
+ se.m_socket = -1;
+ for(size_t t = 0; t<LogLevel::LOGLEVEL_CATEGORIES; t++){
+ se.m_logLevel.setLogLevel((LogLevel::EventCategory)t, 7);
+ }
+ se.m_logLevel.setLogLevel(LogLevel::llError, 15);
+ se.m_logLevel.setLogLevel(LogLevel::llBackup, 15);
+ m_statisticsListner.m_clients.push_back(se);
+ m_statisticsListner.m_logLevel = se.m_logLevel;
+ }
+
DBUG_VOID_RETURN;
}
@@ -671,8 +595,6 @@ MgmtSrvr::start(BaseString &error_string)
// Set the initial confirmation count for subscribe requests confirm
// from NDB nodes in the cluster.
//
- theConfCount = getNodeCount(NDB_MGM_NODE_TYPE_NDB);
-
// Loglevel thread
_logLevelThread = NdbThread_Create(logLevelThread_C,
(void**)this,
@@ -713,9 +635,6 @@ MgmtSrvr::~MgmtSrvr()
if(_config != NULL)
delete _config;
- delete _nodeLogLevelList;
- delete _clusterLogLevelList;
-
// End set log level thread
void* res = 0;
_isStopThread = true;
@@ -736,6 +655,9 @@ MgmtSrvr::~MgmtSrvr()
int MgmtSrvr::okToSendTo(NodeId processId, bool unCond)
{
+ if(processId == 0)
+ return 0;
+
if (getNodeType(processId) != NDB_MGM_NODE_TYPE_NDB)
return WRONG_PROCESS_TYPE;
@@ -1020,36 +942,38 @@ int
MgmtSrvr::versionNode(int processId, bool abort,
VersionCallback callback, void * anyData)
{
+ int version;
+
if(m_versionRec.inUse)
return OPERATION_IN_PROGRESS;
m_versionRec.callback = callback;
m_versionRec.inUse = true ;
- ClusterMgr::Node node;
- int version;
- if (getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM) {
- if(m_versionRec.callback != 0)
- m_versionRec.callback(processId, NDB_VERSION, this,0);
- }
- if (getNodeType(processId) == NDB_MGM_NODE_TYPE_NDB) {
- node = theFacade->theClusterMgr->getNodeInfo(processId);
- version = node.m_info.m_version;
- if(theFacade->theClusterMgr->getNodeInfo(processId).connected)
- if(m_versionRec.callback != 0)
- m_versionRec.callback(processId, version, this,0);
- else
- if(m_versionRec.callback != 0)
- m_versionRec.callback(processId, 0, this,0);
-
+ if (getOwnNodeId() == processId)
+ {
+ version= NDB_VERSION;
}
-
- if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API) {
+ else if (getNodeType(processId) == NDB_MGM_NODE_TYPE_NDB)
+ {
+ ClusterMgr::Node node= theFacade->theClusterMgr->getNodeInfo(processId);
+ if(node.connected)
+ version= node.m_info.m_version;
+ else
+ version= 0;
+ }
+ else if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API ||
+ getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM)
+ {
return sendVersionReq(processId);
}
+ if(m_versionRec.callback != 0)
+ m_versionRec.callback(processId, version, this,0);
m_versionRec.inUse = false ;
- return 0;
+ m_versionRec.version[processId]= version;
+
+ return 0;
}
int
@@ -1460,17 +1384,14 @@ MgmtSrvr::status(int processId,
Uint32 * nodegroup,
Uint32 * connectCount)
{
- if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API) {
+ if (getNodeType(processId) == NDB_MGM_NODE_TYPE_API ||
+ getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM) {
if(versionNode(processId, false,0,0) ==0)
* version = m_versionRec.version[processId];
else
* version = 0;
}
- if (getNodeType(processId) == NDB_MGM_NODE_TYPE_MGM) {
- * version = NDB_VERSION;
- }
-
const ClusterMgr::Node node =
theFacade->theClusterMgr->getNodeInfo(processId);
@@ -1540,175 +1461,72 @@ MgmtSrvr::status(int processId,
return -1;
}
-
-
-//****************************************************************************
-//****************************************************************************
-int
-MgmtSrvr::startStatisticEventReporting(int level)
-{
- SetLogLevelOrd ll;
- NodeId nodeId = 0;
-
- ll.clear();
- ll.setLogLevel(LogLevel::llStatistic, level);
-
- if (level > 0) {
- _isStatPortActive = true;
- } else {
- _isStatPortActive = false;
-
- if (_isClusterLogStatActive) {
- return 0;
- }
- }
-
- while (getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) {
- setEventReportingLevelImpl(nodeId, ll);
- }
-
- return 0;
-}
-
-int
-MgmtSrvr::setEventReportingLevel(int processId, const SetLogLevelOrd & ll,
- bool isResend)
-{
- for (Uint32 i = 0; i < ll.noOfEntries; i++) {
- if (ll.theCategories[i] == LogLevel::llStatistic) {
- if (ll.theLevels[i] > 0) {
- _isClusterLogStatActive = true;
- break;
- } else {
- _isClusterLogStatActive = false;
-
- if (_isStatPortActive) {
- return 0;
- }
- break;
- }
- } // if (ll.theCategories
- } // for (int i = 0
-
- return setEventReportingLevelImpl(processId, ll, isResend);
-}
int
MgmtSrvr::setEventReportingLevelImpl(int processId,
- const SetLogLevelOrd & ll,
- bool isResend)
+ const EventSubscribeReq& ll)
{
- Uint32 i;
- for(i = 0; i<ll.noOfEntries; i++){
- // Save log level for the cluster log
- if (!isResend) {
- NodeLogLevel* n = NULL;
- bool found = false;
- while ((n = _clusterLogLevelList->next()) != NULL) {
- if (n->getNodeId() == processId &&
- n->getCategory() == ll.theCategories[i]) {
-
- n->setLevel(ll.theLevels[i]);
- found = true;
- }
- }
- if (!found) {
- _clusterLogLevelList->add(new NodeLogLevel(processId, ll));
- }
- }
- }
-
+
int result = okToSendTo(processId, true);
if (result != 0) {
return result;
}
- NdbApiSignal* signal = getSignal();
- if (signal == NULL) {
- return COULD_NOT_ALLOCATE_MEMORY;
- }
+ NdbApiSignal signal(_ownReference);
EventSubscribeReq * dst =
- CAST_PTR(EventSubscribeReq, signal->getDataPtrSend());
- for(i = 0; i<ll.noOfEntries; i++){
- dst->theCategories[i] = ll.theCategories[i];
- dst->theLevels[i] = ll.theLevels[i];
- }
-
- dst->noOfEntries = ll.noOfEntries;
- dst->blockRef = _ownReference;
+ CAST_PTR(EventSubscribeReq, signal.getDataPtrSend());
- signal->set(TestOrd::TraceAPI, CMVMI, GSN_EVENT_SUBSCRIBE_REQ,
- EventSubscribeReq::SignalLength);
+ * dst = ll;
+
+ signal.set(TestOrd::TraceAPI, CMVMI, GSN_EVENT_SUBSCRIBE_REQ,
+ EventSubscribeReq::SignalLength);
+
+ theFacade->lock_mutex();
+ send(&signal, processId, NODE_TYPE_DB);
+ theFacade->unlock_mutex();
- result = sendSignal(processId, WAIT_SUBSCRIBE_CONF, signal, true);
- if (result == -1) {
- return SEND_OR_RECEIVE_FAILED;
- }
- else {
- // Increment the conf counter
- theConfCount++;
- }
-
return 0;
}
//****************************************************************************
//****************************************************************************
int
-MgmtSrvr::setNodeLogLevel(int processId, const SetLogLevelOrd & ll,
- bool isResend)
+MgmtSrvr::setNodeLogLevelImpl(int processId, const SetLogLevelOrd & ll)
{
- Uint32 i;
- for(i = 0; i<ll.noOfEntries; i++){
- // Save log level for the cluster log
- if (!isResend) {
- NodeLogLevel* n = NULL;
- bool found = false;
- while ((n = _clusterLogLevelList->next()) != NULL) {
- if (n->getNodeId() == processId &&
- n->getCategory() == ll.theCategories[i]) {
-
- n->setLevel(ll.theLevels[i]);
- found = true;
- }
- }
- if (!found) {
- _clusterLogLevelList->add(new NodeLogLevel(processId, ll));
- }
- }
- }
-
int result = okToSendTo(processId, true);
if (result != 0) {
return result;
}
- NdbApiSignal* signal = getSignal();
- if (signal == NULL) {
- return COULD_NOT_ALLOCATE_MEMORY;
- }
-
- SetLogLevelOrd * dst = CAST_PTR(SetLogLevelOrd, signal->getDataPtrSend());
-
- for(i = 0; i<ll.noOfEntries; i++){
- dst->theCategories[i] = ll.theCategories[i];
- dst->theLevels[i] = ll.theLevels[i];
- }
+ NdbApiSignal signal(_ownReference);
- dst->noOfEntries = ll.noOfEntries;
+ SetLogLevelOrd * dst = CAST_PTR(SetLogLevelOrd, signal.getDataPtrSend());
- signal->set(TestOrd::TraceAPI, CMVMI, GSN_SET_LOGLEVELORD,
- SetLogLevelOrd::SignalLength);
-
- result = sendSignal(processId, NO_WAIT, signal, true);
- if (result == -1) {
- return SEND_OR_RECEIVE_FAILED;
- }
+ * dst = ll;
+
+ signal.set(TestOrd::TraceAPI, CMVMI, GSN_SET_LOGLEVELORD,
+ SetLogLevelOrd::SignalLength);
+
+ theFacade->lock_mutex();
+ theFacade->sendSignalUnCond(&signal, processId);
+ theFacade->unlock_mutex();
return 0;
}
+int
+MgmtSrvr::send(NdbApiSignal* signal, Uint32 node, Uint32 node_type){
+ Uint32 max = (node == 0) ? MAX_NODES : node + 1;
+
+ for(; node < max; node++){
+ while(nodeTypes[node] != (int)node_type && node < max) node++;
+ if(nodeTypes[node] != (int)node_type)
+ break;
+ theFacade->sendSignalUnCond(signal, node);
+ }
+ return 0;
+}
//****************************************************************************
//****************************************************************************
@@ -2003,7 +1821,7 @@ const char* MgmtSrvr::getErrorText(int errorCode)
for (int i = 0; i < noOfErrorCodes; ++i) {
if (errorCode == errorTable[i]._errorCode) {
- return errorTable[i]._errorText.c_str();
+ return errorTable[i]._errorText;
}
}
@@ -2011,21 +1829,6 @@ const char* MgmtSrvr::getErrorText(int errorCode)
return text;
}
-/*****************************************************************************
- * Handle reception of various signals
- *****************************************************************************/
-
-int
-MgmtSrvr::handleSTATISTICS_CONF(NdbApiSignal* signal)
-{
- //ndbout << "MgmtSrvr::handleSTATISTICS_CONF" << endl;
-
- int x = signal->readData(1);
- //ndbout << "MgmtSrvr::handleSTATISTICS_CONF, x: " << x << endl;
- _statistics._test1 = x;
- return 0;
-}
-
void
MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
{
@@ -2049,51 +1852,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
}
break;
- case GSN_STATISTICS_CONF:
- if (theWaitState != WAIT_STATISTICS) {
- g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected "
- "signal received, gsn %d, theWaitState = %d",
- gsn, theWaitState);
-
- return;
- }
- returnCode = handleSTATISTICS_CONF(signal);
- if (returnCode != -1) {
- theWaitState = NO_WAIT;
- }
- break;
-
-
- case GSN_SET_VAR_CONF:
- if (theWaitState != WAIT_SET_VAR) {
- g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected "
- "signal received, gsn %d, theWaitState = %d",
- gsn, theWaitState);
- return;
- }
- theWaitState = NO_WAIT;
- _setVarReqResult = 0;
- break;
-
- case GSN_SET_VAR_REF:
- if (theWaitState != WAIT_SET_VAR) {
- g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected "
- "signal received, gsn %d, theWaitState = %d",
- gsn, theWaitState);
- return;
- }
- theWaitState = NO_WAIT;
- _setVarReqResult = -1;
- break;
-
case GSN_EVENT_SUBSCRIBE_CONF:
- theConfCount--; // OK, we've received a conf message
- if (theConfCount < 0) {
- g_EventLogger.warning("MgmtSrvr::handleReceivedSignal, unexpected "
- "signal received, gsn %d, theWaitState = %d",
- gsn, theWaitState);
- theConfCount = 0;
- }
break;
case GSN_EVENT_REP:
@@ -2173,7 +1932,6 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
event.Completed.NoOfLogBytes = rep->noOfLogBytes;
event.Completed.NoOfRecords = rep->noOfRecords;
event.Completed.NoOfLogRecords = rep->noOfLogRecords;
-
event.Completed.stopGCP = rep->stopGCP;
event.Completed.startGCP = rep->startGCP;
event.Nodes = rep->nodes;
@@ -2276,20 +2034,19 @@ void
MgmtSrvr::handleStatus(NodeId nodeId, bool alive)
{
if (alive) {
- _startedNodeId = nodeId; // Used by logLevelThreadRun()
+ m_started_nodes.push_back(nodeId);
Uint32 theData[25];
theData[0] = EventReport::Connected;
theData[1] = nodeId;
+ eventReport(_ownNodeId, theData);
} else {
handleStopReply(nodeId, 0);
- theConfCount++; // Increment the event subscr conf count because
-
+
Uint32 theData[25];
theData[0] = EventReport::Disconnected;
theData[1] = nodeId;
-
+
eventReport(_ownNodeId, theData);
- g_EventLogger.info("Lost connection to node %d", nodeId);
}
}
@@ -2337,32 +2094,42 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
SOCKET_SIZE_TYPE *client_addr_len,
BaseString &error_string)
{
+ DBUG_ENTER("MgmtSrvr::alloc_node_id");
+ DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d",
+ *nodeId, type, client_addr));
+ if (g_no_nodeid_checks) {
+ if (*nodeId == 0) {
+ error_string.appfmt("no-nodeid-ckecks set in manegment server.\n"
+ "node id must be set explicitly in connectstring");
+ DBUG_RETURN(false);
+ }
+ DBUG_RETURN(true);
+ }
Guard g(&f_node_id_mutex);
-#if 0
- ndbout << "MgmtSrvr::getFreeNodeId type=" << type
- << " *nodeid=" << *nodeId << endl;
-#endif
-
+ int no_mgm= 0;
NodeBitmask connected_nodes(m_reserved_nodes);
- if (theFacade && theFacade->theClusterMgr) {
- for(Uint32 i = 0; i < MAX_NODES; i++)
- if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) {
- const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
- if (node.connected)
- connected_nodes.bitOR(node.m_state.m_connected_nodes);
+ for(Uint32 i = 0; i < MAX_NODES; i++)
+ {
+ if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB &&
+ theFacade && theFacade->theClusterMgr) {
+ const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
+ if (node.connected) {
+ connected_nodes.bitOR(node.m_state.m_connected_nodes);
}
+ } else if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM)
+ no_mgm++;
}
-
bool found_matching_id= false;
bool found_matching_type= false;
bool found_free_node= false;
- const char *config_hostname = 0;
+ unsigned id_found= 0;
+ const char *config_hostname= 0;
struct in_addr config_addr= {0};
int r_config_addr= -1;
unsigned type_c= 0;
- ndb_mgm_configuration_iterator iter(*(ndb_mgm_configuration *)_config->m_configValues,
- CFG_SECTION_NODE);
+ ndb_mgm_configuration_iterator
+ iter(*(ndb_mgm_configuration *)_config->m_configValues, CFG_SECTION_NODE);
for(iter.first(); iter.valid(); iter.next()) {
unsigned tmp= 0;
if(iter.get(CFG_NODE_ID, &tmp)) abort();
@@ -2370,15 +2137,16 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
continue;
found_matching_id= true;
if(iter.get(CFG_TYPE_OF_SECTION, &type_c)) abort();
- if(type_c != type)
+ if(type_c != (unsigned)type)
continue;
found_matching_type= true;
if (connected_nodes.get(tmp))
continue;
found_free_node= true;
if(iter.get(CFG_NODE_HOST, &config_hostname)) abort();
-
- if (config_hostname && config_hostname[0] != 0 && client_addr) {
+ if (config_hostname && config_hostname[0] == 0)
+ config_hostname= 0;
+ else if (client_addr) {
// check hostname compatability
const void *tmp_in= &(((sockaddr_in*)client_addr)->sin_addr);
if((r_config_addr= Ndb_getInAddr(&config_addr, config_hostname)) != 0
@@ -2388,39 +2156,76 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
|| memcmp(&tmp_addr, tmp_in, sizeof(config_addr)) != 0) {
// not localhost
#if 0
- ndbout << "MgmtSrvr::getFreeNodeId compare failed for \"" << config_hostname
- << "\" id=" << tmp << endl;
+ ndbout << "MgmtSrvr::getFreeNodeId compare failed for \""
+ << config_hostname
+ << "\" id=" << tmp << endl;
#endif
continue;
}
// connecting through localhost
- // check if config_hostname match hostname
- char my_hostname[256];
- if (gethostname(my_hostname, sizeof(my_hostname)) != 0)
- continue;
- if(Ndb_getInAddr(&tmp_addr, my_hostname) != 0
- || memcmp(&tmp_addr, &config_addr, sizeof(config_addr)) != 0) {
- // no match
+ // check if config_hostname is local
+ if (!SocketServer::tryBind(0,config_hostname)) {
continue;
}
}
+ } else { // client_addr == 0
+ if (!SocketServer::tryBind(0,config_hostname)) {
+ continue;
+ }
}
- *nodeId= tmp;
- if (client_addr)
- m_connect_address[tmp]= ((struct sockaddr_in *)client_addr)->sin_addr;
- else
- Ndb_getInAddr(&(m_connect_address[tmp]), "localhost");
- m_reserved_nodes.set(tmp);
-#if 0
- ndbout << "MgmtSrvr::getFreeNodeId found type=" << type
- << " *nodeid=" << *nodeId << endl;
-#endif
- return true;
+ if (*nodeId != 0 ||
+ type != NDB_MGM_NODE_TYPE_MGM ||
+ no_mgm == 1) { // any match is ok
+ id_found= tmp;
+ break;
+ }
+ if (id_found) { // mgmt server may only have one match
+ error_string.appfmt("Ambiguous node id's %d and %d.\n"
+ "Suggest specifying node id in connectstring,\n"
+ "or specifying unique host names in config file.",
+ id_found, tmp);
+ DBUG_RETURN(false);
+ }
+ if (config_hostname == 0) {
+ error_string.appfmt("Ambiguity for node id %d.\n"
+ "Suggest specifying node id in connectstring,\n"
+ "or specifying unique host names in config file,\n"
+ "or specifying just one mgmt server in config file.",
+ tmp);
+ DBUG_RETURN(false);
+ }
+ id_found= tmp; // mgmt server matched, check for more matches
+ }
+
+ if (id_found)
+ {
+ *nodeId= id_found;
+ DBUG_PRINT("info", ("allocating node id %d",*nodeId));
+ {
+ int r= 0;
+ if (client_addr)
+ m_connect_address[id_found]=
+ ((struct sockaddr_in *)client_addr)->sin_addr;
+ else if (config_hostname)
+ r= Ndb_getInAddr(&(m_connect_address[id_found]), config_hostname);
+ else {
+ char name[256];
+ r= gethostname(name, sizeof(name));
+ if (r == 0) {
+ name[sizeof(name)-1]= 0;
+ r= Ndb_getInAddr(&(m_connect_address[id_found]), name);
+ }
+ }
+ if (r)
+ m_connect_address[id_found].s_addr= 0;
+ }
+ m_reserved_nodes.set(id_found);
+ DBUG_RETURN(true);
}
if (found_matching_type && !found_free_node) {
- // we have a temporary error which might be due to that we have got the latest
- // connect status from db-nodes. Force update.
+ // we have a temporary error which might be due to that
+ // we have got the latest connect status from db-nodes. Force update.
global_flag_send_heartbeat_now= 1;
}
@@ -2429,7 +2234,8 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
const char *alias, *str;
alias= ndb_mgm_get_node_type_alias_string(type, &str);
type_string.assfmt("%s(%s)", alias, str);
- alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)type_c, &str);
+ alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)type_c,
+ &str);
type_c_string.assfmt("%s(%s)", alias, str);
}
@@ -2438,11 +2244,14 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (found_matching_type)
if (found_free_node)
error_string.appfmt("Connection done from wrong host ip %s.",
- inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr));
+ inet_ntoa(((struct sockaddr_in *)
+ (client_addr))->sin_addr));
else
- error_string.appfmt("No free node id found for %s.", type_string.c_str());
+ error_string.appfmt("No free node id found for %s.",
+ type_string.c_str());
else
- error_string.appfmt("No %s node defined in config file.", type_string.c_str());
+ error_string.appfmt("No %s node defined in config file.",
+ type_string.c_str());
else
error_string.append("No nodes defined in config file.");
} else {
@@ -2451,19 +2260,23 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (found_free_node) {
// have to split these into two since inet_ntoa overwrites itself
error_string.appfmt("Connection with id %d done from wrong host ip %s,",
- *nodeId, inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr));
+ *nodeId, inet_ntoa(((struct sockaddr_in *)
+ (client_addr))->sin_addr));
error_string.appfmt(" expected %s(%s).", config_hostname,
- r_config_addr ? "lookup failed" : inet_ntoa(config_addr));
+ r_config_addr ?
+ "lookup failed" : inet_ntoa(config_addr));
} else
- error_string.appfmt("Id %d already allocated by another node.", *nodeId);
+ error_string.appfmt("Id %d already allocated by another node.",
+ *nodeId);
else
error_string.appfmt("Id %d configured as %s, connect attempted as %s.",
- *nodeId, type_c_string.c_str(), type_string.c_str());
+ *nodeId, type_c_string.c_str(),
+ type_string.c_str());
else
- error_string.appfmt("No node defined with id=%d in config file.", *nodeId);
+ error_string.appfmt("No node defined with id=%d in config file.",
+ *nodeId);
}
-
- return false;
+ DBUG_RETURN(false);
}
bool
@@ -2483,91 +2296,23 @@ MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const
return true;
}
+#include "Services.hpp"
+
void
MgmtSrvr::eventReport(NodeId nodeId, const Uint32 * theData)
{
const EventReport * const eventReport = (EventReport *)&theData[0];
-
+
EventReport::EventType type = eventReport->getEventType();
-
- if (type == EventReport::TransReportCounters ||
- type == EventReport::OperationReportCounters) {
-
- if (_isClusterLogStatActive) {
- g_EventLogger.log(type, theData, nodeId);
- }
-
- if (_isStatPortActive) {
- char theTime[128];
- struct tm* tm_now;
- time_t now;
- now = time((time_t*)NULL);
-#ifdef NDB_WIN32
- tm_now = localtime(&now);
-#else
- tm_now = gmtime(&now);
-#endif
-
- snprintf(theTime, sizeof(theTime),
- STATISTIC_DATE,
- tm_now->tm_year + 1900,
- tm_now->tm_mon,
- tm_now->tm_mday,
- tm_now->tm_hour,
- tm_now->tm_min,
- tm_now->tm_sec);
-
- char str[255];
-
- if (type == EventReport::TransReportCounters) {
- snprintf(str, sizeof(str),
- STATISTIC_LINE,
- theTime,
- (int)now,
- nodeId,
- theData[1],
- theData[2],
- theData[3],
- // theData[4], simple reads
- theData[5],
- theData[6],
- theData[7],
- theData[8]);
- } else if (type == EventReport::OperationReportCounters) {
- snprintf(str, sizeof(str),
- OP_STATISTIC_LINE,
- theTime,
- (int)now,
- nodeId,
- theData[1]);
- }
-
- if(m_statisticsListner != 0){
- m_statisticsListner->println_statistics(str);
- }
- }
-
- return;
-
- } // if (type ==
-
// Log event
- g_EventLogger.log(type, theData, nodeId);
-
+ g_EventLogger.log(type, theData, nodeId,
+ &m_statisticsListner.m_clients[0].m_logLevel);
+ m_statisticsListner.log(type, theData, nodeId);
}
/***************************************************************************
* Backup
***************************************************************************/
-
-MgmtSrvr::BackupCallback
-MgmtSrvr::setCallback(BackupCallback aCall)
-{
- BackupCallback ret = m_backupCallback;
- m_backupCallback = aCall;
- return ret;
-}
-
int
MgmtSrvr::startBackup(Uint32& backupId, bool waitCompleted)
{
@@ -2674,102 +2419,18 @@ MgmtSrvr::abortBackup(Uint32 backupId)
void
MgmtSrvr::backupCallback(BackupEvent & event)
{
- char str[255];
-
- bool ok = false;
+ m_lastBackupEvent = event;
switch(event.Event){
- case BackupEvent::BackupStarted:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup %d started", event.Started.BackupId);
- break;
case BackupEvent::BackupFailedToStart:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup failed to start (Backup error %d)",
- event.FailedToStart.ErrorCode);
- break;
- case BackupEvent::BackupCompleted:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup %d completed",
- event.Completed.BackupId);
- g_EventLogger.info(str);
-
- snprintf(str, sizeof(str),
- " StartGCP: %d StopGCP: %d",
- event.Completed.startGCP, event.Completed.stopGCP);
- g_EventLogger.info(str);
-
- snprintf(str, sizeof(str),
- " #Records: %d #LogRecords: %d",
- event.Completed.NoOfRecords, event.Completed.NoOfLogRecords);
- g_EventLogger.info(str);
-
- snprintf(str, sizeof(str),
- " Data: %d bytes Log: %d bytes",
- event.Completed.NoOfBytes, event.Completed.NoOfLogBytes);
- break;
case BackupEvent::BackupAborted:
- ok = true;
- snprintf(str, sizeof(str),
- "Backup %d has been aborted reason %d",
- event.Aborted.BackupId,
- event.Aborted.Reason);
- break;
- }
- if(!ok){
- snprintf(str, sizeof(str),
- "Unknown backup event: %d",
- event.Event);
-
- }
- g_EventLogger.info(str);
-
- switch (theWaitState){
- case WAIT_BACKUP_STARTED:
- switch(event.Event){
- case BackupEvent::BackupStarted:
- case BackupEvent::BackupFailedToStart:
- m_lastBackupEvent = event;
- theWaitState = NO_WAIT;
- break;
- default:
- snprintf(str, sizeof(str),
- "Received event %d in unexpected state WAIT_BACKUP_STARTED",
- event.Event);
- g_EventLogger.info(str);
- return;
- }
-
+ case BackupEvent::BackupCompleted:
+ theWaitState = NO_WAIT;
break;
- case WAIT_BACKUP_COMPLETED:
- switch(event.Event){
- case BackupEvent::BackupCompleted:
- case BackupEvent::BackupAborted:
- case BackupEvent::BackupFailedToStart:
- m_lastBackupEvent = event;
+ case BackupEvent::BackupStarted:
+ if(theWaitState == WAIT_BACKUP_STARTED)
theWaitState = NO_WAIT;
- break;
- default:
- snprintf(str, sizeof(str),
- "Received event %d in unexpected state WAIT_BACKUP_COMPLETED",
- event.Event);
- g_EventLogger.info(str);
- return;
- }
- break;
- default:
- snprintf(str, sizeof(str), "Received event %d in unexpected state = %d",
- event.Event, theWaitState);
- g_EventLogger.info(str);
- return;
-
- }
-
- if(m_backupCallback != 0){
- (* m_backupCallback)(event);
}
+ return;
}
@@ -2957,15 +2618,15 @@ MgmtSrvr::setDbParameter(int node, int param, const char * value,
switch(p_type){
case 0:
res = i2.set(param, val_32);
- ndbout_c("Updateing node %d param: %d to %d", node, param, val_32);
+ ndbout_c("Updating node %d param: %d to %d", node, param, val_32);
break;
case 1:
res = i2.set(param, val_64);
- ndbout_c("Updateing node %d param: %d to %Ld", node, param, val_32);
+ ndbout_c("Updating node %d param: %d to %Ld", node, param, val_32);
break;
case 2:
res = i2.set(param, val_char);
- ndbout_c("Updateing node %d param: %d to %s", node, param, val_char);
+ ndbout_c("Updating node %d param: %d to %s", node, param, val_char);
break;
default:
abort();
@@ -2981,3 +2642,7 @@ template class Vector<SigMatch>;
#if __SUNPRO_CC != 0x560
template bool SignalQueue::waitFor<SigMatch>(Vector<SigMatch>&, SigMatch*&, NdbApiSignal*&, unsigned);
#endif
+
+template class MutexVector<unsigned short>;
+template class MutexVector<MgmStatService::StatListener>;
+template class MutexVector<EventSubscribeReq>;