summaryrefslogtreecommitdiff
path: root/storage/ndb/src/common/transporter/TCP_Transporter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/common/transporter/TCP_Transporter.cpp')
-rw-r--r--storage/ndb/src/common/transporter/TCP_Transporter.cpp434
1 files changed, 434 insertions, 0 deletions
diff --git a/storage/ndb/src/common/transporter/TCP_Transporter.cpp b/storage/ndb/src/common/transporter/TCP_Transporter.cpp
new file mode 100644
index 00000000000..fd71cf71cd9
--- /dev/null
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp
@@ -0,0 +1,434 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+
+#include <NdbTCP.h>
+#include "TCP_Transporter.hpp"
+#include <NdbOut.hpp>
+#include <NdbSleep.h>
+// End of stuff to be moved
+
+#if defined NDB_OSE || defined NDB_SOFTOSE
+#define inet_send inet_send
+#else
+#define inet_send send
+#endif
+
+#ifdef NDB_WIN32
+class ndbstrerror
+{
+public:
+ ndbstrerror(int iError);
+ ~ndbstrerror(void);
+ operator char*(void) { return m_szError; };
+
+private:
+ int m_iError;
+ char* m_szError;
+};
+
+ndbstrerror::ndbstrerror(int iError)
+: m_iError(iError)
+{
+ FormatMessage(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
+ 0,
+ iError,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR)&m_szError,
+ 0,
+ 0);
+}
+
+ndbstrerror::~ndbstrerror(void)
+{
+ LocalFree( m_szError );
+ m_szError = 0;
+}
+#else
+#define ndbstrerror strerror
+#endif
+
+TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
+ int sendBufSize, int maxRecvSize,
+ const char *lHostName,
+ const char *rHostName,
+ int r_port,
+ bool isMgmConnection,
+ NodeId lNodeId,
+ NodeId rNodeId,
+ NodeId serverNodeId,
+ bool chksm, bool signalId,
+ Uint32 _reportFreq) :
+ Transporter(t_reg, tt_TCP_TRANSPORTER,
+ lHostName, rHostName, r_port, isMgmConnection,
+ lNodeId, rNodeId, serverNodeId,
+ 0, false, chksm, signalId),
+ m_sendBuffer(sendBufSize)
+{
+ maxReceiveSize = maxRecvSize;
+
+ // Initialize member variables
+ theSocket = NDB_INVALID_SOCKET;
+
+ sendCount = receiveCount = 0;
+ sendSize = receiveSize = 0;
+ reportFreq = _reportFreq;
+
+ sockOptRcvBufSize = 70080;
+ sockOptSndBufSize = 71540;
+ sockOptNodelay = 1;
+ sockOptTcpMaxSeg = 4096;
+}
+
+TCP_Transporter::~TCP_Transporter() {
+
+ // Disconnect
+ if (theSocket != NDB_INVALID_SOCKET)
+ doDisconnect();
+
+ // Delete send buffers
+
+ // Delete receive buffer!!
+ receiveBuffer.destroy();
+}
+
+bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
+{
+ DBUG_ENTER("TCP_Transpporter::connect_server_impl");
+ DBUG_RETURN(connect_common(sockfd));
+}
+
+bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
+{
+ DBUG_ENTER("TCP_Transpporter::connect_client_impl");
+ DBUG_RETURN(connect_common(sockfd));
+}
+
+bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
+{
+ theSocket = sockfd;
+ setSocketOptions();
+ setSocketNonBlocking(theSocket);
+ DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
+ remoteNodeId));
+ return true;
+}
+
+bool
+TCP_Transporter::initTransporter() {
+
+ // Allocate buffer for receiving
+ // Let it be the maximum size we receive plus 8 kB for any earlier received
+ // incomplete messages (slack)
+ Uint32 recBufSize = maxReceiveSize;
+ if(recBufSize < MAX_MESSAGE_SIZE){
+ recBufSize = MAX_MESSAGE_SIZE;
+ }
+
+ if(!receiveBuffer.init(recBufSize+MAX_MESSAGE_SIZE)){
+ return false;
+ }
+
+ // Allocate buffers for sending
+ if (!m_sendBuffer.initBuffer(remoteNodeId)) {
+ // XXX What shall be done here?
+ // The same is valid for the other init-methods
+ return false;
+ }
+
+ return true;
+}
+
+void
+TCP_Transporter::setSocketOptions(){
+ if (setsockopt(theSocket, SOL_SOCKET, SO_RCVBUF,
+ (char*)&sockOptRcvBufSize, sizeof(sockOptRcvBufSize)) < 0) {
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("The setsockopt SO_RCVBUF error code = %d", InetErrno);
+#endif
+ }//if
+
+ if (setsockopt(theSocket, SOL_SOCKET, SO_SNDBUF,
+ (char*)&sockOptSndBufSize, sizeof(sockOptSndBufSize)) < 0) {
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("The setsockopt SO_SNDBUF error code = %d", InetErrno);
+#endif
+ }//if
+
+ //-----------------------------------------------
+ // Set the TCP_NODELAY option so also small packets are sent
+ // as soon as possible
+ //-----------------------------------------------
+ if (setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
+ (char*)&sockOptNodelay, sizeof(sockOptNodelay)) < 0) {
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("The setsockopt TCP_NODELAY error code = %d", InetErrno);
+#endif
+ }//if
+}
+
+
+#ifdef NDB_WIN32
+
+bool
+TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
+ unsigned long ul = 1;
+ if(ioctlsocket(socket, FIONBIO, &ul))
+ {
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("Set non-blocking server error3: %d", InetErrno);
+#endif
+ }//if
+ return true;
+}
+
+#else
+
+bool
+TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
+ int flags;
+ flags = fcntl(socket, F_GETFL, 0);
+ if (flags < 0) {
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("Set non-blocking server error1: %s", strerror(InetErrno));
+#endif
+ }//if
+ flags |= NDB_NONBLOCK;
+ if (fcntl(socket, F_SETFL, flags) == -1) {
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("Set non-blocking server error2: %s", strerror(InetErrno));
+#endif
+ }//if
+ return true;
+}
+
+#endif
+
+bool
+TCP_Transporter::sendIsPossible(struct timeval * timeout) {
+#ifdef NDB_OSE
+ /**
+ * In OSE you cant do select without owning a socket,
+ * and since this method might be called by any thread in the api
+ * we choose not to implementet and always return true after sleeping
+ * a while.
+ *
+ * Note that this only sensible as long as the sockets are non blocking
+ */
+ if(theSocket >= 0){
+ Uint32 timeOutMillis = timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
+ NdbSleep_MilliSleep(timeOutMillis);
+ return true;
+ }
+ return false;
+#else
+ if(theSocket != NDB_INVALID_SOCKET){
+ fd_set writeset;
+ FD_ZERO(&writeset);
+ FD_SET(theSocket, &writeset);
+
+ int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
+
+ if ((selectReply > 0) && FD_ISSET(theSocket, &writeset))
+ return true;
+ else
+ return false;
+ }
+ return false;
+#endif
+}
+
+
+Uint32 *
+TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
+
+ Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
+
+ struct timeval timeout = {0, 10000};
+
+ if (insertPtr == 0) {
+ //-------------------------------------------------
+ // Buffer was completely full. We have severe problems.
+ // We will attempt to wait for a small time
+ //-------------------------------------------------
+ if(sendIsPossible(&timeout)) {
+ //-------------------------------------------------
+ // Send is possible after the small timeout.
+ //-------------------------------------------------
+ if(!doSend()){
+ return 0;
+ } else {
+ //-------------------------------------------------
+ // Since send was successful we will make a renewed
+ // attempt at inserting the signal into the buffer.
+ //-------------------------------------------------
+ insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
+ }//if
+ } else {
+ return 0;
+ }//if
+ }
+ return insertPtr;
+}
+
+void
+TCP_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
+ m_sendBuffer.updateInsertPtr(lenBytes);
+
+ const int bufsize = m_sendBuffer.bufferSize();
+ if(bufsize > TCP_SEND_LIMIT) {
+ //-------------------------------------------------
+ // Buffer is full and we are ready to send. We will
+ // not wait since the signal is already in the buffer.
+ // Force flag set has the same indication that we
+ // should always send. If it is not possible to send
+ // we will not worry since we will soon be back for
+ // a renewed trial.
+ //-------------------------------------------------
+ struct timeval no_timeout = {0,0};
+ if(sendIsPossible(&no_timeout)) {
+ //-------------------------------------------------
+ // Send was possible, attempt at a send.
+ //-------------------------------------------------
+ doSend();
+ }//if
+ }
+}
+
+#define DISCONNECT_ERRNO(e, sz) ((sz == 0) || \
+ (!((sz == -1) && (e == EAGAIN) || (e == EWOULDBLOCK) || (e == EINTR))))
+
+
+bool
+TCP_Transporter::doSend() {
+ // If no sendbuffers are used nothing is done
+ // Sends the contents of the SendBuffers until they are empty
+ // or until select does not select the socket for write.
+ // Before calling send, the socket must be selected for write
+ // using "select"
+ // It writes on the external TCP/IP interface until the send buffer is empty
+ // and as long as write is possible (test it using select)
+
+ // Empty the SendBuffers
+
+ const char * const sendPtr = m_sendBuffer.sendPtr;
+ const Uint32 sizeToSend = m_sendBuffer.sendDataSize;
+ if (sizeToSend > 0){
+ const int nBytesSent = inet_send(theSocket, sendPtr, sizeToSend, 0);
+
+ if (nBytesSent > 0) {
+ m_sendBuffer.bytesSent(nBytesSent);
+
+ sendCount ++;
+ sendSize += nBytesSent;
+ if(sendCount == reportFreq){
+ reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
+ sendCount = 0;
+ sendSize = 0;
+ }
+ } else {
+ // Send failed
+#if defined DEBUG_TRANSPORTER
+ ndbout_c("Send Failure(disconnect==%d) to node = %d nBytesSent = %d "
+ "errno = %d strerror = %s",
+ DISCONNECT_ERRNO(InetErrno, nBytesSent),
+ remoteNodeId, nBytesSent, InetErrno,
+ (char*)ndbstrerror(InetErrno));
+#endif
+ if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
+ doDisconnect();
+ report_disconnect(InetErrno);
+ }
+
+ return false;
+ }
+ }
+ return true;
+}
+
+int
+TCP_Transporter::doReceive() {
+ // Select-function must return the socket for read
+ // before this method is called
+ // It reads the external TCP/IP interface once
+ Uint32 size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
+ if(size > 0){
+ const int nBytesRead = recv(theSocket,
+ receiveBuffer.insertPtr,
+ size < maxReceiveSize ? size : maxReceiveSize,
+ 0);
+
+ if (nBytesRead > 0) {
+ receiveBuffer.sizeOfData += nBytesRead;
+ receiveBuffer.insertPtr += nBytesRead;
+
+ if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
+#ifdef DEBUG_TRANSPORTER
+ ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
+ receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
+ ndbout_c("nBytesRead = %d", nBytesRead);
+#endif
+ ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
+ receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
+ report_error(TE_INVALID_MESSAGE_LENGTH);
+ return 0;
+ }
+
+ receiveCount ++;
+ receiveSize += nBytesRead;
+
+ if(receiveCount == reportFreq){
+ reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
+ receiveCount = 0;
+ receiveSize = 0;
+ }
+ return nBytesRead;
+ } else {
+#if defined DEBUG_TRANSPORTER
+ ndbout_c("Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
+ "errno = %d strerror = %s",
+ DISCONNECT_ERRNO(InetErrno, nBytesRead),
+ remoteNodeId, nBytesRead, InetErrno,
+ (char*)ndbstrerror(InetErrno));
+#endif
+ if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
+ // The remote node has closed down
+ doDisconnect();
+ report_disconnect(InetErrno);
+ }
+ }
+ return nBytesRead;
+ } else {
+ return 0;
+ }
+}
+
+void
+TCP_Transporter::disconnectImpl() {
+ if(theSocket != NDB_INVALID_SOCKET){
+ if(NDB_CLOSE_SOCKET(theSocket) < 0){
+ report_error(TE_ERROR_CLOSING_SOCKET);
+ }
+ }
+
+ // Empty send och receive buffers
+ receiveBuffer.clear();
+ m_sendBuffer.emptyBuffer();
+
+ theSocket = NDB_INVALID_SOCKET;
+}