summaryrefslogtreecommitdiff
path: root/storage/innobase/srv/srv0start.cc
diff options
context:
space:
mode:
authorJan Lindström <jplindst@mariadb.org>2013-12-19 14:36:38 +0200
committerJan Lindström <jplindst@mariadb.org>2013-12-19 14:36:38 +0200
commit5e55d1ced52c52fb2f0508e1346059901a85960f (patch)
tree517032a404d9b0ebde9b9174459fbce301dcec95 /storage/innobase/srv/srv0start.cc
parent1f4f425a2007c51eeee35f911a787fc7d82d977c (diff)
downloadmariadb-git-5e55d1ced52c52fb2f0508e1346059901a85960f.tar.gz
Changes for Fusion-io multi-threaded flush, page compressed tables and
tables using atomic write/table. This is work in progress and some parts are at most POC quality.
Diffstat (limited to 'storage/innobase/srv/srv0start.cc')
-rw-r--r--storage/innobase/srv/srv0start.cc720
1 files changed, 710 insertions, 10 deletions
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index efe9f094c0d..0517f4b1468 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -3,6 +3,7 @@
Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, Google Inc.
Copyright (c) 2009, Percona Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -126,7 +127,10 @@ static os_file_t files[1000];
/** io_handler_thread parameters for thread identification */
static ulint n[SRV_MAX_N_IO_THREADS + 6];
/** io_handler_thread identifiers, 32 is the maximum number of purge threads */
-static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32];
+/** pgcomp_thread are 16 total */
+#define START_PGCOMP_CNT (SRV_MAX_N_IO_THREADS + 6 + 32)
+#define PGCOMP_MAX_WORKER 16
+static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32 + PGCOMP_MAX_WORKER];
/** We use this mutex to test the return value of pthread_mutex_trylock
on successful locking. HP-UX does NOT return 0, though Linux et al do. */
@@ -522,7 +526,7 @@ create_log_file(
*file = os_file_create(
innodb_file_log_key, name,
- OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret);
+ OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret, FALSE);
ib_logf(IB_LOG_LEVEL_INFO,
"Setting log file %s size to %lu MB",
@@ -715,7 +719,7 @@ open_log_file(
*file = os_file_create(innodb_file_log_key, name,
OS_FILE_OPEN, OS_FILE_AIO,
- OS_LOG_FILE, &ret);
+ OS_LOG_FILE, &ret, FALSE);
if (!ret) {
ib_logf(IB_LOG_LEVEL_ERROR, "Unable to open '%s'", name);
return(DB_ERROR);
@@ -806,7 +810,7 @@ open_or_create_data_files(
files[i] = os_file_create(
innodb_file_data_key, name, OS_FILE_CREATE,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
if (srv_read_only_mode) {
@@ -849,7 +853,7 @@ open_or_create_data_files(
files[i] = os_file_create(
innodb_file_data_key, name, OS_FILE_OPEN_RAW,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
if (!ret) {
ib_logf(IB_LOG_LEVEL_ERROR,
@@ -881,17 +885,17 @@ open_or_create_data_files(
files[i] = os_file_create(
innodb_file_data_key,
name, OS_FILE_OPEN_RAW,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
} else if (i == 0) {
files[i] = os_file_create(
innodb_file_data_key,
name, OS_FILE_OPEN_RETRY,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
} else {
files[i] = os_file_create(
innodb_file_data_key,
name, OS_FILE_OPEN, OS_FILE_NORMAL,
- OS_DATA_FILE, &ret);
+ OS_DATA_FILE, &ret, FALSE);
}
if (!ret) {
@@ -1078,7 +1082,7 @@ srv_undo_tablespace_create(
innodb_file_data_key,
name,
srv_read_only_mode ? OS_FILE_OPEN : OS_FILE_CREATE,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
if (srv_read_only_mode && ret) {
ib_logf(IB_LOG_LEVEL_INFO,
@@ -1159,7 +1163,8 @@ srv_undo_tablespace_open(
| OS_FILE_ON_ERROR_SILENT,
OS_FILE_NORMAL,
OS_DATA_FILE,
- &ret);
+ &ret,
+ FALSE);
/* If the file open was successful then load the tablespace. */
@@ -1430,6 +1435,691 @@ srv_start_wait_for_purge_to_start()
}
}
+/* JAN: TODO: */
+/**********************************************************************************/
+extern int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time);
+extern ibool buf_flush_start(buf_pool_t* buf_pool, enum buf_flush flush_type);
+extern void buf_flush_end(buf_pool_t* buf_pool, enum buf_flush flush_type);
+extern void buf_flush_common(enum buf_flush flush_type, ulint page_count);
+extern ulint buf_flush_batch(buf_pool_t* buf_pool, enum buf_flush flush_type, ulint min_n, lsn_t lsn_limit);
+
+typedef enum wrk_status {
+ WRK_ITEM_SET=0,
+ WRK_ITEM_START=1,
+ WRK_ITEM_DONE=2,
+ WRK_ITEM_SUCCESS=2,
+ WRK_ITEM_FAILED=3,
+ WRK_ITEM_STATUS_UNDEFINED
+} wrk_status_t;
+
+typedef enum wthr_status {
+ WTHR_NOT_INIT=0,
+ WTHR_INITIALIZED=1,
+ WTHR_SIG_WAITING=2,
+ WTHR_RUNNING=3,
+ WTHR_NO_WORK=4,
+ WTHR_KILL_IT=5,
+ WTHR_STATUS_UNDEFINED
+} wthr_status_t;
+
+typedef struct wrk_itm
+{
+ /****************************/
+ /* Need to group into struct*/
+ buf_pool_t* buf_pool; //buffer-pool instance
+ int flush_type; //flush-type for buffer-pool flush operation
+ int min; //minimum number of pages requested to be flushed
+ unsigned long long lsn_limit; //lsn limit for the buffer-pool flush operation
+ /****************************/
+
+ unsigned long result; //flush pages count
+ unsigned long t_usec; //time-taken in usec
+ long id_usr; //thread-id currently working
+ wrk_status_t wi_status; //flag
+ struct wrk_itm *next;
+} wrk_t;
+
+typedef enum op_q_status {
+ Q_NOT_INIT=0,
+ Q_EMPTY=1,
+ Q_INITIALIZED=2,
+ Q_PROCESS=3,
+ Q_DONE=4,
+ Q_ERROR=5,
+ Q_STATUS_UNDEFINED
+} q_status_t;
+
+typedef struct op_queue
+{
+ pthread_mutex_t mtx;
+ pthread_cond_t cv;
+ q_status_t flag;
+ wrk_t *head;
+ wrk_t *tail;
+} opq_t;
+
+opq_t wq, cq;
+
+typedef struct thread_sync
+{
+ int wthread_id;
+ pthread_t wthread;
+ opq_t *wq;
+ opq_t *cq;
+ wthr_status_t wt_status;
+ unsigned long stat_universal_num_processed;
+ unsigned long stat_cycle_num_processed;
+} thread_sync_t;
+
+/* Global XXX:DD needs to be cleaned */
+int exit_flag;
+ulint check_wrk_done_count;
+static ulint done_cnt_flag;
+static int pgc_n_threads = 8;
+
+thread_sync_t pc_sync[PGCOMP_MAX_WORKER];
+static wrk_t work_items[PGCOMP_MAX_WORKER];
+static int pgcomp_wrk_initialized = -1;
+
+int set_check_done_flag_count(int cnt)
+{
+ return(check_wrk_done_count = cnt);
+}
+
+int set_pgcomp_wrk_init_done(void)
+{
+ pgcomp_wrk_initialized = 1;
+ return 0;
+}
+
+int is_pgcomp_wrk_init_done(void)
+{
+ return(pgcomp_wrk_initialized == 1);
+}
+
+ulint set_done_cnt_flag(ulint val)
+{
+ /*
+ * Assumption: The thread calling into set_done_cnt_flag
+ * needs to have "cq.mtx" acquired, else not safe.
+ */
+ done_cnt_flag = val;
+ return done_cnt_flag;
+}
+
+
+ulint cv_done_inc_flag_sig(thread_sync_t * ppc)
+{
+ pthread_mutex_lock(&ppc->cq->mtx);
+ ppc->stat_universal_num_processed++;
+ ppc->stat_cycle_num_processed++;
+ done_cnt_flag++;
+ if(!(done_cnt_flag <= check_wrk_done_count)) {
+ fprintf(stderr, "ERROR: done_cnt:%lu check_wrk_done_count:%lu\n",
+ done_cnt_flag, check_wrk_done_count);
+ }
+ assert(done_cnt_flag <= check_wrk_done_count);
+ pthread_mutex_unlock(&ppc->cq->mtx);
+ if(done_cnt_flag == check_wrk_done_count) {
+ ppc->wq->flag = Q_DONE;
+ pthread_mutex_lock(&ppc->cq->mtx);
+ ppc->cq->flag = Q_DONE;
+ pthread_cond_signal(&ppc->cq->cv);
+ pthread_mutex_unlock(&ppc->cq->mtx);
+ }
+ return(done_cnt_flag);
+}
+
+int q_remove_wrk(opq_t *q, wrk_t **wi)
+{
+ int ret = 0;
+
+ if(!wi || !q) {
+ return -1;
+ }
+
+ pthread_mutex_lock(&q->mtx);
+ assert(!((q->tail == NULL) && (q->head != NULL)));
+ assert(!((q->tail != NULL) && (q->head == NULL)));
+
+ /* get the first in the list*/
+ *wi = q->head;
+ if(q->head) {
+ ret = 0;
+ q->head = q->head->next;
+ (*wi)->next = NULL;
+ if(!q->head) {
+ q->tail = NULL;
+ }
+ } else {
+ q->tail = NULL;
+ ret = 1; /* indicating remove from queue failed */
+ }
+ pthread_mutex_unlock(&q->mtx);
+ return (ret);
+}
+
+int is_busy_wrk_itm(wrk_t *wi)
+{
+ if(!wi) {
+ return -1;
+ }
+ return(!(wi->id_usr == -1));
+}
+
+int setup_wrk_itm(int items)
+{
+ int i;
+ for(i=0; i<items; i++) {
+ work_items[i].buf_pool = NULL;
+ work_items[i].result = 0;
+ work_items[i].t_usec = 0;
+ work_items[i].id_usr = -1;
+ work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;
+ work_items[i].next = &work_items[(i+1)%items];
+ }
+ /* last node should be the tail */
+ work_items[items-1].next = NULL;
+ return 0;
+}
+
+int init_queue(opq_t *q)
+{
+ if(!q) {
+ return -1;
+ }
+ /* Initialize Queue mutex and CV */
+ pthread_mutex_init(&q->mtx, NULL);
+ pthread_cond_init(&q->cv, NULL);
+ q->flag = Q_INITIALIZED;
+ q->head = q->tail = NULL;
+
+ return 0;
+}
+
+#if 0
+int drain_cq(opq_t *cq, int items)
+{
+ int i=0;
+
+ if(!cq) {
+ return -1;
+ }
+ pthread_mutex_lock(&cq->mtx);
+ for(i=0; i<items; i++) {
+ work_items[i].result=0;
+ work_items[i].t_usec = 0;
+ work_items[i].id_usr = -1;
+ }
+ cq->head = cq->tail = NULL;
+ pthread_mutex_unlock(&cq->mtx);
+ return 0;
+}
+#endif
+
+int q_insert_wrk_list(opq_t *q, wrk_t *w_list)
+{
+ if((!q) || (!w_list)) {
+ fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list);
+ return -1;
+ }
+
+ pthread_mutex_lock(&q->mtx);
+
+ assert(!((q->tail == NULL) && (q->head != NULL)));
+ assert(!((q->tail != NULL) && (q->head == NULL)));
+
+ /* list is empty */
+ if(!q->tail) {
+ q->head = q->tail = w_list;
+ } else {
+ /* added the first of the node to list */
+ assert(q->head != NULL);
+ q->tail->next = w_list;
+ }
+
+ /* move tail to the last node */
+ while(q->tail->next) {
+ q->tail = q->tail->next;
+ }
+ pthread_mutex_unlock(&q->mtx);
+
+ return 0;
+}
+
+int flush_pool_instance(wrk_t *wi)
+{
+ struct timeval p_start_time, p_end_time, d_time;
+
+ if(!wi) {
+ fprintf(stderr, "work item invalid wi:%p\n", wi);
+ return -1;
+ }
+
+ wi->t_usec = 0;
+ if (!buf_flush_start(wi->buf_pool, (buf_flush)wi->flush_type)) {
+ /* We have two choices here. If lsn_limit was
+ specified then skipping an instance of buffer
+ pool means we cannot guarantee that all pages
+ up to lsn_limit has been flushed. We can
+ return right now with failure or we can try
+ to flush remaining buffer pools up to the
+ lsn_limit. We attempt to flush other buffer
+ pools based on the assumption that it will
+ help in the retry which will follow the
+ failure. */
+ fprintf(stderr, "flush_start Failed, flush_type:%d\n",
+ (buf_flush)wi->flush_type);
+ return -1;
+ }
+
+#ifdef UNIV_DEBUG
+ /* Record time taken for the OP in usec */
+ gettimeofday(&p_start_time, 0x0);
+#endif
+
+ if((buf_flush)wi->flush_type == BUF_FLUSH_LRU) {
+ /* srv_LRU_scan_depth can be arbitrarily large value.
+ * We cap it with current LRU size.
+ */
+ buf_pool_mutex_enter(wi->buf_pool);
+ wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU);
+ buf_pool_mutex_exit(wi->buf_pool);
+ wi->min = ut_min(srv_LRU_scan_depth,wi->min);
+ }
+
+ wi->result = buf_flush_batch(wi->buf_pool,
+ (buf_flush)wi->flush_type,
+ wi->min, wi->lsn_limit);
+
+ buf_flush_end(wi->buf_pool, (buf_flush)wi->flush_type);
+ buf_flush_common((buf_flush)wi->flush_type, wi->result);
+
+#ifdef UNIV_DEBUG
+ gettimeofday(&p_end_time, 0x0);
+ timediff(&p_end_time, &p_start_time, &d_time);
+
+ wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000));
+#endif
+
+ return 0;
+}
+
+int service_page_comp_io(thread_sync_t * ppc)
+{
+ wrk_t *wi = NULL;
+ int ret=0;
+
+ pthread_mutex_lock(&ppc->wq->mtx);
+ do{
+ ppc->wt_status = WTHR_SIG_WAITING;
+ ret = pthread_cond_wait(&ppc->wq->cv, &ppc->wq->mtx);
+ ppc->wt_status = WTHR_RUNNING;
+ if(ret == ETIMEDOUT) {
+ fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%lu] ret:%d\n",
+ done_cnt_flag, ret);
+ } else if(ret == EINVAL || ret == EPERM) {
+ fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%lu] ret:%d\n",
+ done_cnt_flag, ret);
+ }
+ if(ppc->wq->flag == Q_PROCESS) {
+ break;
+ } else {
+ pthread_mutex_unlock(&ppc->wq->mtx);
+ return -1;
+ }
+ } while (ppc->wq->flag == Q_PROCESS && ret == 0);
+
+ pthread_mutex_unlock(&ppc->wq->mtx);
+
+ while (ppc->cq->flag == Q_PROCESS) {
+ wi = NULL;
+ /* Get the work item */
+ if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) {
+ ppc->wt_status = WTHR_NO_WORK;
+ return -1;
+ }
+
+ assert(ret==0);
+ assert(wi != NULL);
+ assert(0 == is_busy_wrk_itm(wi));
+ assert(wi->id_usr == -1);
+
+ wi->id_usr = ppc->wthread;
+ wi->wi_status = WRK_ITEM_START;
+
+ /* Process work item */
+ if(0 != (ret = flush_pool_instance(wi))) {
+ fprintf(stderr, "FLUSH op failed ret:%d\n", ret);
+ wi->wi_status = WRK_ITEM_FAILED;
+ }
+
+ ret = q_insert_wrk_list(ppc->cq, wi);
+
+ assert(0==ret);
+ assert(check_wrk_done_count >= done_cnt_flag);
+ wi->wi_status = WRK_ITEM_SUCCESS;
+ if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) {
+ break;
+ }
+ }
+ return(0);
+}
+
+/******************************************************************//**
+@return a dummy parameter*/
+extern "C" UNIV_INTERN
+os_thread_ret_t
+DECLARE_THREAD(page_comp_io_thread)(
+/*==========================================*/
+ void * arg)
+{
+ thread_sync_t *ppc_io = ((thread_sync_t *)arg);
+
+ while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
+ service_page_comp_io(ppc_io);
+ ppc_io->stat_cycle_num_processed = 0;
+ }
+ os_thread_exit(NULL);
+ OS_THREAD_DUMMY_RETURN;
+}
+
+int print_queue_wrk_itm(opq_t *q)
+{
+#if UNIV_DEBUG
+ wrk_t *wi = NULL;
+
+ if(!q) {
+ fprintf(stderr, "queue NULL\n");
+ return -1;
+ }
+
+ if(!q->head || !q->tail) {
+ assert(!(((q->tail==NULL) && (q->head!=NULL)) && ((q->tail != NULL) && (q->head == NULL))));
+ fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail);
+ return 0;
+ }
+
+ pthread_mutex_lock(&q->mtx);
+ for(wi = q->head; (wi != NULL) ; wi = wi->next) {
+ //fprintf(stderr, "- [%p] %p %lu %luus [%ld] >%p\n",
+ // wi, wi->buf_pool, wi->result, wi->t_usec, wi->id_usr, wi->next);
+ fprintf(stderr, "- [%p] [%s] >%p\n",
+ wi, (wi->id_usr == -1)?"free":"Busy", wi->next);
+ }
+ pthread_mutex_unlock(&q->mtx);
+#endif
+ return(0);
+}
+
+int print_wrk_list(wrk_t *wi_list)
+{
+ wrk_t *wi = wi_list;
+ int i=0;
+
+ if(!wi_list) {
+ fprintf(stderr, "list NULL\n");
+ }
+
+ while(wi) {
+ fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n",
+ wi, (wi->id_usr == -1)?"free":"Busy", wi->result, wi->t_usec, wi->next);
+ wi = wi->next;
+ i++;
+ }
+ fprintf(stderr, "list len: %d\n", i);
+ return 0;
+}
+
+int pgcomp_handler(wrk_t *w_list)
+{
+ int ret=0;
+ opq_t *wrk_q=NULL, *comp_q=NULL;
+
+ wrk_q=&wq;
+ comp_q=&cq;
+
+ pthread_mutex_lock(&wrk_q->mtx);
+ /* setup work queue here.. */
+ wrk_q->flag = Q_EMPTY;
+ pthread_mutex_unlock(&wrk_q->mtx);
+
+ ret = q_insert_wrk_list(wrk_q, w_list);
+ if(ret != 0) {
+ fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n",
+ __FUNCTION__, &wq, w_list);
+ return -1;
+ }
+
+retry_submit:
+ pthread_mutex_lock(&wrk_q->mtx);
+ /* setup work queue here.. */
+ wrk_q->flag = Q_INITIALIZED;
+ pthread_mutex_unlock(&wrk_q->mtx);
+
+
+ pthread_mutex_lock(&comp_q->mtx);
+ if(0 != set_done_cnt_flag(0)) {
+ fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&comp_q->mtx);
+ return -1;
+ }
+ comp_q->flag = Q_PROCESS;
+ pthread_mutex_unlock(&comp_q->mtx);
+
+ /* if threads are waiting request them to start */
+ pthread_mutex_lock(&wrk_q->mtx);
+ wrk_q->flag = Q_PROCESS;
+ pthread_cond_broadcast(&wrk_q->cv);
+ pthread_mutex_unlock(&wrk_q->mtx);
+
+ /* Wait on all worker-threads to complete */
+ pthread_mutex_lock(&comp_q->mtx);
+ if (comp_q->flag != Q_DONE) {
+ do {
+ pthread_cond_wait(&comp_q->cv, &comp_q->mtx);
+ if(comp_q->flag != Q_DONE) {
+ fprintf(stderr, "[1] cv wait on CQ failed flag:%d cnt:%lu\n",
+ comp_q->flag, done_cnt_flag);
+ if (done_cnt_flag != srv_buf_pool_instances) {
+ fprintf(stderr, "[2] cv wait on CQ failed flag:%d cnt:%lu\n",
+ comp_q->flag, done_cnt_flag);
+ fprintf(stderr, "============\n");
+ print_wrk_list(w_list);
+ fprintf(stderr, "============\n");
+ }
+ continue;
+ } else if (done_cnt_flag != srv_buf_pool_instances) {
+ fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%lu\n",
+ comp_q->flag, done_cnt_flag);
+ fprintf(stderr, "============\n");
+ print_wrk_list(w_list);
+ fprintf(stderr, "============\n");
+ comp_q->flag = Q_INITIALIZED;
+ pthread_mutex_unlock(&comp_q->mtx);
+ goto retry_submit;
+
+ assert(!done_cnt_flag);
+ continue;
+ }
+ assert(done_cnt_flag == srv_buf_pool_instances);
+
+ if ((comp_q->flag == Q_DONE) &&
+ (done_cnt_flag == srv_buf_pool_instances)) {
+ break;
+ }
+ } while((comp_q->flag == Q_INITIALIZED) &&
+ (done_cnt_flag != srv_buf_pool_instances));
+ } else {
+ fprintf(stderr, "[4] cv wait on CQ failed flag:%d cnt:%lu\n",
+ comp_q->flag, done_cnt_flag);
+ if (!done_cnt_flag) {
+ fprintf(stderr, "============\n");
+ print_wrk_list(w_list);
+ fprintf(stderr, "============\n");
+ comp_q->flag = Q_INITIALIZED;
+ pthread_mutex_unlock(&comp_q->mtx);
+ goto retry_submit;
+ assert(!done_cnt_flag);
+ }
+ assert(done_cnt_flag == srv_buf_pool_instances);
+ }
+
+ pthread_mutex_unlock(&comp_q->mtx);
+ pthread_mutex_lock(&wrk_q->mtx);
+ wrk_q->flag = Q_DONE;
+ pthread_mutex_unlock(&wrk_q->mtx);
+
+ return 0;
+}
+
+/******************************************************************//**
+@return a dummy parameter*/
+int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq)
+{
+ int i=0;
+
+ if(is_pgcomp_wrk_init_done()) {
+ fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n");
+ return -1;
+ }
+
+ if(!wq || !cq) {
+ fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq);
+ return -1;
+ }
+
+ /* work-item setup */
+ setup_wrk_itm(wrk_cnt);
+
+ /* wq & cq setup */
+ init_queue(wq);
+ init_queue(cq);
+
+ /* Mark each of the thread sync entires */
+ for(i=0; i < PGCOMP_MAX_WORKER; i++) {
+ pc_sync[i].wthread_id = i;
+ }
+
+ /* Create threads for page-compression-flush */
+ for(i=0; i < num_threads; i++) {
+ pc_sync[i].wthread_id = i;
+ pc_sync[i].wq = wq;
+ pc_sync[i].cq = cq;
+ os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)),
+ thread_ids + START_PGCOMP_CNT + i);
+ //pc_sync[i].wthread = thread_ids[START_PGCOMP_CNT + i];
+ pc_sync[i].wthread = (START_PGCOMP_CNT + i);
+ pc_sync[i].wt_status = WTHR_INITIALIZED;
+ }
+
+ set_check_done_flag_count(wrk_cnt);
+ set_pgcomp_wrk_init_done();
+
+ return 0;
+}
+
+
+int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads)
+{
+ long stat_tot=0;
+ unsigned int i=0;
+ for(i=0; i< num_threads;i++) {
+ stat_tot+=wthr[i].stat_universal_num_processed;
+ fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id,
+ wthr[i].stat_universal_num_processed);
+ }
+ fprintf(stderr, "Stat-Total:%lu\n", stat_tot);
+ return (0);
+}
+
+int reset_wrk_itm(int items)
+{
+ int i;
+
+ pthread_mutex_lock(&wq.mtx);
+ wq.head = wq.tail = NULL;
+ pthread_mutex_unlock(&wq.mtx);
+
+ pthread_mutex_lock(&cq.mtx);
+ for(i=0;i<items; i++) {
+ work_items[i].id_usr = -1;
+ }
+ cq.head = cq.tail = NULL;
+ pthread_mutex_unlock(&cq.mtx);
+ return 0;
+}
+
+int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed,
+ int flush_type, int min_n, unsigned long long lsn_limit)
+{
+ int ret=0, i=0;
+
+ pthread_mutex_lock(&wq.mtx);
+ pthread_mutex_lock(&cq.mtx);
+
+ assert(wq.head == NULL);
+ assert(wq.tail == NULL);
+ if(cq.head) {
+ print_wrk_list(cq.head);
+ }
+ assert(cq.head == NULL);
+ assert(cq.tail == NULL);
+
+ for(i=0;i<buf_pool_inst; i++) {
+ work_items[i].buf_pool = buf_pool_from_array(i);
+ work_items[i].flush_type = flush_type;
+ work_items[i].min = min_n;
+ work_items[i].lsn_limit = lsn_limit;
+ work_items[i].id_usr = -1;
+ work_items[i].next = &work_items[(i+1)%buf_pool_inst];
+ work_items[i].wi_status = WRK_ITEM_SET;
+ }
+ work_items[i-1].next=NULL;
+
+ pthread_mutex_unlock(&cq.mtx);
+ pthread_mutex_unlock(&wq.mtx);
+
+ pgcomp_handler(work_items);
+
+ pthread_mutex_lock(&wq.mtx);
+ pthread_mutex_lock(&cq.mtx);
+ /* collect data/results total pages flushed */
+ for(i=0; i<buf_pool_inst; i++) {
+ if(work_items[i].result == -1) {
+ ret = -1;
+ per_pool_pages_flushed[i] = 0;
+ } else {
+ per_pool_pages_flushed[i] = work_items[i].result;
+ }
+ if((work_items[i].id_usr == -1) && (work_items[i].wi_status == WRK_ITEM_SET )) {
+ fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].flush_type);
+ assert(0);
+ }
+ }
+
+ wq.flag = cq.flag = Q_INITIALIZED;
+
+ pthread_mutex_unlock(&cq.mtx);
+ pthread_mutex_unlock(&wq.mtx);
+
+#if UNIV_DEBUG
+ /* Print work-list stats */
+ fprintf(stderr, "==wq== [DONE]\n");
+ print_wrk_list(wq.head);
+ fprintf(stderr, "==cq== [DONE]\n");
+ print_wrk_list(cq.head);
+ fprintf(stderr, "==worker-thread-stats==\n");
+ wrk_thread_stat(pc_sync, pgc_n_threads);
+#endif
+
+ /* clear up work-queue for next flush */
+ reset_wrk_itm(buf_pool_inst);
+ return(ret);
+}
+
+/* JAN: TODO: END: */
+
/********************************************************************
Starts InnoDB and creates a new database if database files
are not found and the user wants.
@@ -2585,6 +3275,16 @@ files_checked:
}
if (!srv_read_only_mode) {
+ /* JAN: TODO: */
+ if (srv_buf_pool_instances <= PGCOMP_MAX_WORKER) {
+ pgc_n_threads = srv_buf_pool_instances;
+ }
+ /* else we default to 8 worker-threads */
+ pgcomp_handler_init(pgc_n_threads, srv_buf_pool_instances, &wq, &cq);
+ /* JAN: TODO: END */
+#if UNIV_DEBUG
+ fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances);
+#endif
os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL);
}