diff options
author | Jan Lindström <jplindst@mariadb.org> | 2013-12-19 14:36:38 +0200 |
---|---|---|
committer | Jan Lindström <jplindst@mariadb.org> | 2013-12-19 14:36:38 +0200 |
commit | 5e55d1ced52c52fb2f0508e1346059901a85960f (patch) | |
tree | 517032a404d9b0ebde9b9174459fbce301dcec95 /storage/innobase/srv/srv0start.cc | |
parent | 1f4f425a2007c51eeee35f911a787fc7d82d977c (diff) | |
download | mariadb-git-5e55d1ced52c52fb2f0508e1346059901a85960f.tar.gz |
Changes for Fusion-io multi-threaded flush, page compressed tables and
tables using atomic write/table.
This is work in progress and some parts are at most POC quality.
Diffstat (limited to 'storage/innobase/srv/srv0start.cc')
-rw-r--r-- | storage/innobase/srv/srv0start.cc | 720 |
1 files changed, 710 insertions, 10 deletions
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index efe9f094c0d..0517f4b1468 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -3,6 +3,7 @@ Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved. Copyright (c) 2008, Google Inc. Copyright (c) 2009, Percona Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -126,7 +127,10 @@ static os_file_t files[1000]; /** io_handler_thread parameters for thread identification */ static ulint n[SRV_MAX_N_IO_THREADS + 6]; /** io_handler_thread identifiers, 32 is the maximum number of purge threads */ -static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32]; +/** pgcomp_thread are 16 total */ +#define START_PGCOMP_CNT (SRV_MAX_N_IO_THREADS + 6 + 32) +#define PGCOMP_MAX_WORKER 16 +static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32 + PGCOMP_MAX_WORKER]; /** We use this mutex to test the return value of pthread_mutex_trylock on successful locking. HP-UX does NOT return 0, though Linux et al do. */ @@ -522,7 +526,7 @@ create_log_file( *file = os_file_create( innodb_file_log_key, name, - OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret); + OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret, FALSE); ib_logf(IB_LOG_LEVEL_INFO, "Setting log file %s size to %lu MB", @@ -715,7 +719,7 @@ open_log_file( *file = os_file_create(innodb_file_log_key, name, OS_FILE_OPEN, OS_FILE_AIO, - OS_LOG_FILE, &ret); + OS_LOG_FILE, &ret, FALSE); if (!ret) { ib_logf(IB_LOG_LEVEL_ERROR, "Unable to open '%s'", name); return(DB_ERROR); @@ -806,7 +810,7 @@ open_or_create_data_files( files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_CREATE, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); if (srv_read_only_mode) { @@ -849,7 +853,7 @@ open_or_create_data_files( files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN_RAW, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); if (!ret) { ib_logf(IB_LOG_LEVEL_ERROR, @@ -881,17 +885,17 @@ open_or_create_data_files( files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN_RAW, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); } else if (i == 0) { files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN_RETRY, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); } else { files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN, OS_FILE_NORMAL, - OS_DATA_FILE, &ret); + OS_DATA_FILE, &ret, FALSE); } if (!ret) { @@ -1078,7 +1082,7 @@ srv_undo_tablespace_create( innodb_file_data_key, name, srv_read_only_mode ? OS_FILE_OPEN : OS_FILE_CREATE, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); if (srv_read_only_mode && ret) { ib_logf(IB_LOG_LEVEL_INFO, @@ -1159,7 +1163,8 @@ srv_undo_tablespace_open( | OS_FILE_ON_ERROR_SILENT, OS_FILE_NORMAL, OS_DATA_FILE, - &ret); + &ret, + FALSE); /* If the file open was successful then load the tablespace. */ @@ -1430,6 +1435,691 @@ srv_start_wait_for_purge_to_start() } } +/* JAN: TODO: */ +/**********************************************************************************/ +extern int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time); +extern ibool buf_flush_start(buf_pool_t* buf_pool, enum buf_flush flush_type); +extern void buf_flush_end(buf_pool_t* buf_pool, enum buf_flush flush_type); +extern void buf_flush_common(enum buf_flush flush_type, ulint page_count); +extern ulint buf_flush_batch(buf_pool_t* buf_pool, enum buf_flush flush_type, ulint min_n, lsn_t lsn_limit); + +typedef enum wrk_status { + WRK_ITEM_SET=0, + WRK_ITEM_START=1, + WRK_ITEM_DONE=2, + WRK_ITEM_SUCCESS=2, + WRK_ITEM_FAILED=3, + WRK_ITEM_STATUS_UNDEFINED +} wrk_status_t; + +typedef enum wthr_status { + WTHR_NOT_INIT=0, + WTHR_INITIALIZED=1, + WTHR_SIG_WAITING=2, + WTHR_RUNNING=3, + WTHR_NO_WORK=4, + WTHR_KILL_IT=5, + WTHR_STATUS_UNDEFINED +} wthr_status_t; + +typedef struct wrk_itm +{ + /****************************/ + /* Need to group into struct*/ + buf_pool_t* buf_pool; //buffer-pool instance + int flush_type; //flush-type for buffer-pool flush operation + int min; //minimum number of pages requested to be flushed + unsigned long long lsn_limit; //lsn limit for the buffer-pool flush operation + /****************************/ + + unsigned long result; //flush pages count + unsigned long t_usec; //time-taken in usec + long id_usr; //thread-id currently working + wrk_status_t wi_status; //flag + struct wrk_itm *next; +} wrk_t; + +typedef enum op_q_status { + Q_NOT_INIT=0, + Q_EMPTY=1, + Q_INITIALIZED=2, + Q_PROCESS=3, + Q_DONE=4, + Q_ERROR=5, + Q_STATUS_UNDEFINED +} q_status_t; + +typedef struct op_queue +{ + pthread_mutex_t mtx; + pthread_cond_t cv; + q_status_t flag; + wrk_t *head; + wrk_t *tail; +} opq_t; + +opq_t wq, cq; + +typedef struct thread_sync +{ + int wthread_id; + pthread_t wthread; + opq_t *wq; + opq_t *cq; + wthr_status_t wt_status; + unsigned long stat_universal_num_processed; + unsigned long stat_cycle_num_processed; +} thread_sync_t; + +/* Global XXX:DD needs to be cleaned */ +int exit_flag; +ulint check_wrk_done_count; +static ulint done_cnt_flag; +static int pgc_n_threads = 8; + +thread_sync_t pc_sync[PGCOMP_MAX_WORKER]; +static wrk_t work_items[PGCOMP_MAX_WORKER]; +static int pgcomp_wrk_initialized = -1; + +int set_check_done_flag_count(int cnt) +{ + return(check_wrk_done_count = cnt); +} + +int set_pgcomp_wrk_init_done(void) +{ + pgcomp_wrk_initialized = 1; + return 0; +} + +int is_pgcomp_wrk_init_done(void) +{ + return(pgcomp_wrk_initialized == 1); +} + +ulint set_done_cnt_flag(ulint val) +{ + /* + * Assumption: The thread calling into set_done_cnt_flag + * needs to have "cq.mtx" acquired, else not safe. + */ + done_cnt_flag = val; + return done_cnt_flag; +} + + +ulint cv_done_inc_flag_sig(thread_sync_t * ppc) +{ + pthread_mutex_lock(&ppc->cq->mtx); + ppc->stat_universal_num_processed++; + ppc->stat_cycle_num_processed++; + done_cnt_flag++; + if(!(done_cnt_flag <= check_wrk_done_count)) { + fprintf(stderr, "ERROR: done_cnt:%lu check_wrk_done_count:%lu\n", + done_cnt_flag, check_wrk_done_count); + } + assert(done_cnt_flag <= check_wrk_done_count); + pthread_mutex_unlock(&ppc->cq->mtx); + if(done_cnt_flag == check_wrk_done_count) { + ppc->wq->flag = Q_DONE; + pthread_mutex_lock(&ppc->cq->mtx); + ppc->cq->flag = Q_DONE; + pthread_cond_signal(&ppc->cq->cv); + pthread_mutex_unlock(&ppc->cq->mtx); + } + return(done_cnt_flag); +} + +int q_remove_wrk(opq_t *q, wrk_t **wi) +{ + int ret = 0; + + if(!wi || !q) { + return -1; + } + + pthread_mutex_lock(&q->mtx); + assert(!((q->tail == NULL) && (q->head != NULL))); + assert(!((q->tail != NULL) && (q->head == NULL))); + + /* get the first in the list*/ + *wi = q->head; + if(q->head) { + ret = 0; + q->head = q->head->next; + (*wi)->next = NULL; + if(!q->head) { + q->tail = NULL; + } + } else { + q->tail = NULL; + ret = 1; /* indicating remove from queue failed */ + } + pthread_mutex_unlock(&q->mtx); + return (ret); +} + +int is_busy_wrk_itm(wrk_t *wi) +{ + if(!wi) { + return -1; + } + return(!(wi->id_usr == -1)); +} + +int setup_wrk_itm(int items) +{ + int i; + for(i=0; i<items; i++) { + work_items[i].buf_pool = NULL; + work_items[i].result = 0; + work_items[i].t_usec = 0; + work_items[i].id_usr = -1; + work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED; + work_items[i].next = &work_items[(i+1)%items]; + } + /* last node should be the tail */ + work_items[items-1].next = NULL; + return 0; +} + +int init_queue(opq_t *q) +{ + if(!q) { + return -1; + } + /* Initialize Queue mutex and CV */ + pthread_mutex_init(&q->mtx, NULL); + pthread_cond_init(&q->cv, NULL); + q->flag = Q_INITIALIZED; + q->head = q->tail = NULL; + + return 0; +} + +#if 0 +int drain_cq(opq_t *cq, int items) +{ + int i=0; + + if(!cq) { + return -1; + } + pthread_mutex_lock(&cq->mtx); + for(i=0; i<items; i++) { + work_items[i].result=0; + work_items[i].t_usec = 0; + work_items[i].id_usr = -1; + } + cq->head = cq->tail = NULL; + pthread_mutex_unlock(&cq->mtx); + return 0; +} +#endif + +int q_insert_wrk_list(opq_t *q, wrk_t *w_list) +{ + if((!q) || (!w_list)) { + fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list); + return -1; + } + + pthread_mutex_lock(&q->mtx); + + assert(!((q->tail == NULL) && (q->head != NULL))); + assert(!((q->tail != NULL) && (q->head == NULL))); + + /* list is empty */ + if(!q->tail) { + q->head = q->tail = w_list; + } else { + /* added the first of the node to list */ + assert(q->head != NULL); + q->tail->next = w_list; + } + + /* move tail to the last node */ + while(q->tail->next) { + q->tail = q->tail->next; + } + pthread_mutex_unlock(&q->mtx); + + return 0; +} + +int flush_pool_instance(wrk_t *wi) +{ + struct timeval p_start_time, p_end_time, d_time; + + if(!wi) { + fprintf(stderr, "work item invalid wi:%p\n", wi); + return -1; + } + + wi->t_usec = 0; + if (!buf_flush_start(wi->buf_pool, (buf_flush)wi->flush_type)) { + /* We have two choices here. If lsn_limit was + specified then skipping an instance of buffer + pool means we cannot guarantee that all pages + up to lsn_limit has been flushed. We can + return right now with failure or we can try + to flush remaining buffer pools up to the + lsn_limit. We attempt to flush other buffer + pools based on the assumption that it will + help in the retry which will follow the + failure. */ + fprintf(stderr, "flush_start Failed, flush_type:%d\n", + (buf_flush)wi->flush_type); + return -1; + } + +#ifdef UNIV_DEBUG + /* Record time taken for the OP in usec */ + gettimeofday(&p_start_time, 0x0); +#endif + + if((buf_flush)wi->flush_type == BUF_FLUSH_LRU) { + /* srv_LRU_scan_depth can be arbitrarily large value. + * We cap it with current LRU size. + */ + buf_pool_mutex_enter(wi->buf_pool); + wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU); + buf_pool_mutex_exit(wi->buf_pool); + wi->min = ut_min(srv_LRU_scan_depth,wi->min); + } + + wi->result = buf_flush_batch(wi->buf_pool, + (buf_flush)wi->flush_type, + wi->min, wi->lsn_limit); + + buf_flush_end(wi->buf_pool, (buf_flush)wi->flush_type); + buf_flush_common((buf_flush)wi->flush_type, wi->result); + +#ifdef UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + + wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000)); +#endif + + return 0; +} + +int service_page_comp_io(thread_sync_t * ppc) +{ + wrk_t *wi = NULL; + int ret=0; + + pthread_mutex_lock(&ppc->wq->mtx); + do{ + ppc->wt_status = WTHR_SIG_WAITING; + ret = pthread_cond_wait(&ppc->wq->cv, &ppc->wq->mtx); + ppc->wt_status = WTHR_RUNNING; + if(ret == ETIMEDOUT) { + fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%lu] ret:%d\n", + done_cnt_flag, ret); + } else if(ret == EINVAL || ret == EPERM) { + fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%lu] ret:%d\n", + done_cnt_flag, ret); + } + if(ppc->wq->flag == Q_PROCESS) { + break; + } else { + pthread_mutex_unlock(&ppc->wq->mtx); + return -1; + } + } while (ppc->wq->flag == Q_PROCESS && ret == 0); + + pthread_mutex_unlock(&ppc->wq->mtx); + + while (ppc->cq->flag == Q_PROCESS) { + wi = NULL; + /* Get the work item */ + if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) { + ppc->wt_status = WTHR_NO_WORK; + return -1; + } + + assert(ret==0); + assert(wi != NULL); + assert(0 == is_busy_wrk_itm(wi)); + assert(wi->id_usr == -1); + + wi->id_usr = ppc->wthread; + wi->wi_status = WRK_ITEM_START; + + /* Process work item */ + if(0 != (ret = flush_pool_instance(wi))) { + fprintf(stderr, "FLUSH op failed ret:%d\n", ret); + wi->wi_status = WRK_ITEM_FAILED; + } + + ret = q_insert_wrk_list(ppc->cq, wi); + + assert(0==ret); + assert(check_wrk_done_count >= done_cnt_flag); + wi->wi_status = WRK_ITEM_SUCCESS; + if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) { + break; + } + } + return(0); +} + +/******************************************************************//** +@return a dummy parameter*/ +extern "C" UNIV_INTERN +os_thread_ret_t +DECLARE_THREAD(page_comp_io_thread)( +/*==========================================*/ + void * arg) +{ + thread_sync_t *ppc_io = ((thread_sync_t *)arg); + + while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) { + service_page_comp_io(ppc_io); + ppc_io->stat_cycle_num_processed = 0; + } + os_thread_exit(NULL); + OS_THREAD_DUMMY_RETURN; +} + +int print_queue_wrk_itm(opq_t *q) +{ +#if UNIV_DEBUG + wrk_t *wi = NULL; + + if(!q) { + fprintf(stderr, "queue NULL\n"); + return -1; + } + + if(!q->head || !q->tail) { + assert(!(((q->tail==NULL) && (q->head!=NULL)) && ((q->tail != NULL) && (q->head == NULL)))); + fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail); + return 0; + } + + pthread_mutex_lock(&q->mtx); + for(wi = q->head; (wi != NULL) ; wi = wi->next) { + //fprintf(stderr, "- [%p] %p %lu %luus [%ld] >%p\n", + // wi, wi->buf_pool, wi->result, wi->t_usec, wi->id_usr, wi->next); + fprintf(stderr, "- [%p] [%s] >%p\n", + wi, (wi->id_usr == -1)?"free":"Busy", wi->next); + } + pthread_mutex_unlock(&q->mtx); +#endif + return(0); +} + +int print_wrk_list(wrk_t *wi_list) +{ + wrk_t *wi = wi_list; + int i=0; + + if(!wi_list) { + fprintf(stderr, "list NULL\n"); + } + + while(wi) { + fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n", + wi, (wi->id_usr == -1)?"free":"Busy", wi->result, wi->t_usec, wi->next); + wi = wi->next; + i++; + } + fprintf(stderr, "list len: %d\n", i); + return 0; +} + +int pgcomp_handler(wrk_t *w_list) +{ + int ret=0; + opq_t *wrk_q=NULL, *comp_q=NULL; + + wrk_q=&wq; + comp_q=&cq; + + pthread_mutex_lock(&wrk_q->mtx); + /* setup work queue here.. */ + wrk_q->flag = Q_EMPTY; + pthread_mutex_unlock(&wrk_q->mtx); + + ret = q_insert_wrk_list(wrk_q, w_list); + if(ret != 0) { + fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n", + __FUNCTION__, &wq, w_list); + return -1; + } + +retry_submit: + pthread_mutex_lock(&wrk_q->mtx); + /* setup work queue here.. */ + wrk_q->flag = Q_INITIALIZED; + pthread_mutex_unlock(&wrk_q->mtx); + + + pthread_mutex_lock(&comp_q->mtx); + if(0 != set_done_cnt_flag(0)) { + fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&comp_q->mtx); + return -1; + } + comp_q->flag = Q_PROCESS; + pthread_mutex_unlock(&comp_q->mtx); + + /* if threads are waiting request them to start */ + pthread_mutex_lock(&wrk_q->mtx); + wrk_q->flag = Q_PROCESS; + pthread_cond_broadcast(&wrk_q->cv); + pthread_mutex_unlock(&wrk_q->mtx); + + /* Wait on all worker-threads to complete */ + pthread_mutex_lock(&comp_q->mtx); + if (comp_q->flag != Q_DONE) { + do { + pthread_cond_wait(&comp_q->cv, &comp_q->mtx); + if(comp_q->flag != Q_DONE) { + fprintf(stderr, "[1] cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + if (done_cnt_flag != srv_buf_pool_instances) { + fprintf(stderr, "[2] cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + } + continue; + } else if (done_cnt_flag != srv_buf_pool_instances) { + fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + comp_q->flag = Q_INITIALIZED; + pthread_mutex_unlock(&comp_q->mtx); + goto retry_submit; + + assert(!done_cnt_flag); + continue; + } + assert(done_cnt_flag == srv_buf_pool_instances); + + if ((comp_q->flag == Q_DONE) && + (done_cnt_flag == srv_buf_pool_instances)) { + break; + } + } while((comp_q->flag == Q_INITIALIZED) && + (done_cnt_flag != srv_buf_pool_instances)); + } else { + fprintf(stderr, "[4] cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + if (!done_cnt_flag) { + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + comp_q->flag = Q_INITIALIZED; + pthread_mutex_unlock(&comp_q->mtx); + goto retry_submit; + assert(!done_cnt_flag); + } + assert(done_cnt_flag == srv_buf_pool_instances); + } + + pthread_mutex_unlock(&comp_q->mtx); + pthread_mutex_lock(&wrk_q->mtx); + wrk_q->flag = Q_DONE; + pthread_mutex_unlock(&wrk_q->mtx); + + return 0; +} + +/******************************************************************//** +@return a dummy parameter*/ +int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) +{ + int i=0; + + if(is_pgcomp_wrk_init_done()) { + fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n"); + return -1; + } + + if(!wq || !cq) { + fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq); + return -1; + } + + /* work-item setup */ + setup_wrk_itm(wrk_cnt); + + /* wq & cq setup */ + init_queue(wq); + init_queue(cq); + + /* Mark each of the thread sync entires */ + for(i=0; i < PGCOMP_MAX_WORKER; i++) { + pc_sync[i].wthread_id = i; + } + + /* Create threads for page-compression-flush */ + for(i=0; i < num_threads; i++) { + pc_sync[i].wthread_id = i; + pc_sync[i].wq = wq; + pc_sync[i].cq = cq; + os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)), + thread_ids + START_PGCOMP_CNT + i); + //pc_sync[i].wthread = thread_ids[START_PGCOMP_CNT + i]; + pc_sync[i].wthread = (START_PGCOMP_CNT + i); + pc_sync[i].wt_status = WTHR_INITIALIZED; + } + + set_check_done_flag_count(wrk_cnt); + set_pgcomp_wrk_init_done(); + + return 0; +} + + +int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads) +{ + long stat_tot=0; + unsigned int i=0; + for(i=0; i< num_threads;i++) { + stat_tot+=wthr[i].stat_universal_num_processed; + fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id, + wthr[i].stat_universal_num_processed); + } + fprintf(stderr, "Stat-Total:%lu\n", stat_tot); + return (0); +} + +int reset_wrk_itm(int items) +{ + int i; + + pthread_mutex_lock(&wq.mtx); + wq.head = wq.tail = NULL; + pthread_mutex_unlock(&wq.mtx); + + pthread_mutex_lock(&cq.mtx); + for(i=0;i<items; i++) { + work_items[i].id_usr = -1; + } + cq.head = cq.tail = NULL; + pthread_mutex_unlock(&cq.mtx); + return 0; +} + +int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed, + int flush_type, int min_n, unsigned long long lsn_limit) +{ + int ret=0, i=0; + + pthread_mutex_lock(&wq.mtx); + pthread_mutex_lock(&cq.mtx); + + assert(wq.head == NULL); + assert(wq.tail == NULL); + if(cq.head) { + print_wrk_list(cq.head); + } + assert(cq.head == NULL); + assert(cq.tail == NULL); + + for(i=0;i<buf_pool_inst; i++) { + work_items[i].buf_pool = buf_pool_from_array(i); + work_items[i].flush_type = flush_type; + work_items[i].min = min_n; + work_items[i].lsn_limit = lsn_limit; + work_items[i].id_usr = -1; + work_items[i].next = &work_items[(i+1)%buf_pool_inst]; + work_items[i].wi_status = WRK_ITEM_SET; + } + work_items[i-1].next=NULL; + + pthread_mutex_unlock(&cq.mtx); + pthread_mutex_unlock(&wq.mtx); + + pgcomp_handler(work_items); + + pthread_mutex_lock(&wq.mtx); + pthread_mutex_lock(&cq.mtx); + /* collect data/results total pages flushed */ + for(i=0; i<buf_pool_inst; i++) { + if(work_items[i].result == -1) { + ret = -1; + per_pool_pages_flushed[i] = 0; + } else { + per_pool_pages_flushed[i] = work_items[i].result; + } + if((work_items[i].id_usr == -1) && (work_items[i].wi_status == WRK_ITEM_SET )) { + fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].flush_type); + assert(0); + } + } + + wq.flag = cq.flag = Q_INITIALIZED; + + pthread_mutex_unlock(&cq.mtx); + pthread_mutex_unlock(&wq.mtx); + +#if UNIV_DEBUG + /* Print work-list stats */ + fprintf(stderr, "==wq== [DONE]\n"); + print_wrk_list(wq.head); + fprintf(stderr, "==cq== [DONE]\n"); + print_wrk_list(cq.head); + fprintf(stderr, "==worker-thread-stats==\n"); + wrk_thread_stat(pc_sync, pgc_n_threads); +#endif + + /* clear up work-queue for next flush */ + reset_wrk_itm(buf_pool_inst); + return(ret); +} + +/* JAN: TODO: END: */ + /******************************************************************** Starts InnoDB and creates a new database if database files are not found and the user wants. @@ -2585,6 +3275,16 @@ files_checked: } if (!srv_read_only_mode) { + /* JAN: TODO: */ + if (srv_buf_pool_instances <= PGCOMP_MAX_WORKER) { + pgc_n_threads = srv_buf_pool_instances; + } + /* else we default to 8 worker-threads */ + pgcomp_handler_init(pgc_n_threads, srv_buf_pool_instances, &wq, &cq); + /* JAN: TODO: END */ +#if UNIV_DEBUG + fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances); +#endif os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL); } |