summaryrefslogtreecommitdiff
path: root/storage/ndb/src/common/transporter/SHM_Transporter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/common/transporter/SHM_Transporter.cpp')
-rw-r--r--storage/ndb/src/common/transporter/SHM_Transporter.cpp367
1 files changed, 367 insertions, 0 deletions
diff --git a/storage/ndb/src/common/transporter/SHM_Transporter.cpp b/storage/ndb/src/common/transporter/SHM_Transporter.cpp
new file mode 100644
index 00000000000..e2d23cf94e2
--- /dev/null
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp
@@ -0,0 +1,367 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <ndb_global.h>
+
+#include "SHM_Transporter.hpp"
+#include "TransporterInternalDefinitions.hpp"
+#include <TransporterCallback.hpp>
+#include <NdbSleep.h>
+#include <NdbOut.hpp>
+
+#include <InputStream.hpp>
+#include <OutputStream.hpp>
+
+extern int g_ndb_shm_signum;
+
+SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
+ const char *lHostName,
+ const char *rHostName,
+ int r_port,
+ bool isMgmConnection,
+ NodeId lNodeId,
+ NodeId rNodeId,
+ NodeId serverNodeId,
+ bool checksum,
+ bool signalId,
+ key_t _shmKey,
+ Uint32 _shmSize) :
+ Transporter(t_reg, tt_SHM_TRANSPORTER,
+ lHostName, rHostName, r_port, isMgmConnection,
+ lNodeId, rNodeId, serverNodeId,
+ 0, false, checksum, signalId),
+ shmKey(_shmKey),
+ shmSize(_shmSize)
+{
+ _shmSegCreated = false;
+ _attached = false;
+
+ shmBuf = 0;
+ reader = 0;
+ writer = 0;
+
+ setupBuffersDone=false;
+#ifdef DEBUG_TRANSPORTER
+ printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
+#endif
+ m_signal_threshold = 4096;
+}
+
+SHM_Transporter::~SHM_Transporter(){
+ doDisconnect();
+}
+
+bool
+SHM_Transporter::initTransporter(){
+ if (g_ndb_shm_signum)
+ return true;
+ return false;
+}
+
+void
+SHM_Transporter::setupBuffers(){
+ Uint32 sharedSize = 0;
+ sharedSize += 28; //SHM_Reader::getSharedSize();
+ sharedSize += 28; //SHM_Writer::getSharedSize();
+
+ const Uint32 slack = MAX_MESSAGE_SIZE;
+
+ /**
+ * NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
+ */
+ Uint32 sizeOfBuffer = shmSize;
+ sizeOfBuffer -= 2*sharedSize;
+ sizeOfBuffer /= 2;
+
+ Uint32 * base1 = (Uint32*)shmBuf;
+
+ Uint32 * sharedReadIndex1 = base1;
+ Uint32 * sharedWriteIndex1 = base1 + 1;
+ serverStatusFlag = base1 + 4;
+ char * startOfBuf1 = shmBuf+sharedSize;
+
+ Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
+ Uint32 * sharedReadIndex2 = base2;
+ Uint32 * sharedWriteIndex2 = base2 + 1;
+ clientStatusFlag = base2 + 4;
+ char * startOfBuf2 = ((char *)base2)+sharedSize;
+
+ if(isServer){
+ * serverStatusFlag = 0;
+ reader = new SHM_Reader(startOfBuf1,
+ sizeOfBuffer,
+ slack,
+ sharedReadIndex1,
+ sharedWriteIndex1);
+
+ writer = new SHM_Writer(startOfBuf2,
+ sizeOfBuffer,
+ slack,
+ sharedReadIndex2,
+ sharedWriteIndex2);
+
+ * sharedReadIndex1 = 0;
+ * sharedWriteIndex1 = 0;
+
+ * sharedReadIndex2 = 0;
+ * sharedWriteIndex2 = 0;
+
+ reader->clear();
+ writer->clear();
+
+ * serverStatusFlag = 1;
+
+#ifdef DEBUG_TRANSPORTER
+ printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
+ printf("Reader at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
+ printf("sharedReadIndex1 at %d (%p) = %d\n",
+ (char*)sharedReadIndex1-shmBuf,
+ sharedReadIndex1, *sharedReadIndex1);
+ printf("sharedWriteIndex1 at %d (%p) = %d\n",
+ (char*)sharedWriteIndex1-shmBuf,
+ sharedWriteIndex1, *sharedWriteIndex1);
+
+ printf("Writer at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
+ printf("sharedReadIndex2 at %d (%p) = %d\n",
+ (char*)sharedReadIndex2-shmBuf,
+ sharedReadIndex2, *sharedReadIndex2);
+ printf("sharedWriteIndex2 at %d (%p) = %d\n",
+ (char*)sharedWriteIndex2-shmBuf,
+ sharedWriteIndex2, *sharedWriteIndex2);
+
+ printf("sizeOfBuffer = %d\n", sizeOfBuffer);
+#endif
+ } else {
+ * clientStatusFlag = 0;
+ reader = new SHM_Reader(startOfBuf2,
+ sizeOfBuffer,
+ slack,
+ sharedReadIndex2,
+ sharedWriteIndex2);
+
+ writer = new SHM_Writer(startOfBuf1,
+ sizeOfBuffer,
+ slack,
+ sharedReadIndex1,
+ sharedWriteIndex1);
+
+ * sharedReadIndex2 = 0;
+ * sharedWriteIndex1 = 0;
+
+ reader->clear();
+ writer->clear();
+ * clientStatusFlag = 1;
+#ifdef DEBUG_TRANSPORTER
+ printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
+ printf("Reader at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
+ printf("sharedReadIndex2 at %d (%p) = %d\n",
+ (char*)sharedReadIndex2-shmBuf,
+ sharedReadIndex2, *sharedReadIndex2);
+ printf("sharedWriteIndex2 at %d (%p) = %d\n",
+ (char*)sharedWriteIndex2-shmBuf,
+ sharedWriteIndex2, *sharedWriteIndex2);
+
+ printf("Writer at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
+ printf("sharedReadIndex1 at %d (%p) = %d\n",
+ (char*)sharedReadIndex1-shmBuf,
+ sharedReadIndex1, *sharedReadIndex1);
+ printf("sharedWriteIndex1 at %d (%p) = %d\n",
+ (char*)sharedWriteIndex1-shmBuf,
+ sharedWriteIndex1, *sharedWriteIndex1);
+
+ printf("sizeOfBuffer = %d\n", sizeOfBuffer);
+#endif
+ }
+#ifdef DEBUG_TRANSPORTER
+ printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
+#endif
+}
+
+bool
+SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
+{
+ DBUG_ENTER("SHM_Transporter::connect_server_impl");
+ SocketOutputStream s_output(sockfd);
+ SocketInputStream s_input(sockfd);
+ char buf[256];
+
+ // Create
+ if(!_shmSegCreated){
+ if (!ndb_shm_create()) {
+ report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT);
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+ _shmSegCreated = true;
+ }
+
+ // Attach
+ if(!_attached){
+ if (!ndb_shm_attach()) {
+ report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+ _attached = true;
+ }
+
+ // Send ok to client
+ s_output.println("shm server 1 ok: %d",
+ m_transporter_registry.m_shm_own_pid);
+
+ // Wait for ok from client
+ if (s_input.gets(buf, 256) == 0)
+ {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+
+ if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
+ {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+
+ int r= connect_common(sockfd);
+
+ if (r) {
+ // Send ok to client
+ s_output.println("shm server 2 ok");
+ // Wait for ok from client
+ if (s_input.gets(buf, 256) == 0) {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+ DBUG_PRINT("info", ("Successfully connected server to node %d",
+ remoteNodeId));
+ }
+
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(r);
+}
+
+bool
+SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
+{
+ DBUG_ENTER("SHM_Transporter::connect_client_impl");
+ SocketInputStream s_input(sockfd);
+ SocketOutputStream s_output(sockfd);
+ char buf[256];
+
+#if 1
+#endif
+
+ // Wait for server to create and attach
+ if (s_input.gets(buf, 256) == 0) {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_PRINT("error", ("Server id %d did not attach",
+ remoteNodeId));
+ DBUG_RETURN(false);
+ }
+
+ if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
+ {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+
+ // Create
+ if(!_shmSegCreated){
+ if (!ndb_shm_get()) {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_PRINT("error", ("Failed create of shm seg to node %d",
+ remoteNodeId));
+ DBUG_RETURN(false);
+ }
+ _shmSegCreated = true;
+ }
+
+ // Attach
+ if(!_attached){
+ if (!ndb_shm_attach()) {
+ report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
+ remoteNodeId));
+ DBUG_RETURN(false);
+ }
+ _attached = true;
+ }
+
+ // Send ok to server
+ s_output.println("shm client 1 ok: %d",
+ m_transporter_registry.m_shm_own_pid);
+
+ int r= connect_common(sockfd);
+
+ if (r) {
+ // Wait for ok from server
+ if (s_input.gets(buf, 256) == 0) {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_PRINT("error", ("No ok from server node %d",
+ remoteNodeId));
+ DBUG_RETURN(false);
+ }
+ // Send ok to server
+ s_output.println("shm client 2 ok");
+ DBUG_PRINT("info", ("Successfully connected client to node %d",
+ remoteNodeId));
+ }
+
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(r);
+}
+
+bool
+SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
+{
+ if (!checkConnected()) {
+ DBUG_PRINT("error", ("Already connected to node %d",
+ remoteNodeId));
+ return false;
+ }
+
+ if(!setupBuffersDone)
+ {
+ setupBuffers();
+ setupBuffersDone=true;
+ }
+
+ if(setupBuffersDone)
+ {
+ NdbSleep_MilliSleep(m_timeOutMillis);
+ if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
+ {
+ m_last_signal = 0;
+ return true;
+ }
+ }
+
+ DBUG_PRINT("error", ("Failed to set up buffers to node %d",
+ remoteNodeId));
+ return false;
+}
+
+void
+SHM_Transporter::doSend()
+{
+ if(m_last_signal)
+ {
+ m_last_signal = 0;
+ kill(m_remote_pid, g_ndb_shm_signum);
+ }
+}