summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorunknown <stewart@mysql.com>2004-12-23 16:23:32 +1100
committerunknown <stewart@mysql.com>2004-12-23 16:23:32 +1100
commit233a33da3a8d7457d5686139dac04e77a258e247 (patch)
treea1b18447178b08d8d59d989f2e82b855894d3233 /ndb
parent168e57197657ccd4babc4dfcc81e8f0216db0342 (diff)
downloadmariadb-git-233a33da3a8d7457d5686139dac04e77a258e247.tar.gz
Impl 2 of WL2278 - Dynamic port allocation of cluster nodes.
In "client connect thread", let the client read the port to connect to using ndb_mgm_get_connection_int_parameter. The request for the port is resent on every connect attempt. ndb/include/mgmapi/mgmapi_debug.h: Make ndb_mgm_get_connection_int_parameter return a Uint32 value - this is what Properties etc use, so we'll be consistent. ndb/include/transporter/TransporterRegistry.hpp: Add NdbMgmHandle to constructor. This is used to get the port number to connect to from mgmd. Defaults to NULL, although things will go badly if you don't change this (by calling the new set_mgm_handle method) pretty quickly. Add set_mgm_handle(NdbMgmHandle) method. - sets the MgmHandle to use when requesting from mgmd what port to connect to a node on. ndb/src/common/transporter/Transporter.hpp: Make remote port not a const. Add method to set remote port - set_r_port(unsigned int) Make getLocalNodeId return localNodeId, not remoteNodeId. ndb/src/common/transporter/TransporterRegistry.cpp: TransporterRegistry::TransporterRegistry() - accept NdbMgmHandle parameter - set m_mgm_handle to this TransporterRegistry::start_clients_thread() - If we're connecting to a node, and the server_port (from the config) is <=0, we request the port number to connect to from mgmd. (note: in testing, the <=0 check was commented out so the code was run. There is no harm in always running it, it's just an extra round-trip to mgmd that we may not need). ndb/src/kernel/main.cpp: Set the mgm_handle for globalTransporterRegistry soon after we have set up theConfig (which sets up the mgmHandle). ndb/src/mgmapi/mgmapi.cpp: - Remove dead #else on #if 1 - Print an error message and warning if the parser returns NULL. this will no longer silently fail, it will give output with information to help the programmer find out where things went wrong. In normal operation, this codepath should never be hit. - fix handlers for 'get|set connection parameter' calls. ndb/src/mgmsrv/MgmtSrvr.cpp: - Create TransporterFacade with the mgmHandle. - Don't worry about the order of node1 and node2 in getConnectionDbParameter - use a proper DBUG_RETURN in getConnectionParameter ndb/src/mgmsrv/Services.cpp: - fix reply to 'get connection parameter' - optimise reply size. ndb/src/ndbapi/TransporterFacade.cpp: - create TransporterRegistry with m_mgm_handle - set m_mgm_handle in constructor ndb/src/ndbapi/TransporterFacade.hpp: Introduce m_mgm_handle member. ndb/src/ndbapi/ndb_cluster_connection.cpp: create TransporterFacade (with mgmHandle) after the ConfigRetriever has been created
Diffstat (limited to 'ndb')
-rw-r--r--ndb/include/mgmapi/mgmapi_debug.h2
-rw-r--r--ndb/include/transporter/TransporterRegistry.hpp11
-rw-r--r--ndb/src/common/transporter/Transporter.hpp9
-rw-r--r--ndb/src/common/transporter/TransporterRegistry.cpp29
-rw-r--r--ndb/src/kernel/main.cpp4
-rw-r--r--ndb/src/mgmapi/mgmapi.cpp32
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.cpp8
-rw-r--r--ndb/src/mgmsrv/Services.cpp5
-rw-r--r--ndb/src/ndbapi/TransporterFacade.cpp7
-rw-r--r--ndb/src/ndbapi/TransporterFacade.hpp6
-rw-r--r--ndb/src/ndbapi/ndb_cluster_connection.cpp5
11 files changed, 84 insertions, 34 deletions
diff --git a/ndb/include/mgmapi/mgmapi_debug.h b/ndb/include/mgmapi/mgmapi_debug.h
index f608d27ca54..cbf9878f163 100644
--- a/ndb/include/mgmapi/mgmapi_debug.h
+++ b/ndb/include/mgmapi/mgmapi_debug.h
@@ -165,7 +165,7 @@ extern "C" {
int node1,
int node2,
int param,
- unsigned *value,
+ Uint32 *value,
struct ndb_mgm_reply* reply);
#ifdef __cplusplus
diff --git a/ndb/include/transporter/TransporterRegistry.hpp b/ndb/include/transporter/TransporterRegistry.hpp
index 0e2c1f1f5be..b7f8039abcc 100644
--- a/ndb/include/transporter/TransporterRegistry.hpp
+++ b/ndb/include/transporter/TransporterRegistry.hpp
@@ -33,6 +33,8 @@
#include <NdbTCP.h>
+#include <mgmapi/mgmapi.h>
+
// A transporter is always in an IOState.
// NoHalt is used initially and as long as it is no restrictions on
// sending or receiving.
@@ -94,10 +96,13 @@ public:
/**
* Constructor
*/
- TransporterRegistry(void * callback = 0 ,
+ TransporterRegistry(NdbMgmHandle mgm_handle=NULL,
+ void * callback = 0 ,
unsigned maxTransporters = MAX_NTRANSPORTERS,
unsigned sizeOfLongSignalMemory = 100);
-
+
+ void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
+
bool init(NodeId localNodeId);
/**
@@ -236,6 +241,8 @@ protected:
private:
void * callbackObj;
+ NdbMgmHandle m_mgm_handle;
+
struct NdbThread *m_start_clients_thread;
bool m_run_start_clients_thread;
diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp
index 9a39f8788bc..a9a5290eae0 100644
--- a/ndb/src/common/transporter/Transporter.hpp
+++ b/ndb/src/common/transporter/Transporter.hpp
@@ -69,6 +69,11 @@ public:
*/
NodeId getLocalNodeId() const;
+ /**
+ * Set r_port to connect to
+ */
+ void set_r_port(unsigned int port) { m_r_port = port; };
+
protected:
Transporter(TransporterRegistry &,
const char *lHostName,
@@ -101,7 +106,7 @@ protected:
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
- const unsigned int m_r_port;
+ unsigned int m_r_port;
const NodeId remoteNodeId;
const NodeId localNodeId;
@@ -149,7 +154,7 @@ Transporter::getRemoteNodeId() const {
inline
NodeId
Transporter::getLocalNodeId() const {
- return remoteNodeId;
+ return localNodeId;
}
inline
diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp
index 7f8f9d258be..2b174b9210e 100644
--- a/ndb/src/common/transporter/TransporterRegistry.cpp
+++ b/ndb/src/common/transporter/TransporterRegistry.cpp
@@ -47,6 +47,8 @@
#include <InputStream.hpp>
#include <OutputStream.hpp>
+#include <mgmapi/mgmapi_debug.h>
+
int g_shm_pid = 0;
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
@@ -105,13 +107,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(0);
}
-TransporterRegistry::TransporterRegistry(void * callback,
+TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle,
+ void * callback,
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) {
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
+ m_mgm_handle = mgm_handle;
callbackObj=callback;
@@ -1136,8 +1140,27 @@ TransporterRegistry::start_clients_thread()
const NodeId nodeId = t->getRemoteNodeId();
switch(performStates[nodeId]){
case CONNECTING:
- if(!t->isConnected() && !t->isServer)
- t->connect_client();
+ if(!t->isConnected() && !t->isServer) {
+ if(server_port <= 0) { // Port is dynamic
+ Uint32 server_port=0;
+ struct ndb_mgm_reply mgm_reply;
+ int res;
+ res=ndb_mgm_get_connection_int_parameter(m_mgm_handle,
+ t->getRemoteNodeId(),
+ t->getLocalNodeId(),
+ CFG_CONNECTION_SERVER_PORT,
+ &server_port,
+ &mgm_reply);
+ DBUG_PRINT("info",("Got dynamic port %u for %d -> %d (ret: %d)",
+ server_port,t->getRemoteNodeId(),
+ t->getLocalNodeId()));
+ if(res>=0)
+ t->set_r_port(server_port);
+ else
+ ndbout_c("Failed to get dynamic port to connect to.");
+ }
+ t->connect_client();
+ }
break;
case DISCONNECTING:
if(t->isConnected())
diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp
index b9b4ff2c115..da0c5df7a33 100644
--- a/ndb/src/kernel/main.cpp
+++ b/ndb/src/kernel/main.cpp
@@ -91,6 +91,10 @@ int main(int argc, char** argv)
return 1;
}
}
+
+ globalTransporterRegistry.set_mgm_handle(theConfig
+ ->get_config_retriever()
+ ->get_mgmHandle());
#ifndef NDB_WIN32
for(pid_t child = fork(); child != 0; child = fork()){
diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp
index 5721d359ecd..01eaf4badc8 100644
--- a/ndb/src/mgmapi/mgmapi.cpp
+++ b/ndb/src/mgmapi/mgmapi.cpp
@@ -307,14 +307,17 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
ParserDummy session(handle->socket);
Parser_t parser(command_reply, in, true, true, true);
-#if 1
const Properties* p = parser.parse(ctx, session);
if (p == NULL){
/**
* Print some info about why the parser returns NULL
*/
- //ndbout << " status=" << ctx.m_status << ", curr="
- //<< ctx.m_currentToken << endl;
+ ndbout << "Error in mgm protocol parser. "
+ << "cmd: '" << cmd
+ << "' status=" << ctx.m_status
+ << ", curr=" << ctx.m_currentToken
+ << endl;
+ DBUG_PRINT("info",("parser.parse returned NULL"));
}
#ifdef MGMAPI_LOG
else {
@@ -325,9 +328,6 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
}
#endif
return p;
-#else
- return parser.parse(ctx, session);
-#endif
}
/**
@@ -1998,7 +1998,8 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle,
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("set connection parameter reply", NULL, ""),
- MGM_ARG("result", String, Mandatory, "Error message"),
+ MGM_ARG("message", String, Mandatory, "Error Message"),
+ MGM_ARG("result", String, Mandatory, "Status Result"),
MGM_END()
};
@@ -2026,7 +2027,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
int node1,
int node2,
int param,
- unsigned *value,
+ Uint32 *value,
struct ndb_mgm_reply* mgmreply){
DBUG_ENTER("ndb_mgm_get_connection_int_parameter");
CHECK_HANDLE(handle, -1);
@@ -2036,17 +2037,17 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
args.put("node1", node1);
args.put("node2", node2);
args.put("param", param);
-
+
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get connection parameter reply", NULL, ""),
- MGM_ARG("result", String, Mandatory, "Error message"),
MGM_ARG("value", Int, Mandatory, "Current Value"),
+ MGM_ARG("result", String, Mandatory, "Result"),
MGM_END()
};
const Properties *prop;
- prop= ndb_mgm_call(handle, reply, "get connection parameter", &args);
- CHECK_REPLY(prop, -1);
+ prop = ndb_mgm_call(handle, reply, "get connection parameter", &args);
+ CHECK_REPLY(prop, -2);
int res= -1;
do {
@@ -2058,10 +2059,13 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
res= 0;
} while(0);
- prop->get("value",value);
+ if(!prop->get("value",value)){
+ ndbout_c("Unable to get value");
+ res = -3;
+ }
delete prop;
- return res;
+ DBUG_RETURN(res);
}
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index 510e662b78d..10691293700 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -584,7 +584,8 @@ MgmtSrvr::start(BaseString &error_string)
return false;
}
}
- theFacade= TransporterFacade::theFacadeInstance= new TransporterFacade();
+ theFacade= TransporterFacade::theFacadeInstance
+ = new TransporterFacade(m_config_retriever->get_mgmHandle());
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
@@ -2806,7 +2807,8 @@ MgmtSrvr::getConnectionDbParameter(int node1,
Uint32 n1,n2;
iter.get(CFG_CONNECTION_NODE_1, &n1);
iter.get(CFG_CONNECTION_NODE_2, &n2);
- if(n1 == (unsigned)node1 && n2 == (unsigned)node2)
+ if((n1 == (unsigned)node1 && n2 == (unsigned)node2)
+ || (n1 == (unsigned)node2 && n2 == (unsigned)node1))
break;
}
if(!iter.valid()) {
@@ -2820,7 +2822,7 @@ MgmtSrvr::getConnectionDbParameter(int node1,
}
msg.assfmt("%u",*value);
- return 1;
+ DBUG_RETURN(1);
}
template class Vector<SigMatch>;
diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp
index 6fda93b67a1..0f0d1582d1a 100644
--- a/ndb/src/mgmsrv/Services.cpp
+++ b/ndb/src/mgmsrv/Services.cpp
@@ -1399,10 +1399,9 @@ MgmApiSession::getConnectionParameter(Parser_t::Context &ctx,
&value,
result);
- m_output->println("set connection parameter reply");
- m_output->println("message: %s", result.c_str());
+ m_output->println("get connection parameter reply");
m_output->println("value: %u", value);
- m_output->println("result: %s", (ret>0)?"Ok":"Failed");
+ m_output->println("result: %s", (ret>0)?"Ok":result.c_str());
m_output->println("");
}
diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp
index f9dde92f17d..6287e0ff43e 100644
--- a/ndb/src/ndbapi/TransporterFacade.cpp
+++ b/ndb/src/ndbapi/TransporterFacade.cpp
@@ -472,12 +472,13 @@ void TransporterFacade::threadMainReceive(void)
theTransporterRegistry->stopReceiving();
}
-TransporterFacade::TransporterFacade() :
+TransporterFacade::TransporterFacade(NdbMgmHandle mgm_handle) :
theTransporterRegistry(0),
theStopReceive(0),
theSendThread(NULL),
theReceiveThread(NULL),
- m_fragmented_signal_id(0)
+ m_fragmented_signal_id(0),
+ m_mgm_handle(mgm_handle)
{
theOwnId = 0;
@@ -501,7 +502,7 @@ bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{
theOwnId = nodeId;
- theTransporterRegistry = new TransporterRegistry(this);
+ theTransporterRegistry = new TransporterRegistry(m_mgm_handle,this);
const int res = IPCConfig::configureTransporters(nodeId,
* props,
diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp
index 5c1e1cb839c..bbc1c77a26a 100644
--- a/ndb/src/ndbapi/TransporterFacade.hpp
+++ b/ndb/src/ndbapi/TransporterFacade.hpp
@@ -24,6 +24,7 @@
#include <NdbMutex.h>
#include "DictCache.hpp"
#include <BlockNumbers.h>
+#include <mgmapi.h>
class ClusterMgr;
class ArbitMgr;
@@ -46,7 +47,7 @@ extern "C" {
class TransporterFacade
{
public:
- TransporterFacade();
+ TransporterFacade(NdbMgmHandle mgm_handle);
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
@@ -131,7 +132,8 @@ private:
bool isConnected(NodeId aNodeId);
void doStop();
-
+
+ NdbMgmHandle m_mgm_handle;
TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server;
int sendPerformedLastInterval;
diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp
index 3b159838171..130287b6fee 100644
--- a/ndb/src/ndbapi/ndb_cluster_connection.cpp
+++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp
@@ -40,7 +40,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
{
DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
- m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
m_config_retriever= 0;
m_connect_thread= 0;
@@ -58,6 +57,10 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
#endif
m_config_retriever=
new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
+
+ m_facade= TransporterFacade::theFacadeInstance
+ = new TransporterFacade(m_config_retriever->get_mgmHandle());
+
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server: %s",