summaryrefslogtreecommitdiff
path: root/storage/ndb/src/common/util/SocketServer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/common/util/SocketServer.cpp')
-rw-r--r--storage/ndb/src/common/util/SocketServer.cpp345
1 files changed, 345 insertions, 0 deletions
diff --git a/storage/ndb/src/common/util/SocketServer.cpp b/storage/ndb/src/common/util/SocketServer.cpp
new file mode 100644
index 00000000000..6019d24d736
--- /dev/null
+++ b/storage/ndb/src/common/util/SocketServer.cpp
@@ -0,0 +1,345 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <ndb_global.h>
+#include <my_pthread.h>
+
+#include <SocketServer.hpp>
+
+#include <NdbTCP.h>
+#include <NdbOut.hpp>
+#include <NdbThread.h>
+#include <NdbSleep.h>
+
+#define DEBUG(x) ndbout << x << endl;
+
+SocketServer::SocketServer(int maxSessions) :
+ m_sessions(10),
+ m_services(5)
+{
+ m_thread = 0;
+ m_stopThread = false;
+ m_maxSessions = maxSessions;
+}
+
+SocketServer::~SocketServer() {
+ unsigned i;
+ for(i = 0; i<m_sessions.size(); i++){
+ delete m_sessions[i].m_session;
+ }
+ for(i = 0; i<m_services.size(); i++){
+ delete m_services[i].m_service;
+ }
+}
+
+bool
+SocketServer::tryBind(unsigned short port, const char * intface) {
+ struct sockaddr_in servaddr;
+ memset(&servaddr, 0, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ servaddr.sin_port = htons(port);
+
+ if(intface != 0){
+ if(Ndb_getInAddr(&servaddr.sin_addr, intface))
+ return false;
+ }
+
+ const NDB_SOCKET_TYPE sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock == NDB_INVALID_SOCKET) {
+ return false;
+ }
+
+ const int on = 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (const char*)&on, sizeof(on)) == -1) {
+ NDB_CLOSE_SOCKET(sock);
+ return false;
+ }
+
+ if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
+ NDB_CLOSE_SOCKET(sock);
+ return false;
+ }
+
+ NDB_CLOSE_SOCKET(sock);
+ return true;
+}
+
+bool
+SocketServer::setup(SocketServer::Service * service,
+ unsigned short * port,
+ const char * intface){
+ DBUG_ENTER("SocketServer::setup");
+ DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
+ struct sockaddr_in servaddr;
+ memset(&servaddr, 0, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ servaddr.sin_port = htons(*port);
+
+ if(intface != 0){
+ if(Ndb_getInAddr(&servaddr.sin_addr, intface))
+ DBUG_RETURN(false);
+ }
+
+ const NDB_SOCKET_TYPE sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock == NDB_INVALID_SOCKET) {
+ DBUG_PRINT("error",("socket() - %d - %s",
+ errno, strerror(errno)));
+ DBUG_RETURN(false);
+ }
+
+ const int on = 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (const char*)&on, sizeof(on)) == -1) {
+ DBUG_PRINT("error",("getsockopt() - %d - %s",
+ errno, strerror(errno)));
+ NDB_CLOSE_SOCKET(sock);
+ DBUG_RETURN(false);
+ }
+
+ if (bind(sock, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
+ DBUG_PRINT("error",("bind() - %d - %s",
+ errno, strerror(errno)));
+ NDB_CLOSE_SOCKET(sock);
+ DBUG_RETURN(false);
+ }
+
+ /* Get the port we bound to */
+ SOCKET_SIZE_TYPE sock_len = sizeof(servaddr);
+ if(getsockname(sock,(struct sockaddr*)&servaddr,&sock_len)<0) {
+ ndbout_c("An error occurred while trying to find out what"
+ " port we bound to. Error: %s",strerror(errno));
+ NDB_CLOSE_SOCKET(sock);
+ DBUG_RETURN(false);
+ }
+
+ DBUG_PRINT("info",("bound to %u",ntohs(servaddr.sin_port)));
+ if (listen(sock, m_maxSessions) == -1){
+ DBUG_PRINT("error",("listen() - %d - %s",
+ errno, strerror(errno)));
+ NDB_CLOSE_SOCKET(sock);
+ DBUG_RETURN(false);
+ }
+
+ ServiceInstance i;
+ i.m_socket = sock;
+ i.m_service = service;
+ m_services.push_back(i);
+
+ *port = ntohs(servaddr.sin_port);
+
+ DBUG_RETURN(true);
+}
+
+void
+SocketServer::doAccept(){
+ fd_set readSet, exceptionSet;
+ FD_ZERO(&readSet);
+ FD_ZERO(&exceptionSet);
+
+ m_services.lock();
+ int maxSock = 0;
+ for (unsigned i = 0; i < m_services.size(); i++){
+ const NDB_SOCKET_TYPE s = m_services[i].m_socket;
+ FD_SET(s, &readSet);
+ FD_SET(s, &exceptionSet);
+ maxSock = (maxSock > s ? maxSock : s);
+ }
+ struct timeval timeout;
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+
+ if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){
+ for (unsigned i = 0; i < m_services.size(); i++){
+ ServiceInstance & si = m_services[i];
+
+ if(FD_ISSET(si.m_socket, &readSet)){
+ NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
+ if(childSock == NDB_INVALID_SOCKET){
+ continue;
+ }
+
+ SessionInstance s;
+ s.m_service = si.m_service;
+ s.m_session = si.m_service->newSession(childSock);
+ if(s.m_session != 0){
+ m_sessions.push_back(s);
+ startSession(m_sessions.back());
+ }
+
+ continue;
+ }
+
+ if(FD_ISSET(si.m_socket, &exceptionSet)){
+ DEBUG("socket in the exceptionSet");
+ continue;
+ }
+ }
+ }
+ m_services.unlock();
+}
+
+extern "C"
+void*
+socketServerThread_C(void* _ss){
+ SocketServer * ss = (SocketServer *)_ss;
+ ss->doRun();
+ return 0;
+}
+
+void
+SocketServer::startServer(){
+ m_threadLock.lock();
+ if(m_thread == 0 && m_stopThread == false){
+ m_thread = NdbThread_Create(socketServerThread_C,
+ (void**)this,
+ 32768,
+ "NdbSockServ",
+ NDB_THREAD_PRIO_LOW);
+ }
+ m_threadLock.unlock();
+}
+
+void
+SocketServer::stopServer(){
+ m_threadLock.lock();
+ if(m_thread != 0){
+ m_stopThread = true;
+
+ void * res;
+ NdbThread_WaitFor(m_thread, &res);
+ NdbThread_Destroy(&m_thread);
+ m_thread = 0;
+ }
+ m_threadLock.unlock();
+}
+
+void
+SocketServer::doRun(){
+
+ while(!m_stopThread){
+ checkSessions();
+ if(m_sessions.size() < m_maxSessions){
+ doAccept();
+ } else {
+ NdbSleep_MilliSleep(200);
+ }
+ }
+}
+
+void
+SocketServer::startSession(SessionInstance & si){
+ si.m_thread = NdbThread_Create(sessionThread_C,
+ (void**)si.m_session,
+ 32768,
+ "NdbSock_Session",
+ NDB_THREAD_PRIO_LOW);
+}
+
+static
+bool
+transfer(NDB_SOCKET_TYPE sock){
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ const PROCESS p = current_process();
+ const size_t ps = sizeof(PROCESS);
+ int res = setsockopt(sock, SOL_SOCKET, SO_OSEOWNER, &p, ps);
+ if(res != 0){
+ ndbout << "Failed to transfer ownership of socket" << endl;
+ return false;
+ }
+#endif
+ return true;
+}
+
+void
+SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
+{
+ for(int i = m_sessions.size() - 1; i >= 0; i--){
+ (*func)(m_sessions[i].m_session, data);
+ }
+ checkSessions();
+}
+
+void
+SocketServer::checkSessions(){
+ for(int i = m_sessions.size() - 1; i >= 0; i--){
+ if(m_sessions[i].m_session->m_stopped){
+ if(m_sessions[i].m_thread != 0){
+ void* ret;
+ NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
+ NdbThread_Destroy(&m_sessions[i].m_thread);
+ }
+ m_sessions[i].m_session->stopSession();
+ delete m_sessions[i].m_session;
+ m_sessions.erase(i);
+ }
+ }
+}
+
+void
+SocketServer::stopSessions(bool wait){
+ int i;
+ for(i = m_sessions.size() - 1; i>=0; i--)
+ {
+ m_sessions[i].m_session->stopSession();
+ m_sessions[i].m_session->m_stop = true; // to make sure
+ }
+ for(i = m_services.size() - 1; i>=0; i--)
+ m_services[i].m_service->stopSessions();
+
+ if(wait){
+ while(m_sessions.size() > 0){
+ checkSessions();
+ NdbSleep_MilliSleep(100);
+ }
+ }
+}
+
+/***** Session code ******/
+
+extern "C"
+void*
+sessionThread_C(void* _sc){
+ SocketServer::Session * si = (SocketServer::Session *)_sc;
+
+ if(!transfer(si->m_socket)){
+ si->m_stopped = true;
+ return 0;
+ }
+
+ /**
+ * may have m_stopped set if we're transforming a mgm
+ * connection into a transporter connection.
+ */
+ if(!si->m_stopped)
+ {
+ if(!si->m_stop){
+ si->m_stopped = false;
+ si->runSession();
+ } else {
+ NDB_CLOSE_SOCKET(si->m_socket);
+ }
+ }
+
+ si->m_stopped = true;
+ return 0;
+}
+
+template class MutexVector<SocketServer::ServiceInstance>;
+template class MutexVector<SocketServer::SessionInstance>;