diff options
author | Michael Widenius <monty@askmonty.org> | 2011-06-07 14:19:49 +0300 |
---|---|---|
committer | Michael Widenius <monty@askmonty.org> | 2011-06-07 14:19:49 +0300 |
commit | 8d7f81089444e20526685b3ffe36b72d5ecdc157 (patch) | |
tree | 490b3363b21ed051c82a01af549a826ec0bef6f2 /plugin/handler_socket/client | |
parent | 2740edcf5680bc03a318378514f88fefa67d5923 (diff) | |
download | mariadb-git-8d7f81089444e20526685b3ffe36b72d5ecdc157.tar.gz |
Upgraded to latest handlersocket code. This fixed LP:766870 "Assertion `next_insert_id == 0' failed with handlersocket"
sql/handler.cc:
Added DBUG_ code
Diffstat (limited to 'plugin/handler_socket/client')
-rw-r--r-- | plugin/handler_socket/client/Makefile.am | 16 | ||||
-rw-r--r-- | plugin/handler_socket/client/hslongrun.cpp | 1041 | ||||
-rw-r--r-- | plugin/handler_socket/client/hstest.cpp | 96 |
3 files changed, 1121 insertions, 32 deletions
diff --git a/plugin/handler_socket/client/Makefile.am b/plugin/handler_socket/client/Makefile.am index 6aff3419565..e89727a7023 100644 --- a/plugin/handler_socket/client/Makefile.am +++ b/plugin/handler_socket/client/Makefile.am @@ -6,9 +6,19 @@ hsclient_LDFLAGS= -static -L../libhsclient -lhsclient hsclient_CXXFLAGS= $(AM_INCLUDES) hstest: hstest.o - $(CXX) $(CXXFLAGS) $(LFLAGS) hstest.o \ - -L../libhsclient/.libs -lhsclient $(MYSQL_LIB) -o hstest + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(LFLAGS) hstest.o \ + -L../libhsclient/.libs -lhsclient $(MYSQL_LIB) \ + -o hstest hstest.o: hstest.cpp - $(CXX) $(CXXFLAGS) $(MYSQL_INC) $(AM_INCLUDES) -c hstest.cpp + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(MYSQL_INC) $(AM_INCLUDES) \ + -c hstest.cpp +hslongrun: hslongrun.o + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(LFLAGS) hslongrun.o \ + -L../libhsclient/.libs -lhsclient $(MYSQL_LIB) \ + -o hslongrun + +hslongrun.o: hslongrun.cpp + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(MYSQL_INC) $(AM_INCLUDES) \ + -c hslongrun.cpp diff --git a/plugin/handler_socket/client/hslongrun.cpp b/plugin/handler_socket/client/hslongrun.cpp new file mode 100644 index 00000000000..e82c12b166b --- /dev/null +++ b/plugin/handler_socket/client/hslongrun.cpp @@ -0,0 +1,1041 @@ + +// vim:sw=2:ai + +#include <signal.h> +#include <sys/time.h> +#include <stdio.h> +#include <string.h> +#include <vector> +#include <map> +#include <stdlib.h> +#include <memory> +#include <errno.h> +#include <mysql.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "util.hpp" +#include "auto_ptrcontainer.hpp" +#include "socket.hpp" +#include "hstcpcli.hpp" +#include "string_util.hpp" +#include "mutex.hpp" + +namespace dena { + +struct auto_mysql : private noncopyable { + auto_mysql() : db(0) { + reset(); + } + ~auto_mysql() { + if (db) { + mysql_close(db); + } + } + void reset() { + if (db) { + mysql_close(db); + } + if ((db = mysql_init(0)) == 0) { + fatal_exit("failed to initialize mysql client"); + } + } + operator MYSQL *() const { return db; } + private: + MYSQL *db; +}; + +struct auto_mysql_res : private noncopyable { + auto_mysql_res(MYSQL *db) { + res = mysql_store_result(db); + } + ~auto_mysql_res() { + if (res) { + mysql_free_result(res); + } + } + operator MYSQL_RES *() const { return res; } + private: + MYSQL_RES *res; +}; + +struct auto_mysql_stmt : private noncopyable { + auto_mysql_stmt(MYSQL *db) { + stmt = mysql_stmt_init(db); + } + ~auto_mysql_stmt() { + if (stmt) { + mysql_stmt_close(stmt); + } + } + operator MYSQL_STMT *() const { return stmt; } + private: + MYSQL_STMT *stmt; +}; + +double +gettimeofday_double() +{ + struct timeval tv = { }; + if (gettimeofday(&tv, 0) != 0) { + fatal_abort("gettimeofday"); + } + return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec; +} + +struct record_value { + mutex lock; + bool deleted; + bool unknown_state; + std::string key; + std::vector<std::string> values; + record_value() : deleted(true), unknown_state(false) { } +}; + +struct hs_longrun_shared { + config conf; + socket_args arg; + int verbose; + long num_threads; + int usleep; + volatile mutable int running; + auto_ptrcontainer< std::vector<record_value *> > records; + hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { } +}; + +struct thread_base { + thread_base() : need_join(false), stack_size(256 * 1024) { } + virtual ~thread_base() { + join(); + } + virtual void run() = 0; + void start() { + if (!start_nothrow()) { + fatal_abort("thread::start"); + } + } + bool start_nothrow() { + if (need_join) { + return need_join; /* true */ + } + void *const arg = this; + pthread_attr_t attr; + if (pthread_attr_init(&attr) != 0) { + fatal_abort("pthread_attr_init"); + } + if (pthread_attr_setstacksize(&attr, stack_size) != 0) { + fatal_abort("pthread_attr_setstacksize"); + } + const int r = pthread_create(&thr, &attr, thread_main, arg); + if (pthread_attr_destroy(&attr) != 0) { + fatal_abort("pthread_attr_destroy"); + } + if (r != 0) { + return need_join; /* false */ + } + need_join = true; + return need_join; /* true */ + } + void join() { + if (!need_join) { + return; + } + int e = 0; + if ((e = pthread_join(thr, 0)) != 0) { + fatal_abort("pthread_join"); + } + need_join = false; + } + private: + static void *thread_main(void *arg) { + thread_base *p = static_cast<thread_base *>(arg); + p->run(); + return 0; + } + private: + pthread_t thr; + bool need_join; + size_t stack_size; +}; + +struct hs_longrun_stat { + unsigned long long verify_error_count; + unsigned long long runtime_error_count; + unsigned long long unknown_count; + unsigned long long success_count; + hs_longrun_stat() + : verify_error_count(0), runtime_error_count(0), + unknown_count(0), success_count(0) { } + void add(const hs_longrun_stat& x) { + verify_error_count += x.verify_error_count; + runtime_error_count += x.runtime_error_count; + unknown_count += x.unknown_count; + success_count += x.success_count; + } +}; + +struct hs_longrun_thread_base : public thread_base { + struct arg_type { + int id; + std::string worker_type; + char op; + int lock_flag; + const hs_longrun_shared& sh; + arg_type(int id, const std::string& worker_type, char op, int lock_flag, + const hs_longrun_shared& sh) + : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag), + sh(sh) { } + }; + arg_type arg; + hs_longrun_stat stat; + drand48_data randbuf; + unsigned int seed; + hs_longrun_thread_base(const arg_type& arg) + : arg(arg), seed(0) { + seed = time(0) + arg.id + 1; + srand48_r(seed, &randbuf); + } + virtual ~hs_longrun_thread_base() { } + virtual void run() = 0; + size_t rand_record() { + double v = 0; + drand48_r(&randbuf, &v); + const size_t sz = arg.sh.records.size(); + size_t r = size_t(v * sz); + if (r >= sz) { + r = 0; + } + return r; + } + int verify_update(const std::string& k, const std::string& v1, + const std::string& v2, const std::string& v3, record_value& rec, + uint32_t num_rows, bool cur_unknown_state); + int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds, + const std::string rrec[4], record_value& rec); + int verify_readnolock(const std::string& k, uint32_t num_rows, + uint32_t num_flds, const std::string rrec[4]); +}; + +int +hs_longrun_thread_base::verify_update(const std::string& k, + const std::string& v1, const std::string& v2, const std::string& v3, + record_value& rec, uint32_t num_rows, bool cur_unknown_state) +{ + const bool op_success = num_rows == 1; + int ret = 0; + if (!rec.unknown_state) { + if (!rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_update_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_update_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + } + if (op_success) { + rec.values.resize(4); + rec.values[0] = k; + rec.values[1] = v1; + rec.values[2] = v2; + rec.values[3] = v3; + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + } + rec.unknown_state = cur_unknown_state; + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(), + k.c_str(), v1.c_str(), v2.c_str(), v3.c_str()); + } + return ret; +} + +int +hs_longrun_thread_base::verify_read(const std::string& k, + uint32_t num_rows, uint32_t num_flds, const std::string rrec[4], + record_value& rec) +{ + const bool op_success = num_rows != 0; + int ret = 0; + if (!rec.unknown_state) { + if (!rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (num_flds != 4) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_fldnum %d\n", + arg.worker_type.c_str(), arg.id, k.c_str(), + static_cast<int>(num_flds)); + } + ret = 1; + } else if (rec.deleted) { + /* nothing to verify */ + } else { + int diff = 0; + for (size_t i = 0; i < 4; ++i) { + if (rec.values[i] == rrec[i]) { + /* ok */ + } else { + diff = 1; + } + } + if (diff) { + std::string mess; + for (size_t i = 0; i < 4; ++i) { + const std::string& expected = rec.values[i]; + const std::string& val = rrec[i]; + mess += " " + val + "/" + expected; + } + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_value %s\n", + arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str()); + } + ret = 1; + } + } + } + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str()); + } + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + return ret; +} + +int +hs_longrun_thread_base::verify_readnolock(const std::string& k, + uint32_t num_rows, uint32_t num_flds, const std::string rrec[4]) +{ + int ret = 0; + if (num_rows != 1 || num_flds != 4) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(), + k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(), + rrec[3].c_str()); + } + if (ret == 0) { + ++stat.success_count; + } + return ret; +} + +struct hs_longrun_thread_hs : public hs_longrun_thread_base { + hs_longrun_thread_hs(const arg_type& arg) + : hs_longrun_thread_base(arg) { } + void run(); + int check_hs_error(const char *mess, record_value *rec); + int op_insert(record_value& rec); + int op_delete(record_value& rec); + int op_update(record_value& rec); + int op_read(record_value& rec); + int op_readnolock(int k); + hstcpcli_ptr cli; + socket_args sockargs; +}; + +struct lock_guard : noncopyable { + lock_guard(mutex& mtx) : mtx(mtx) { + mtx.lock(); + } + ~lock_guard() { + mtx.unlock(); + } + mutex& mtx; +}; + +string_ref +to_string_ref(const std::string& s) +{ + return string_ref(s.data(), s.size()); +} + +std::string +to_string(const string_ref& s) +{ + return std::string(s.begin(), s.size()); +} + +void +hs_longrun_thread_hs::run() +{ + config c = arg.sh.conf; + if (arg.op == 'R' || arg.op == 'N') { + c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998)); + } else { + c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999)); + } + sockargs.set(c); + + while (arg.sh.running) { + if (cli.get() == 0 || !cli->stable_point()) { + cli = hstcpcli_i::create(sockargs); + if (check_hs_error("connect", 0) != 0) { + cli.reset(); + continue; + } + cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY", + "k,v1,v2,v3", "k,v1,v2,v3"); + cli->request_send(); + if (check_hs_error("openindex_send", 0) != 0) { + cli.reset(); + continue; + } + size_t num_flds = 0; + cli->response_recv(num_flds); + if (check_hs_error("openindex_recv", 0) != 0) { + cli.reset(); + continue; + } + cli->response_buf_remove(); + } + const size_t rec_id = rand_record(); + if (arg.lock_flag) { + record_value& rec = *arg.sh.records[rec_id]; + lock_guard g(rec.lock); + int e = 0; + switch (arg.op) { + case 'I': + e = op_insert(rec); + break; + case 'D': + e = op_delete(rec); + break; + case 'U': + e = op_update(rec); + break; + case 'R': + e = op_read(rec); + break; + default: + break; + } + } else { + int e = 0; + switch (arg.op) { + case 'N': + e = op_readnolock(rec_id); + break; + default: + break; + } + } + } +} + +int +hs_longrun_thread_hs::op_insert(record_value& rec) +{ + const std::string k = rec.key; + const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id); + const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id); + const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id); + const string_ref op_ref("+", 1); + const string_ref op_args[4] = { + to_string_ref(k), + to_string_ref(v1), + to_string_ref(v2), + to_string_ref(v3) + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0, + string_ref(), 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_insert_send", &rec) != 0) { return 1; } + size_t numflds = 0; + cli->response_recv(numflds); + if (arg.sh.verbose > 10) { + const string_ref *row = cli->get_next_row(); + fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(), + row ? to_string(row[0]).c_str() : ""); + } + const bool op_success = cli->get_error_code() == 0; + int ret = 0; + if (!rec.unknown_state) { + if (rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_insert_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (!rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_insert_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + } else { + ++stat.unknown_count; + } + if (op_success) { + rec.values.resize(4); + rec.values[0] = k; + rec.values[1] = v1; + rec.values[2] = v2; + rec.values[3] = v3; + rec.deleted = false; + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(), + v2.c_str(), v3.c_str()); + } + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + rec.unknown_state = false; + } + cli->response_buf_remove(); + return ret; +} + +int +hs_longrun_thread_hs::op_delete(record_value& rec) +{ + const std::string k = rec.key; + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + const string_ref modop_ref("D", 1); + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + modop_ref, 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_delete_send", &rec) != 0) { return 1; } + size_t numflds = 0; + cli->response_recv(numflds); + if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + const bool op_success = (numflds > 0 && row != 0 && + to_string(row[0]) == "1"); + int ret = 0; + if (!rec.unknown_state) { + if (!rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_delete_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_delete_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + } + cli->response_buf_remove(); + if (op_success) { + rec.deleted = true; + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + rec.unknown_state = false; + } + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "HS_DELETE %s\n", k.c_str()); + } + return ret; +} + +int +hs_longrun_thread_hs::op_update(record_value& rec) +{ + const std::string k = rec.key; + const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id); + const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id); + const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id); + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + const string_ref modop_ref("U", 1); + const string_ref modop_args[4] = { + to_string_ref(k), + to_string_ref(v1), + to_string_ref(v2), + to_string_ref(v3) + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + modop_ref, modop_args, 4, 0, 0); + cli->request_send(); + if (check_hs_error("op_update_send", &rec) != 0) { return 1; } + size_t numflds = 0; + cli->response_recv(numflds); + if (check_hs_error("op_update_recv", &rec) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + uint32_t num_rows = row + ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0; + cli->response_buf_remove(); + const bool cur_unknown_state = (num_rows == 1); + return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state); +} + +int +hs_longrun_thread_hs::op_read(record_value& rec) +{ + const std::string k = rec.key; + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + string_ref(), 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_read_send", 0) != 0) { return 1; } + size_t num_flds = 0; + size_t num_rows = 0; + cli->response_recv(num_flds); + if (check_hs_error("op_read_recv", 0) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + std::string rrec[4]; + if (row != 0 && num_flds == 4) { + for (int i = 0; i < 4; ++i) { + rrec[i] = to_string(row[i]); + } + ++num_rows; + } + row = cli->get_next_row(); + if (row != 0) { + ++num_rows; + } + cli->response_buf_remove(); + return verify_read(k, num_rows, num_flds, rrec, rec); +} + +int +hs_longrun_thread_hs::op_readnolock(int key) +{ + const std::string k = to_stdstring(key); + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + string_ref(), 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_read_send", 0) != 0) { return 1; } + size_t num_flds = 0; + size_t num_rows = 0; + cli->response_recv(num_flds); + if (check_hs_error("op_read_recv", 0) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + std::string rrec[4]; + if (row != 0 && num_flds == 4) { + for (int i = 0; i < 4; ++i) { + rrec[i] = to_string(row[i]); + } + ++num_rows; + } + row = cli->get_next_row(); + if (row != 0) { + ++num_rows; + } + cli->response_buf_remove(); + return verify_readnolock(k, num_rows, num_flds, rrec); +} + +int +hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec) +{ + const int err = cli->get_error_code(); + if (err == 0) { + return 0; + } + ++stat.runtime_error_count; + if (arg.sh.verbose > 0) { + const std::string estr = cli->get_error(); + fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n", + arg.op, arg.id, mess, err, estr.c_str()); + } + if (rec) { + rec->unknown_state = true; + } + return 1; +} + +struct hs_longrun_thread_my : public hs_longrun_thread_base { + hs_longrun_thread_my(const arg_type& arg) + : hs_longrun_thread_base(arg), connected(false) { } + void run(); + void show_mysql_error(const char *mess, record_value *rec); + int op_insert(record_value& rec); + int op_delete(record_value& rec); + int op_update(record_value& rec); + int op_delins(record_value& rec); + int op_read(record_value& rec); + auto_mysql db; + bool connected; +}; + +void +hs_longrun_thread_my::run() +{ + const std::string mysql_host = arg.sh.conf.get_str("host", "localhost"); + const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root"); + const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", ""); + const std::string mysql_dbname = "hstestdb"; + + while (arg.sh.running) { + if (!connected) { + if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(), + mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) { + show_mysql_error("mysql_real_connect", 0); + continue; + } + } + connected = true; + const size_t rec_id = rand_record(); + record_value& rec = *arg.sh.records[rec_id]; + lock_guard g(rec.lock); + int e = 0; + switch (arg.op) { + #if 0 + case 'I': + e = op_insert(rec); + break; + case 'D': + e = op_delete(rec); + break; + case 'U': + e = op_update(rec); + break; + #endif + case 'T': + e = op_delins(rec); + break; + case 'R': + e = op_read(rec); + break; + default: + break; + } + } +} + +int +hs_longrun_thread_my::op_delins(record_value& rec) +{ + const std::string k = rec.key; + const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id); + const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id); + const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id); + int success = 0; + bool cur_unknown_state = false; + do { + char query[1024]; + #if 1 + if (mysql_query(db, "begin") != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin"); + } + break; + } + #endif + cur_unknown_state = true; + snprintf(query, 1024, + "delete from hstesttbl where k = '%s'", k.c_str()); + if (mysql_query(db, query) != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query); + } + break; + } + if (mysql_affected_rows(db) != 1) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: notfound: [%s]\n", query); + } + break; + } + snprintf(query, 1024, + "insert into hstesttbl values ('%s', '%s', '%s', '%s')", + k.c_str(), v1.c_str(), v2.c_str(), v3.c_str()); + if (mysql_query(db, query) != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query); + } + break; + } + #if 1 + if (mysql_query(db, "commit") != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit"); + } + break; + } + #endif + success = true; + cur_unknown_state = false; + } while (false); + return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state); +} + +int +hs_longrun_thread_my::op_read(record_value& rec) +{ + const std::string k = rec.key; + char query[1024] = { 0 }; + const int len = snprintf(query, 1024, + "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str()); + const int r = mysql_real_query(db, query, len > 0 ? len : 0); + if (r != 0) { + show_mysql_error(query, 0); + return 1; + } + MYSQL_ROW row = 0; + unsigned long *lengths = 0; + unsigned int num_rows = 0; + unsigned int num_flds = 0; + auto_mysql_res res(db); + std::string rrec[4]; + if (res != 0) { + num_flds = mysql_num_fields(res); + row = mysql_fetch_row(res); + if (row != 0) { + lengths = mysql_fetch_lengths(res); + if (num_flds == 4) { + for (int i = 0; i < 4; ++i) { + rrec[i] = std::string(row[i], lengths[i]); + } + } + ++num_rows; + row = mysql_fetch_row(res); + if (row != 0) { + ++num_rows; + } + } + } + return verify_read(k, num_rows, num_flds, rrec, rec); +} + +void +hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec) +{ + ++stat.runtime_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n", + arg.op, arg.id, mess, mysql_error(db)); + } + if (rec) { + rec->unknown_state = true; + } + db.reset(); + connected = false; +} + +void +mysql_do(MYSQL *db, const char *query) +{ + if (mysql_real_query(db, query, strlen(query)) != 0) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query); + fatal_exit("mysql_do"); + } +} + +void +hs_longrun_init_table(const config& conf, int num_prepare, + hs_longrun_shared& shared) +{ + const std::string mysql_host = conf.get_str("host", "localhost"); + const std::string mysql_user = conf.get_str("mysqluser", "root"); + const std::string mysql_passwd = conf.get_str("mysqlpass", ""); + const std::string mysql_dbname = ""; + auto_mysql db; + if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(), + mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) { + fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db)); + fatal_exit("hs_longrun_init_table"); + } + mysql_do(db, "drop database if exists hstestdb"); + mysql_do(db, "create database hstestdb"); + mysql_do(db, "use hstestdb"); + mysql_do(db, + "create table hstesttbl (" + "k int primary key," + "v1 varchar(32) not null," + "v2 varchar(32) not null," + "v3 varchar(32) not null" + ") character set utf8 collate utf8_bin engine = innodb"); + for (int i = 0; i < num_prepare; ++i) { + const std::string i_str = to_stdstring(i); + const std::string v1 = "pv1_" + i_str; + const std::string v2 = "pv2_" + i_str; + const std::string v3 = "pv3_" + i_str; + char buf[1024]; + snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values" + "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str()); + mysql_do(db, buf); + record_value *rec = shared.records[i]; + rec->key = i_str; + rec->values.resize(4); + rec->values[0] = i_str; + rec->values[1] = v1; + rec->values[2] = v2; + rec->values[3] = v3; + rec->deleted = false; + } +} + +int +hs_longrun_main(int argc, char **argv) +{ + hs_longrun_shared shared; + parse_args(argc, argv, shared.conf); + shared.conf["host"] = shared.conf.get_str("host", "localhost"); + shared.verbose = shared.conf.get_int("verbose", 1); + const int table_size = shared.conf.get_int("table_size", 10000); + for (int i = 0; i < table_size; ++i) { + std::auto_ptr<record_value> rec(new record_value()); + rec->key = to_stdstring(i); + shared.records.push_back_ptr(rec); + } + mysql_library_init(0, 0, 0); + const int duration = shared.conf.get_int("duration", 10); + const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10); + const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10); + const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10); + const int num_hsread = shared.conf.get_int("num_hsread", 10); + const int num_myread = shared.conf.get_int("num_myread", 10); + const int num_mydelins = shared.conf.get_int("num_mydelins", 10); + int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10); + const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0); + if (!always_filled) { + num_hsreadnolock = 0; + } + hs_longrun_init_table(shared.conf, always_filled ? table_size : 0, + shared); + /* create worker threads */ + static const struct thrtmpl_type { + const char *type; char op; int num; int hs; int lock; + } thrtmpl[] = { + { "hsinsert", 'I', num_hsinsert, 1, 1 }, + { "hsdelete", 'D', num_hsdelete, 1, 1 }, + { "hsupdate", 'U', num_hsupdate, 1, 1 }, + { "hsread", 'R', num_hsread, 1, 1 }, + { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 }, + { "myread", 'R', num_myread, 0, 1 }, + { "mydelins", 'T', num_mydelins, 0, 1 }, + }; + typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type; + thrs_type thrs; + for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) { + const thrtmpl_type& e = thrtmpl[i]; + for (int j = 0; j < e.num; ++j) { + int id = thrs.size(); + const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock, + shared); + std::auto_ptr<hs_longrun_thread_base> thr; + if (e.hs) { + thr.reset(new hs_longrun_thread_hs(arg)); + } else { + thr.reset(new hs_longrun_thread_my(arg)); + } + thrs.push_back_ptr(thr); + } + } + shared.num_threads = thrs.size(); + /* start threads */ + fprintf(stderr, "START\n"); + shared.running = 1; + for (size_t i = 0; i < thrs.size(); ++i) { + thrs[i]->start(); + } + /* wait */ + sleep(duration); + /* stop thread */ + shared.running = 0; + for (size_t i = 0; i < thrs.size(); ++i) { + thrs[i]->join(); + } + fprintf(stderr, "DONE\n"); + /* summary */ + typedef std::map<std::string, hs_longrun_stat> stat_map; + stat_map sm; + for (size_t i = 0; i < thrs.size(); ++i) { + hs_longrun_thread_base *const thr = thrs[i]; + const std::string wt = thr->arg.worker_type; + hs_longrun_stat& v = sm[wt]; + v.add(thr->stat); + } + hs_longrun_stat total; + for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) { + if (i->second.verify_error_count != 0) { + fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(), + i->second.verify_error_count); + } + if (i->second.runtime_error_count) { + fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(), + i->second.runtime_error_count); + } + if (i->second.unknown_count) { + fprintf(stderr, "%s unknown %llu\n", i->first.c_str(), + i->second.unknown_count); + } + fprintf(stderr, "%s success %llu\n", i->first.c_str(), + i->second.success_count); + total.add(i->second); + } + if (total.verify_error_count != 0) { + fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count); + } + if (total.runtime_error_count != 0) { + fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count); + } + if (total.unknown_count != 0) { + fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count); + } + fprintf(stderr, "TOTAL success %llu\n", total.success_count); + mysql_library_end(); + return 0; +} + +}; + +int +main(int argc, char **argv) +{ + return dena::hs_longrun_main(argc, argv); +} + diff --git a/plugin/handler_socket/client/hstest.cpp b/plugin/handler_socket/client/hstest.cpp index c766488cbdf..b5551fed81c 100644 --- a/plugin/handler_socket/client/hstest.cpp +++ b/plugin/handler_socket/client/hstest.cpp @@ -92,7 +92,7 @@ namespace { double gettimeofday_double() { - struct timeval tv = { }; + struct timeval tv; if (gettimeofday(&tv, 0) != 0) { fatal_abort("gettimeofday"); } @@ -552,7 +552,8 @@ hstest_thread::test_9(int test_num) "moreflds_prefix", "column0123456789_"); const int use_handler = arg.sh.conf.get_int("handler", 0); const int sched_flag = arg.sh.conf.get_int("sched", 0); - const int ssps = arg.sh.conf.get_int("ssps", 0); + const int use_in = arg.sh.conf.get_int("in", 0); + const int ssps = use_in ? 0 : arg.sh.conf.get_int("ssps", 0); std::string flds = "v"; for (int i = 0; i < moreflds; ++i) { char buf[1024]; @@ -561,6 +562,7 @@ hstest_thread::test_9(int test_num) } int connected = 0; std::auto_ptr<auto_mysql_stmt> stmt; + string_buffer wbuf; for (int i = 0; i < num; ++i) { const double tm1 = gettimeofday_double(); const int flags = 0; @@ -623,6 +625,20 @@ hstest_thread::test_9(int test_num) // TODO: moreflds } else if (ssps) { // + } else if (use_in) { + wbuf.clear(); + char *p = wbuf.make_space(1024); + int len = snprintf(p, 1024, "select %s from hstest_table1 where k in ('%d'", flds.c_str(), k); + wbuf.space_wrote(len); + for (int j = 1; j < use_in; ++j) { + /* generate more key */ + drand48_r(&randbuf, &kf); + k = int(kf * tablesize); + p = wbuf.make_space(1024); + int len = snprintf(p, 1024, ", '%d'", k); + wbuf.space_wrote(len); + } + wbuf.append_literal(")"); } else { buf_query_len = snprintf(buf_query, sizeof(buf_query), "select %s from hstest_table1 where k = '%d'", flds.c_str(), k); @@ -651,6 +667,8 @@ hstest_thread::test_9(int test_num) } r = mysql_stmt_execute(*stmt); // fprintf(stderr, "stmt exec\n"); + } else if (use_in) { + r = mysql_real_query(db, wbuf.begin(), wbuf.size()); } else { r = mysql_real_query(db, buf_query, buf_query_len); // fprintf(stderr, "real query\n"); @@ -694,7 +712,7 @@ hstest_thread::test_9(int test_num) num_flds = mysql_num_fields(res); MYSQL_ROW row = 0; while ((row = mysql_fetch_row(res)) != 0) { - got_data = 1; + got_data += 1; unsigned long *const lengths = mysql_fetch_lengths(res); if (verbose >= 2) { for (unsigned int i = 0; i < num_flds; ++i) { @@ -706,7 +724,10 @@ hstest_thread::test_9(int test_num) } } } else { - got_data = 1; + MYSQL_ROW row = 0; + while ((row = mysql_fetch_row(res)) != 0) { + got_data += 1; + } } } else { if (mysql_field_count(db) == 0) { @@ -730,7 +751,7 @@ hstest_thread::test_9(int test_num) if (err == 0) { ++io_success_count; if (num_affected_rows > 0 || got_data > 0) { - ++op_success_count; + op_success_count += got_data; } else { if (verbose >= 1) { fprintf(stderr, "k=%d numaff=%u gotdata=%d\n", @@ -779,6 +800,7 @@ hstest_thread::test_10(int test_num) const std::string table = arg.sh.conf.get_str("table", "hstest_table1"); const std::string index = arg.sh.conf.get_str("index", "PRIMARY"); const std::string field = arg.sh.conf.get_str("field", "v"); + const int use_in = arg.sh.conf.get_int("in", 0); const std::string moreflds_prefix = arg.sh.conf.get_str( "moreflds_prefix", "column0123456789_"); const int dump = arg.sh.dump; @@ -789,8 +811,8 @@ hstest_thread::test_10(int test_num) snprintf(sbuf, sizeof(sbuf), ",%s%d", moreflds_prefix.c_str(), i); moreflds_str += std::string(sbuf); } - char wbuf[16384], rbuf[16384]; - int wbuflen = 0; + string_buffer wbuf; + char rbuf[16384]; for (size_t i = 0; i < arg.sh.loop; ++i) { int len = 0, rlen = 0, wlen = 0; #if 0 @@ -801,15 +823,18 @@ hstest_thread::test_10(int test_num) fprintf(stderr, "connect: %d %s\n", errno, strerror(errno)); return; } - len = snprintf(wbuf, sizeof(wbuf), + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, "P\t1\t%s\t%s\tPRIMARY\t%s%s\n", dbname.c_str(), table.c_str(), field.c_str(), moreflds_str.c_str()); /* pst_num, db, table, index, retflds */ - wlen = write(fd.get(), wbuf, len); + wbuf.space_wrote(len); + wlen = write(fd.get(), wbuf.begin(), len); if (len != wlen) { fprintf(stderr, "write: %d %d\n", len, wlen); return; } + wbuf.clear(); rlen = read(fd.get(), rbuf, sizeof(rbuf)); if (rlen <= 0 || rbuf[rlen - 1] != '\n') { fprintf(stderr, "read: rlen=%d errno=%d\n", rlen, errno); @@ -824,7 +849,6 @@ hstest_thread::test_10(int test_num) const double tm1 = gettimeofday_double(); for (size_t j = 0; j < arg.sh.pipe; ++j) { int k = 0, v = 0; - wbuflen = 0; { while (true) { double kf = 0, vf = 0; @@ -832,19 +856,6 @@ hstest_thread::test_10(int test_num) drand48_r(&randbuf, &vf); k = int(kf * tablesize) + firstkey; v = int(vf * tablesize) + firstkey; - // k = rand_r(&seed); - // v = rand_r(&seed); /* unused */ - #if 0 - if (tablesize != 0) { - k &= tablesize; - } - #endif - if (op == 'G') { - wbuflen = snprintf(wbuf, sizeof(wbuf), "1\t=\t1\t%d\n", k); - } else if (op == 'U') { - wbuflen = snprintf(wbuf, sizeof(wbuf), - "1\t=\t1\t%d\t1\t0\tU\t%d_%d%s\n", k, v, k, suffix.c_str()); - } if (k - firstkey < arg.sh.keygen_size) { volatile char *const ptr = arg.sh.keygen + (k - firstkey); // int oldv = __sync_fetch_and_or(ptr, 1); @@ -864,15 +875,42 @@ hstest_thread::test_10(int test_num) continue; } } + size_t len = 0; + if (op == 'G') { + if (use_in) { + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, "1\t=\t1\t\t%d\t0\t@\t0\t%d\t%d", + use_in, use_in, k); + wbuf.space_wrote(len); + for (int j = 1; j < use_in; ++j) { + drand48_r(&randbuf, &kf); + k = int(kf * tablesize) + firstkey; + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, "\t%d", k); + wbuf.space_wrote(len); + } + wbuf.append_literal("\n"); + } else { + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, "1\t=\t1\t%d\n", k); + wbuf.space_wrote(len); + } + } else if (op == 'U') { + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, + "1\t=\t1\t%d\t1\t0\tU\t%d_%d%s\n", k, v, k, suffix.c_str()); + wbuf.space_wrote(len); + } break; } } - wlen = write(fd.get(), wbuf, wbuflen); - if (wlen != wbuflen) { - fprintf(stderr, "write: %d %d\n", wbuflen, wlen); - return; - } } + wlen = write(fd.get(), wbuf.begin(), wbuf.size()); + if ((size_t) wlen != wbuf.size()) { + fprintf(stderr, "write: %d %d\n", (int)wbuf.size(), wlen); + return; + } + wbuf.clear(); size_t read_cnt = 0; size_t read_pos = 0; while (read_cnt < arg.sh.pipe) { @@ -922,7 +960,7 @@ hstest_thread::test_10(int test_num) ++op_success_count; arg.sh.increment_count(); if (arg.sh.dump && arg.sh.pipe == 1) { - fwrite(wbuf, wbuflen, 1, stderr); + fwrite(wbuf.begin(), wbuf.size(), 1, stderr); } } } |