summaryrefslogtreecommitdiff
path: root/sql/semisync_master_ack_receiver.h
diff options
context:
space:
mode:
authorAndrei Elkin <andrei.elkin@mariadb.com>2018-10-15 15:22:45 +0300
committerAndrei Elkin <andrei.elkin@mariadb.com>2018-11-13 10:29:18 +0200
commit6db773a5420fc7eedd6c02989967abbca3212143 (patch)
treed464ea35c131ec7ad779b4af3db21f0a599f44a6 /sql/semisync_master_ack_receiver.h
parentc29c39a7dc2b36dfaa56ae8716cfd808e35d6673 (diff)
downloadmariadb-git-6db773a5420fc7eedd6c02989967abbca3212143.tar.gz
MDEV-17437 Semisync master fires invalid fd value assert
The semisync ack collector hits fd's out-of-bound value assert through a stack of /usr/sbin/mysqld(_ZN12Ack_receiver17get_slave_socketsEP6fd_setPj+0x70)[0x7fa3bbe27400] /usr/sbin/mysqld(_ZN12Ack_receiver3runEv+0x540)[0x7fa3bbe27980] /usr/sbin/mysqld(ack_receive_handler+0x19)[0x7fa3bbe27a79] The reason of the failure must be the same as in https://bugs.mysql.com/bug.php?id=79865 whose fixes are applied with minor changes. Specifically, the semisync ack thread is changed to use poll() instead of select() on platforms where the former is defined. On the systems that still use select(), Ack receive thread will generate an error and semi sync will be switched off. Windows systems is exception case because on windows this limitation does not exists. The sustain manual testing with `mysqlslap --concurrency > 1024' in "background" while the slave io thread is restarting multiple times.
Diffstat (limited to 'sql/semisync_master_ack_receiver.h')
-rw-r--r--sql/semisync_master_ack_receiver.h147
1 files changed, 134 insertions, 13 deletions
diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h
index 619748a2159..9ff99edca5d 100644
--- a/sql/semisync_master_ack_receiver.h
+++ b/sql/semisync_master_ack_receiver.h
@@ -20,6 +20,22 @@
#include "my_pthread.h"
#include "sql_class.h"
#include "semisync.h"
+#include <vector>
+
+struct Slave :public ilink
+{
+ THD *thd;
+ Vio vio;
+#ifdef HAVE_POLL
+ uint m_fds_index;
+#endif
+ my_socket sock_fd() const { return vio.mysql_socket.fd; }
+ uint server_id() const { return thd->variables.server_id; }
+};
+
+typedef I_List<Slave> Slave_ilist;
+typedef I_List_iterator<Slave> Slave_ilist_iterator;
+
/**
Ack_receiver is responsible to control ack receive thread and maintain
slave information used by ack receive thread.
@@ -92,18 +108,7 @@ private:
/* If slave list is updated(add or remove). */
bool m_slaves_changed;
- class Slave :public ilink
- {
-public:
- THD *thd;
- Vio vio;
-
- my_socket sock_fd() { return vio.mysql_socket.fd; }
- uint server_id() { return thd->variables.server_id; }
- };
-
- I_List<Slave> m_slaves;
-
+ Slave_ilist m_slaves;
pthread_t m_pid;
/* Declare them private, so no one can copy the object. */
@@ -112,8 +117,124 @@ public:
void set_stage_info(const PSI_stage_info &stage);
void wait_for_slave_connection();
- my_socket get_slave_sockets(fd_set *fds, uint *count);
};
+
+#ifdef HAVE_POLL
+#include <sys/poll.h>
+#include <vector>
+
+class Poll_socket_listener
+{
+public:
+ Poll_socket_listener(const Slave_ilist &slaves)
+ :m_slaves(slaves)
+ {
+ }
+
+ bool listen_on_sockets()
+ {
+ return poll(m_fds.data(), m_fds.size(), 1000 /*1 Second timeout*/);
+ }
+
+ bool is_socket_active(const Slave *slave)
+ {
+ return m_fds[slave->m_fds_index].revents & POLLIN;
+ }
+
+ void clear_socket_info(const Slave *slave)
+ {
+ m_fds[slave->m_fds_index].fd= -1;
+ m_fds[slave->m_fds_index].events= 0;
+ }
+
+ uint init_slave_sockets()
+ {
+ Slave_ilist_iterator it(const_cast<Slave_ilist&>(m_slaves));
+ Slave *slave;
+ uint fds_index= 0;
+
+ m_fds.clear();
+ while ((slave= it++))
+ {
+ pollfd poll_fd;
+ poll_fd.fd= slave->sock_fd();
+ poll_fd.events= POLLIN;
+ m_fds.push_back(poll_fd);
+ slave->m_fds_index= fds_index++;
+ }
+ return fds_index;
+ }
+
+private:
+ const Slave_ilist &m_slaves;
+ std::vector<pollfd> m_fds;
+};
+
+#else //NO POLL
+
+class Select_socket_listener
+{
+public:
+ Select_socket_listener(const Slave_ilist &slaves)
+ :m_slaves(slaves), m_max_fd(INVALID_SOCKET)
+ {
+ }
+
+ bool listen_on_sockets()
+ {
+ /* Reinitialze the fds with active fds before calling select */
+ m_fds= m_init_fds;
+ struct timeval tv= {1,0};
+ /* select requires max fd + 1 for the first argument */
+ return select(m_max_fd+1, &m_fds, NULL, NULL, &tv);
+ }
+
+ bool is_socket_active(const Slave *slave)
+ {
+ return FD_ISSET(slave->sock_fd(), &m_fds);
+ }
+
+ void clear_socket_info(const Slave *slave)
+ {
+ FD_CLR(slave->sock_fd(), &m_init_fds);
+ }
+
+ uint init_slave_sockets()
+ {
+ Slave_ilist_iterator it(m_slaves);
+ Slave *slave;
+ uint fds_index= 0;
+
+ FD_ZERO(&m_init_fds);
+ while ((slave= it++))
+ {
+ my_socket socket_id= slave->sock_fd();
+ m_max_fd= (socket_id > m_max_fd ? socket_id : m_max_fd);
+#ifndef WINDOWS
+ if (socket_id > FD_SETSIZE)
+ {
+ sql_print_error("Semisync slave socket fd is %u. "
+ "select() cannot handle if the socket fd is "
+ "greater than %u (FD_SETSIZE).", socket_id, FD_SETSIZE);
+ return 0;
+ }
+#endif //WINDOWS
+ FD_SET(socket_id, &m_init_fds);
+ fds_index++;
+ }
+ return fds_index;
+ }
+ my_socket get_max_fd() { return m_max_fd; }
+
+private:
+ const Slave_ilist &m_slaves;
+ my_socket m_max_fd;
+ fd_set m_init_fds;
+ fd_set m_fds;
+};
+
+#endif //HAVE_POLL
+
extern Ack_receiver ack_receiver;
#endif