summaryrefslogtreecommitdiff
path: root/ndb/src/common/transporter/SHM_Transporter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/common/transporter/SHM_Transporter.cpp')
-rw-r--r--ndb/src/common/transporter/SHM_Transporter.cpp85
1 files changed, 41 insertions, 44 deletions
diff --git a/ndb/src/common/transporter/SHM_Transporter.cpp b/ndb/src/common/transporter/SHM_Transporter.cpp
index ab161d8c18c..e4051519b86 100644
--- a/ndb/src/common/transporter/SHM_Transporter.cpp
+++ b/ndb/src/common/transporter/SHM_Transporter.cpp
@@ -26,6 +26,8 @@
#include <InputStream.hpp>
#include <OutputStream.hpp>
+extern int g_shm_pid;
+
SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
@@ -52,6 +54,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
#ifdef DEBUG_TRANSPORTER
printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
#endif
+ m_signal_threshold = 4096;
}
SHM_Transporter::~SHM_Transporter(){
@@ -182,42 +185,6 @@ SHM_Transporter::setupBuffers(){
#endif
}
-#if 0
-SendStatus
-SHM_Transporter::prepareSend(const SignalHeader * const signalHeader,
- Uint8 prio,
- const Uint32 * const signalData,
- const LinearSegmentPtr ptr[3],
- bool force){
-
- if(isConnected()){
-
- const Uint32 lenBytes = m_packer.getMessageLength(signalHeader, ptr);
-
- Uint32 * insertPtr = (Uint32 *)writer->getWritePtr(lenBytes);
-
- if(insertPtr != 0){
-
- m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
-
- /**
- * Do funky membar stuff
- */
-
- writer->updateWritePtr(lenBytes);
- return SEND_OK;
-
- } else {
- // NdbSleep_MilliSleep(3);
- //goto tryagain;
- return SEND_BUFFER_FULL;
- }
- }
- return SEND_DISCONNECTED;
-}
-#endif
-
-
bool
SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
@@ -247,10 +214,18 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
}
// Send ok to client
- s_output.println("shm server 1 ok");
-
+ s_output.println("shm server 1 ok: %d",
+ m_transporter_registry.m_shm_own_pid);
+
// Wait for ok from client
- if (s_input.gets(buf, 256) == 0) {
+ if (s_input.gets(buf, 256) == 0)
+ {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+
+ if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
+ {
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
@@ -289,6 +264,12 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(false);
}
+ if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
+ {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+
// Create
if(!_shmSegCreated){
if (!ndb_shm_get()) {
@@ -313,10 +294,11 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
}
// Send ok to server
- s_output.println("shm client 1 ok");
-
+ s_output.println("shm client 1 ok: %d",
+ m_transporter_registry.m_shm_own_pid);
+
int r= connect_common(sockfd);
-
+
if (r) {
// Wait for ok from server
if (s_input.gets(buf, 256) == 0) {
@@ -344,18 +326,33 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
return false;
}
- if(!setupBuffersDone) {
+ if(!setupBuffersDone)
+ {
setupBuffers();
setupBuffersDone=true;
}
- if(setupBuffersDone) {
+ if(setupBuffersDone)
+ {
NdbSleep_MilliSleep(m_timeOutMillis);
if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
+ {
+ m_last_signal = 0;
return true;
+ }
}
DBUG_PRINT("error", ("Failed to set up buffers to node %d",
remoteNodeId));
return false;
}
+
+void
+SHM_Transporter::doSend()
+{
+ if(m_last_signal)
+ {
+ m_last_signal = 0;
+ kill(m_remote_pid, SIGUSR1);
+ }
+}