diff options
author | unknown <stewart@mysql.com> | 2004-12-23 16:23:32 +1100 |
---|---|---|
committer | unknown <stewart@mysql.com> | 2004-12-23 16:23:32 +1100 |
commit | 233a33da3a8d7457d5686139dac04e77a258e247 (patch) | |
tree | a1b18447178b08d8d59d989f2e82b855894d3233 /ndb | |
parent | 168e57197657ccd4babc4dfcc81e8f0216db0342 (diff) | |
download | mariadb-git-233a33da3a8d7457d5686139dac04e77a258e247.tar.gz |
Impl 2 of WL2278 - Dynamic port allocation of cluster nodes.
In "client connect thread", let the client read the port to connect to using
ndb_mgm_get_connection_int_parameter.
The request for the port is resent on every connect attempt.
ndb/include/mgmapi/mgmapi_debug.h:
Make ndb_mgm_get_connection_int_parameter return a Uint32 value - this is what Properties etc use, so we'll be consistent.
ndb/include/transporter/TransporterRegistry.hpp:
Add NdbMgmHandle to constructor. This is used to get the port number
to connect to from mgmd. Defaults to NULL, although things will go badly
if you don't change this (by calling the new set_mgm_handle method) pretty
quickly.
Add set_mgm_handle(NdbMgmHandle) method.
- sets the MgmHandle to use when requesting from mgmd what port to connect to a node on.
ndb/src/common/transporter/Transporter.hpp:
Make remote port not a const.
Add method to set remote port - set_r_port(unsigned int)
Make getLocalNodeId return localNodeId, not remoteNodeId.
ndb/src/common/transporter/TransporterRegistry.cpp:
TransporterRegistry::TransporterRegistry()
- accept NdbMgmHandle parameter
- set m_mgm_handle to this
TransporterRegistry::start_clients_thread()
- If we're connecting to a node, and the server_port (from the config) is <=0,
we request the port number to connect to from mgmd.
(note: in testing, the <=0 check was commented out so the code was run.
There is no harm in always running it, it's just an extra round-trip to mgmd
that we may not need).
ndb/src/kernel/main.cpp:
Set the mgm_handle for globalTransporterRegistry soon after we have set up theConfig (which sets up the mgmHandle).
ndb/src/mgmapi/mgmapi.cpp:
- Remove dead #else on #if 1
- Print an error message and warning if the parser returns NULL.
this will no longer silently fail, it will give output with
information to help the programmer find out where things went wrong.
In normal operation, this codepath should never be hit.
- fix handlers for 'get|set connection parameter' calls.
ndb/src/mgmsrv/MgmtSrvr.cpp:
- Create TransporterFacade with the mgmHandle.
- Don't worry about the order of node1 and node2 in getConnectionDbParameter
- use a proper DBUG_RETURN in getConnectionParameter
ndb/src/mgmsrv/Services.cpp:
- fix reply to 'get connection parameter'
- optimise reply size.
ndb/src/ndbapi/TransporterFacade.cpp:
- create TransporterRegistry with m_mgm_handle
- set m_mgm_handle in constructor
ndb/src/ndbapi/TransporterFacade.hpp:
Introduce m_mgm_handle member.
ndb/src/ndbapi/ndb_cluster_connection.cpp:
create TransporterFacade (with mgmHandle) after the ConfigRetriever has been created
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/include/mgmapi/mgmapi_debug.h | 2 | ||||
-rw-r--r-- | ndb/include/transporter/TransporterRegistry.hpp | 11 | ||||
-rw-r--r-- | ndb/src/common/transporter/Transporter.hpp | 9 | ||||
-rw-r--r-- | ndb/src/common/transporter/TransporterRegistry.cpp | 29 | ||||
-rw-r--r-- | ndb/src/kernel/main.cpp | 4 | ||||
-rw-r--r-- | ndb/src/mgmapi/mgmapi.cpp | 32 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.cpp | 8 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.cpp | 5 | ||||
-rw-r--r-- | ndb/src/ndbapi/TransporterFacade.cpp | 7 | ||||
-rw-r--r-- | ndb/src/ndbapi/TransporterFacade.hpp | 6 | ||||
-rw-r--r-- | ndb/src/ndbapi/ndb_cluster_connection.cpp | 5 |
11 files changed, 84 insertions, 34 deletions
diff --git a/ndb/include/mgmapi/mgmapi_debug.h b/ndb/include/mgmapi/mgmapi_debug.h index f608d27ca54..cbf9878f163 100644 --- a/ndb/include/mgmapi/mgmapi_debug.h +++ b/ndb/include/mgmapi/mgmapi_debug.h @@ -165,7 +165,7 @@ extern "C" { int node1, int node2, int param, - unsigned *value, + Uint32 *value, struct ndb_mgm_reply* reply); #ifdef __cplusplus diff --git a/ndb/include/transporter/TransporterRegistry.hpp b/ndb/include/transporter/TransporterRegistry.hpp index 0e2c1f1f5be..b7f8039abcc 100644 --- a/ndb/include/transporter/TransporterRegistry.hpp +++ b/ndb/include/transporter/TransporterRegistry.hpp @@ -33,6 +33,8 @@ #include <NdbTCP.h> +#include <mgmapi/mgmapi.h> + // A transporter is always in an IOState. // NoHalt is used initially and as long as it is no restrictions on // sending or receiving. @@ -94,10 +96,13 @@ public: /** * Constructor */ - TransporterRegistry(void * callback = 0 , + TransporterRegistry(NdbMgmHandle mgm_handle=NULL, + void * callback = 0 , unsigned maxTransporters = MAX_NTRANSPORTERS, unsigned sizeOfLongSignalMemory = 100); - + + void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; }; + bool init(NodeId localNodeId); /** @@ -236,6 +241,8 @@ protected: private: void * callbackObj; + NdbMgmHandle m_mgm_handle; + struct NdbThread *m_start_clients_thread; bool m_run_start_clients_thread; diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp index 9a39f8788bc..a9a5290eae0 100644 --- a/ndb/src/common/transporter/Transporter.hpp +++ b/ndb/src/common/transporter/Transporter.hpp @@ -69,6 +69,11 @@ public: */ NodeId getLocalNodeId() const; + /** + * Set r_port to connect to + */ + void set_r_port(unsigned int port) { m_r_port = port; }; + protected: Transporter(TransporterRegistry &, const char *lHostName, @@ -101,7 +106,7 @@ protected: struct in_addr remoteHostAddress; struct in_addr localHostAddress; - const unsigned int m_r_port; + unsigned int m_r_port; const NodeId remoteNodeId; const NodeId localNodeId; @@ -149,7 +154,7 @@ Transporter::getRemoteNodeId() const { inline NodeId Transporter::getLocalNodeId() const { - return remoteNodeId; + return localNodeId; } inline diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index 7f8f9d258be..2b174b9210e 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -47,6 +47,8 @@ #include <InputStream.hpp> #include <OutputStream.hpp> +#include <mgmapi/mgmapi_debug.h> + int g_shm_pid = 0; SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) @@ -105,13 +107,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) DBUG_RETURN(0); } -TransporterRegistry::TransporterRegistry(void * callback, +TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle, + void * callback, unsigned _maxTransporters, unsigned sizeOfLongSignalMemory) { nodeIdSpecified = false; maxTransporters = _maxTransporters; sendCounter = 1; + m_mgm_handle = mgm_handle; callbackObj=callback; @@ -1136,8 +1140,27 @@ TransporterRegistry::start_clients_thread() const NodeId nodeId = t->getRemoteNodeId(); switch(performStates[nodeId]){ case CONNECTING: - if(!t->isConnected() && !t->isServer) - t->connect_client(); + if(!t->isConnected() && !t->isServer) { + if(server_port <= 0) { // Port is dynamic + Uint32 server_port=0; + struct ndb_mgm_reply mgm_reply; + int res; + res=ndb_mgm_get_connection_int_parameter(m_mgm_handle, + t->getRemoteNodeId(), + t->getLocalNodeId(), + CFG_CONNECTION_SERVER_PORT, + &server_port, + &mgm_reply); + DBUG_PRINT("info",("Got dynamic port %u for %d -> %d (ret: %d)", + server_port,t->getRemoteNodeId(), + t->getLocalNodeId())); + if(res>=0) + t->set_r_port(server_port); + else + ndbout_c("Failed to get dynamic port to connect to."); + } + t->connect_client(); + } break; case DISCONNECTING: if(t->isConnected()) diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp index b9b4ff2c115..da0c5df7a33 100644 --- a/ndb/src/kernel/main.cpp +++ b/ndb/src/kernel/main.cpp @@ -91,6 +91,10 @@ int main(int argc, char** argv) return 1; } } + + globalTransporterRegistry.set_mgm_handle(theConfig + ->get_config_retriever() + ->get_mgmHandle()); #ifndef NDB_WIN32 for(pid_t child = fork(); child != 0; child = fork()){ diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index 5721d359ecd..01eaf4badc8 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -307,14 +307,17 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, ParserDummy session(handle->socket); Parser_t parser(command_reply, in, true, true, true); -#if 1 const Properties* p = parser.parse(ctx, session); if (p == NULL){ /** * Print some info about why the parser returns NULL */ - //ndbout << " status=" << ctx.m_status << ", curr=" - //<< ctx.m_currentToken << endl; + ndbout << "Error in mgm protocol parser. " + << "cmd: '" << cmd + << "' status=" << ctx.m_status + << ", curr=" << ctx.m_currentToken + << endl; + DBUG_PRINT("info",("parser.parse returned NULL")); } #ifdef MGMAPI_LOG else { @@ -325,9 +328,6 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, } #endif return p; -#else - return parser.parse(ctx, session); -#endif } /** @@ -1998,7 +1998,8 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle, const ParserRow<ParserDummy> reply[]= { MGM_CMD("set connection parameter reply", NULL, ""), - MGM_ARG("result", String, Mandatory, "Error message"), + MGM_ARG("message", String, Mandatory, "Error Message"), + MGM_ARG("result", String, Mandatory, "Status Result"), MGM_END() }; @@ -2026,7 +2027,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, int node1, int node2, int param, - unsigned *value, + Uint32 *value, struct ndb_mgm_reply* mgmreply){ DBUG_ENTER("ndb_mgm_get_connection_int_parameter"); CHECK_HANDLE(handle, -1); @@ -2036,17 +2037,17 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, args.put("node1", node1); args.put("node2", node2); args.put("param", param); - + const ParserRow<ParserDummy> reply[]= { MGM_CMD("get connection parameter reply", NULL, ""), - MGM_ARG("result", String, Mandatory, "Error message"), MGM_ARG("value", Int, Mandatory, "Current Value"), + MGM_ARG("result", String, Mandatory, "Result"), MGM_END() }; const Properties *prop; - prop= ndb_mgm_call(handle, reply, "get connection parameter", &args); - CHECK_REPLY(prop, -1); + prop = ndb_mgm_call(handle, reply, "get connection parameter", &args); + CHECK_REPLY(prop, -2); int res= -1; do { @@ -2058,10 +2059,13 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, res= 0; } while(0); - prop->get("value",value); + if(!prop->get("value",value)){ + ndbout_c("Unable to get value"); + res = -3; + } delete prop; - return res; + DBUG_RETURN(res); } diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 510e662b78d..10691293700 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -584,7 +584,8 @@ MgmtSrvr::start(BaseString &error_string) return false; } } - theFacade= TransporterFacade::theFacadeInstance= new TransporterFacade(); + theFacade= TransporterFacade::theFacadeInstance + = new TransporterFacade(m_config_retriever->get_mgmHandle()); if(theFacade == 0) { DEBUG("MgmtSrvr.cpp: theFacade is NULL."); @@ -2806,7 +2807,8 @@ MgmtSrvr::getConnectionDbParameter(int node1, Uint32 n1,n2; iter.get(CFG_CONNECTION_NODE_1, &n1); iter.get(CFG_CONNECTION_NODE_2, &n2); - if(n1 == (unsigned)node1 && n2 == (unsigned)node2) + if((n1 == (unsigned)node1 && n2 == (unsigned)node2) + || (n1 == (unsigned)node2 && n2 == (unsigned)node1)) break; } if(!iter.valid()) { @@ -2820,7 +2822,7 @@ MgmtSrvr::getConnectionDbParameter(int node1, } msg.assfmt("%u",*value); - return 1; + DBUG_RETURN(1); } template class Vector<SigMatch>; diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 6fda93b67a1..0f0d1582d1a 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -1399,10 +1399,9 @@ MgmApiSession::getConnectionParameter(Parser_t::Context &ctx, &value, result); - m_output->println("set connection parameter reply"); - m_output->println("message: %s", result.c_str()); + m_output->println("get connection parameter reply"); m_output->println("value: %u", value); - m_output->println("result: %s", (ret>0)?"Ok":"Failed"); + m_output->println("result: %s", (ret>0)?"Ok":result.c_str()); m_output->println(""); } diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index f9dde92f17d..6287e0ff43e 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -472,12 +472,13 @@ void TransporterFacade::threadMainReceive(void) theTransporterRegistry->stopReceiving(); } -TransporterFacade::TransporterFacade() : +TransporterFacade::TransporterFacade(NdbMgmHandle mgm_handle) : theTransporterRegistry(0), theStopReceive(0), theSendThread(NULL), theReceiveThread(NULL), - m_fragmented_signal_id(0) + m_fragmented_signal_id(0), + m_mgm_handle(mgm_handle) { theOwnId = 0; @@ -501,7 +502,7 @@ bool TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) { theOwnId = nodeId; - theTransporterRegistry = new TransporterRegistry(this); + theTransporterRegistry = new TransporterRegistry(m_mgm_handle,this); const int res = IPCConfig::configureTransporters(nodeId, * props, diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp index 5c1e1cb839c..bbc1c77a26a 100644 --- a/ndb/src/ndbapi/TransporterFacade.hpp +++ b/ndb/src/ndbapi/TransporterFacade.hpp @@ -24,6 +24,7 @@ #include <NdbMutex.h> #include "DictCache.hpp" #include <BlockNumbers.h> +#include <mgmapi.h> class ClusterMgr; class ArbitMgr; @@ -46,7 +47,7 @@ extern "C" { class TransporterFacade { public: - TransporterFacade(); + TransporterFacade(NdbMgmHandle mgm_handle); virtual ~TransporterFacade(); bool init(Uint32, const ndb_mgm_configuration *); @@ -131,7 +132,8 @@ private: bool isConnected(NodeId aNodeId); void doStop(); - + + NdbMgmHandle m_mgm_handle; TransporterRegistry* theTransporterRegistry; SocketServer m_socket_server; int sendPerformedLastInterval; diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp index 3b159838171..130287b6fee 100644 --- a/ndb/src/ndbapi/ndb_cluster_connection.cpp +++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp @@ -40,7 +40,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) { DBUG_ENTER("Ndb_cluster_connection"); DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this)); - m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade(); m_config_retriever= 0; m_connect_thread= 0; @@ -58,6 +57,10 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) #endif m_config_retriever= new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API); + + m_facade= TransporterFacade::theFacadeInstance + = new TransporterFacade(m_config_retriever->get_mgmHandle()); + if (m_config_retriever->hasError()) { printf("Could not connect initialize handle to management server: %s", |