/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ #ifndef TCP_TRANSPORTER_HPP #define TCP_TRANSPORTER_HPP #include "Transporter.hpp" #include "SendBuffer.hpp" #include struct ReceiveBuffer { Uint32 *startOfBuffer; // Pointer to start of the receive buffer Uint32 *readPtr; // Pointer to start reading data char *insertPtr; // Pointer to first position in the receiveBuffer // in which to insert received data. Earlier // received incomplete messages (slack) are // copied into the first part of the receiveBuffer Uint32 sizeOfData; // In bytes Uint32 sizeOfBuffer; ReceiveBuffer() {} bool init(int bytes); void destroy(); void clear(); void incompleteMessage(); }; class TCP_Transporter : public Transporter { friend class TransporterRegistry; private: // Initialize member variables TCP_Transporter(TransporterRegistry&, int sendBufferSize, int maxReceiveSize, const char *lHostName, const char *rHostName, int r_port, bool isMgmConnection, NodeId lHostId, NodeId rHostId, NodeId serverNodeId, bool checksum, bool signalId, Uint32 reportFreq = 4096); // Disconnect, delete send buffers and receive buffer virtual ~TCP_Transporter(); /** * Allocate buffers for sending and receiving */ bool initTransporter(); Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio); void updateWritePtr(Uint32 lenBytes, Uint32 prio); bool hasDataToSend() const ; /** * Retrieves the contents of the send buffers and writes it on * the external TCP/IP interface until the send buffers are empty * and as long as write is possible. */ bool doSend(); /** * It reads the external TCP/IP interface once * and puts the data in the receiveBuffer */ int doReceive(); /** * Returns socket (used for select) */ NDB_SOCKET_TYPE getSocket() const; /** * Get Receive Data * * Returns - no of bytes to read * and set ptr */ virtual Uint32 getReceiveData(Uint32 ** ptr); /** * Update receive data ptr */ virtual void updateReceiveDataPtr(Uint32 bytesRead); virtual Uint32 get_free_buffer() const; inline bool hasReceiveData () const { return receiveBuffer.sizeOfData > 0; } protected: /** * Setup client/server and perform connect/accept * Is used both by clients and servers * A client connects to the remote server * A server accepts any new connections */ virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd); virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd); bool connect_common(NDB_SOCKET_TYPE sockfd); /** * Disconnects a TCP/IP node. Empty send and receivebuffer. */ virtual void disconnectImpl(); private: /** * Send buffers */ SendBuffer m_sendBuffer; // Sending/Receiving socket used by both client and server NDB_SOCKET_TYPE theSocket; Uint32 maxReceiveSize; /** * Socket options */ int sockOptRcvBufSize; int sockOptSndBufSize; int sockOptNodelay; int sockOptTcpMaxSeg; void setSocketOptions(); static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket); bool sendIsPossible(struct timeval * timeout); /** * Statistics */ Uint32 reportFreq; Uint32 receiveCount; Uint64 receiveSize; Uint32 sendCount; Uint64 sendSize; ReceiveBuffer receiveBuffer; }; inline NDB_SOCKET_TYPE TCP_Transporter::getSocket() const { return theSocket; } inline Uint32 TCP_Transporter::getReceiveData(Uint32 ** ptr){ (* ptr) = receiveBuffer.readPtr; return receiveBuffer.sizeOfData; } inline void TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){ char * ptr = (char *)receiveBuffer.readPtr; ptr += bytesRead; receiveBuffer.readPtr = (Uint32*)ptr; receiveBuffer.sizeOfData -= bytesRead; receiveBuffer.incompleteMessage(); } inline bool TCP_Transporter::hasDataToSend() const { return m_sendBuffer.dataSize > 0; } inline bool ReceiveBuffer::init(int bytes){ #ifdef DEBUG_TRANSPORTER ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl; #endif startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1]; sizeOfBuffer = bytes + sizeof(Uint32); clear(); return true; } inline void ReceiveBuffer::destroy(){ delete[] startOfBuffer; sizeOfBuffer = 0; startOfBuffer = 0; clear(); } inline void ReceiveBuffer::clear(){ readPtr = startOfBuffer; insertPtr = (char *)startOfBuffer; sizeOfData = 0; } inline void ReceiveBuffer::incompleteMessage() { if(startOfBuffer != readPtr){ if(sizeOfData != 0) memmove(startOfBuffer, readPtr, sizeOfData); readPtr = startOfBuffer; insertPtr = ((char *)startOfBuffer) + sizeOfData; } } #endif // Define of TCP_Transporter_H