summaryrefslogtreecommitdiff
path: root/storage/tokudb/PerconaFT/src/tests/loader-tpch-load.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/src/tests/loader-tpch-load.cc')
-rw-r--r--storage/tokudb/PerconaFT/src/tests/loader-tpch-load.cc508
1 files changed, 508 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/src/tests/loader-tpch-load.cc b/storage/tokudb/PerconaFT/src/tests/loader-tpch-load.cc
new file mode 100644
index 00000000000..b2ecb25350e
--- /dev/null
+++ b/storage/tokudb/PerconaFT/src/tests/loader-tpch-load.cc
@@ -0,0 +1,508 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "test.h"
+#include "toku_pthread.h"
+#include <db.h>
+#include <sys/stat.h>
+
+DB_ENV *env;
+enum {MAX_NAME=128};
+enum {MAX_DBS=16};
+enum {MAX_ROW_LEN=1024};
+static int NUM_DBS=10;
+static int DISALLOW_PUTS=0;
+static int COMPRESS=0;
+static int USE_REGION=0;
+static const char *envdir = TOKU_TEST_FILENAME;
+
+static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
+static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) __attribute__((unused));
+
+// linenumber,orderkey form a unique, primary key
+// key is a potentially duplicate secondary key
+struct tpch_key {
+ uint32_t linenumber;
+ uint32_t orderkey;
+ uint32_t key;
+};
+
+static __attribute__((__unused__)) int
+tpch_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
+ assert(db && a && b);
+ assert(a->size == sizeof(struct tpch_key));
+ assert(b->size == sizeof(struct tpch_key));
+
+ unsigned int xl = (*((struct tpch_key *) a->data)).linenumber;
+ unsigned int xo = (*((struct tpch_key *) a->data)).orderkey;
+ unsigned int xk = (*((struct tpch_key *) a->data)).key;
+
+ unsigned int yl = (*((struct tpch_key *) b->data)).linenumber;
+ unsigned int yo = (*((struct tpch_key *) b->data)).orderkey;
+ unsigned int yk = (*((struct tpch_key *) b->data)).key;
+
+// printf("tpch_dbt_cmp xl:%d, yl:%d, xo:%d, yo:%d, xk:%d, yk:%d\n", xl, yl, xo, yo, xk, yk);
+
+ if (xk<yk) return -1;
+ if (xk>yk) return 1;
+
+ if (xl<yl) return -1;
+ if (xl>yl) return 1;
+
+ if (xo>yo) return -1;
+ if (xo<yo) return 1;
+ return 0;
+}
+
+
+static int lineno = 0;
+static char *tpch_read_row(FILE *fp, int *key, char *val)
+{
+ *key = lineno++;
+ return fgets(val, MAX_ROW_LEN , fp);
+}
+
+
+/*
+ * split '|' separated fields into fields array
+ */
+static void tpch_parse_row(char *row, char *fields[], int fields_N)
+{
+ int field = 0;
+ int i = 0;
+ int p = 0;
+ char c = row[p];
+
+ while(c != '\0')
+ {
+ if ( c == '|') {
+ fields[field][i] = '\0';
+ //printf("field : <%s>\n", fields[field]);
+ field++;
+ i = 0;
+ }
+ else
+ fields[field][i++] = c;
+ c = row[++p];
+ }
+ assert(field == fields_N);
+}
+
+/*
+ * region table
+ */
+
+static int generate_rows_for_region(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
+{
+ toku_dbt_array_resize(dest_keys, 1);
+ toku_dbt_array_resize(dest_vals, 1);
+ DBT *dest_key = &dest_keys->dbts[0];
+ DBT *dest_val = &dest_vals->dbts[0];
+
+ // not used
+ (void) src_db;
+ (void) src_key;
+ assert(*(uint32_t*)dest_db->app_private == 0);
+
+ // region fields
+ char regionkey[8];
+ char name[32];
+ char comment[160];
+ char row[8+32+160+8];
+ sprintf(row, "%s", (char*)src_val->data);
+
+ const uint32_t fields_N = 3;
+ char *fields[3] = {regionkey, name, comment};
+ tpch_parse_row(row, fields, fields_N);
+
+ if (dest_key->flags==DB_DBT_REALLOC) {
+ if (dest_key->data) toku_free(dest_key->data);
+ dest_key->flags = 0;
+ dest_key->ulen = 0;
+ }
+ if (dest_val->flags==DB_DBT_REALLOC) {
+ if (dest_val->data) toku_free(dest_val->data);
+ dest_val->flags = 0;
+ dest_val->ulen = 0;
+ }
+
+ struct tpch_key *XMALLOC(key);
+ key->orderkey = atoi(regionkey);
+ key->linenumber = atoi(regionkey);
+ key->key = atoi(regionkey);
+
+ char *XMALLOC_N(sizeof(row), val);
+ sprintf(val, "%s|%s", name, comment);
+
+ dbt_init(dest_key, key, sizeof(struct tpch_key));
+ dest_key->flags = DB_DBT_REALLOC;
+
+ dbt_init(dest_val, val, strlen(val)+1);
+ dest_val->flags = DB_DBT_REALLOC;
+
+ return 0;
+}
+
+/*
+ * lineitem table
+ */
+
+
+static int generate_rows_for_lineitem(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val)
+{
+ toku_dbt_array_resize(dest_keys, 1);
+ toku_dbt_array_resize(dest_vals, 1);
+ DBT *dest_key = &dest_keys->dbts[0];
+ DBT *dest_val = &dest_vals->dbts[0];
+ // not used
+ (void) src_db;
+ (void) src_key;
+
+ // lineitem fields
+ char orderkey[16];
+ char partkey[16];
+ char suppkey[16];
+ char linenumber[8];
+ char quantity[8];
+ char extendedprice[16];
+ char discount[8];
+ char tax[8];
+ char returnflag[8];
+ char linestatus[8];
+ char shipdate[16];
+ char commitdate[16];
+ char receiptdate[16];
+ char shipinstruct[32];
+ char shipmode[16];
+ char comment[48];
+ char row[16+16+16+8+8+16+8+8+8+8+16+16+16+32+16+48 + 8];
+ sprintf(row, "%s", (char*)src_val->data);
+
+ const uint32_t fields_N = 16;
+ char *fields[16] = {orderkey,
+ partkey,
+ suppkey,
+ linenumber,
+ quantity,
+ extendedprice,
+ discount,
+ tax,
+ returnflag,
+ linestatus,
+ shipdate,
+ commitdate,
+ receiptdate,
+ shipinstruct,
+ shipmode,
+ comment};
+ tpch_parse_row(row, fields, fields_N);
+
+ if (dest_key->flags==DB_DBT_REALLOC) {
+ if (dest_key->data) toku_free(dest_key->data);
+ dest_key->flags = 0;
+ dest_key->ulen = 0;
+ }
+ if (dest_val->flags==DB_DBT_REALLOC) {
+ if (dest_val->data) toku_free(dest_val->data);
+ dest_val->flags = 0;
+ dest_val->ulen = 0;
+ }
+
+ struct tpch_key *XMALLOC(key);
+ key->orderkey = atoi(linenumber);
+ key->linenumber = atoi(orderkey);
+
+ char *val;
+ uint32_t which = *(uint32_t*)dest_db->app_private;
+
+ if ( which == 0 ) {
+ val = toku_xstrdup(row);
+ }
+ else {
+ val = toku_xstrdup(orderkey);
+ }
+
+ switch(which) {
+ case 0:
+ key->key = atoi(linenumber);
+ break;
+ case 1:
+ // lineitem_fk1
+ key->key = atoi(orderkey);
+ break;
+ case 2:
+ // lineitem_fk2
+ key->key = atoi(suppkey);
+ break;
+ case 3:
+ // lineitem_fk3
+ key->key = atoi(partkey);// not really, ...
+ break;
+ case 4:
+ // lineitem_fk4
+ key->key = atoi(partkey);
+ break;
+ case 5:
+ // li_shp_dt_idx
+ key->key = atoi(linenumber) + atoi(suppkey); // not really ...
+ break;
+ case 6:
+ key->key = atoi(linenumber) +atoi(partkey); // not really ...
+ break;
+ case 7:
+ // li_rcpt_dt_idx
+ key->key = atoi(suppkey) + atoi(partkey); // not really ...
+ break;
+ default:
+ assert(0);
+ }
+
+ dbt_init(dest_key, key, sizeof(struct tpch_key));
+ dest_key->flags = DB_DBT_REALLOC;
+
+ dbt_init(dest_val, val, strlen(val)+1);
+ dest_val->flags = DB_DBT_REALLOC;
+
+ return 0;
+}
+
+
+static void *expect_poll_void = &expect_poll_void;
+static int poll_count=0;
+static int poll_function (void *extra, float progress) {
+ if (0) {
+ static int did_one=0;
+ static struct timeval start;
+ struct timeval now;
+ gettimeofday(&now, 0);
+ if (!did_one) {
+ start=now;
+ did_one=1;
+ }
+ printf("%6.6f %5.1f%%\n", now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec), progress*100);
+ }
+ assert(extra==expect_poll_void);
+ assert(0.0<=progress && progress<=1.0);
+ poll_count++;
+ return 0;
+}
+
+static int test_loader(DB **dbs)
+{
+ int r;
+ DB_TXN *txn;
+ DB_LOADER *loader;
+ uint32_t db_flags[MAX_DBS];
+ uint32_t dbt_flags[MAX_DBS];
+ for(int i=0;i<MAX_DBS;i++) {
+ db_flags[i] = DB_NOOVERWRITE;
+ dbt_flags[i] = 0;
+ }
+ uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
+
+ FILE *fp;
+ // select which table to loader
+ if ( USE_REGION ) {
+ fp = fopen("./region.tbl", "r");
+ if (fp == NULL) {
+ fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
+ return 1;
+ }
+ assert(fp != NULL);
+ } else {
+ fp = fopen("./lineitem.tbl", "r");
+ if (fp == NULL) {
+ fprintf(stderr, "%s:%d %s\n", __FUNCTION__, __LINE__, strerror(errno));
+ return 1;
+ }
+ assert(fp != NULL);
+ }
+
+ // create and initialize loader
+
+ r = env->txn_begin(env, NULL, &txn, 0);
+ CKERR(r);
+ r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
+ CKERR(r);
+ r = loader->set_error_callback(loader, NULL, NULL);
+ CKERR(r);
+ r = loader->set_poll_function(loader, poll_function, expect_poll_void);
+ CKERR(r);
+
+ // using loader->put, put values into DB
+ printf("puts "); fflush(stdout);
+ DBT key, val;
+ int k;
+ char v[MAX_ROW_LEN];
+ char *c;
+ c = tpch_read_row(fp, &k, v);
+ int i = 1;
+ while ( c != NULL ) {
+ v[strlen(v)-1] = '\0'; // remove trailing \n
+ dbt_init(&key, &k, sizeof(int));
+ dbt_init(&val, v, strlen(v)+1);
+ r = loader->put(loader, &key, &val);
+ if (DISALLOW_PUTS) {
+ CKERR2(r, EINVAL);
+ } else {
+ CKERR(r);
+ }
+ if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} }
+ c = tpch_read_row(fp, &k, v);
+ }
+ if(verbose) {printf("\n"); fflush(stdout);}
+ fclose(fp);
+
+ poll_count=0;
+
+ // close the loader
+ printf("closing"); fflush(stdout);
+ r = loader->close(loader);
+ printf(" done\n");
+ CKERR(r);
+
+ if ( DISALLOW_PUTS == 0 ) assert(poll_count>0);
+
+ r = txn->commit(txn, 0);
+ CKERR(r);
+
+ return 0;
+}
+
+static int run_test(void)
+{
+ int r;
+ char rmcmd[32 + strlen(envdir)];
+ snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
+ r = system(rmcmd); CKERR(r);
+ r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
+
+ r = db_env_create(&env, 0); CKERR(r);
+ db_env_enable_engine_status(0); // disable engine status on crash because test is expected to fail
+ r = env->set_default_bt_compare(env, tpch_dbt_cmp); CKERR(r);
+ // select which TPC-H table to load
+ if ( USE_REGION ) {
+ r = env->set_generate_row_callback_for_put(env, generate_rows_for_region); CKERR(r);
+ NUM_DBS=1;
+ }
+ else {
+ r = env->set_generate_row_callback_for_put(env, generate_rows_for_lineitem); CKERR(r);
+ NUM_DBS=8;
+ }
+
+ int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
+ r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
+ env->set_errfile(env, stderr);
+ //Disable auto-checkpointing
+ r = env->checkpointing_set_period(env, 0); CKERR(r);
+
+ DBT desc;
+ dbt_init(&desc, "foo", sizeof("foo"));
+ char name[MAX_NAME*2];
+
+ DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
+ assert(dbs != NULL);
+ int idx[MAX_DBS];
+ for(int i=0;i<NUM_DBS;i++) {
+ idx[i] = i;
+ r = db_create(&dbs[i], env, 0); CKERR(r);
+ dbs[i]->app_private = &idx[i];
+ snprintf(name, sizeof(name), "db_%04x", i);
+ r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
+ IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
+ { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
+ });
+ }
+
+ // -------------------------- //
+ int testr = test_loader(dbs);
+ // -------------------------- //
+
+ for(int i=0;i<NUM_DBS;i++) {
+ dbs[i]->close(dbs[i], 0); CKERR(r);
+ dbs[i] = NULL;
+ }
+ r = env->close(env, 0); CKERR(r);
+ toku_free(dbs);
+
+ return testr;
+}
+
+// ------------ infrastructure ----------
+static void do_args(int argc, char * const argv[]);
+
+int test_main(int argc, char * const *argv) {
+ do_args(argc, argv);
+ int r = run_test();
+ return r;
+}
+
+static void do_args(int argc, char * const argv[]) {
+ int resultcode;
+ char *cmd = argv[0];
+ argc--; argv++;
+ while (argc>0) {
+ if (strcmp(argv[0], "-v")==0) {
+ verbose++;
+ } else if (strcmp(argv[0],"-q")==0) {
+ verbose--;
+ if (verbose<0) verbose=0;
+ } else if (strcmp(argv[0], "-h")==0) {
+ resultcode=0;
+ do_usage:
+ fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd);
+ exit(resultcode);
+ } else if (strcmp(argv[0], "-p")==0) {
+ DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
+ } else if (strcmp(argv[0], "-z")==0) {
+ COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
+ } else if (strcmp(argv[0], "-g")==0) {
+ USE_REGION = 1;
+ } else if (strcmp(argv[0], "-e") == 0) {
+ argc--; argv++;
+ if (argc > 0)
+ envdir = argv[0];
+ } else {
+ fprintf(stderr, "Unknown arg: %s\n", argv[0]);
+ resultcode=1;
+ goto do_usage;
+ }
+ argc--;
+ argv++;
+ }
+}