Diffstat (limited to 'storage/innobase/btr/btr0defragment.cc')
-rw-r--r--	storage/innobase/btr/btr0defragment.cc	156
1 file changed, 99 insertions, 57 deletions
diff --git a/storage/innobase/btr/btr0defragment.cc b/storage/innobase/btr/btr0defragment.cc
index b22d9f8323d..ebe9854b5dc 100644
--- a/storage/innobase/btr/btr0defragment.cc
+++ b/storage/innobase/btr/btr0defragment.cc
@@ -50,6 +50,21 @@ possible. From experimentation it seems that reduce the target size by 512
 every time will make sure the page is compressible within a couple of
 iterations. */
 #define BTR_DEFRAGMENT_PAGE_REDUCTION_STEP_SIZE 512
 
+/** Item in the work queue for btr_defragment_thread. */
+struct btr_defragment_item_t
+{
+	btr_pcur_t*	pcur;		/* persistent cursor where
+					btr_defragment_n_pages should start */
+	os_event_t	event;		/* if not null, signal after work
+					is done */
+	bool		removed;	/* Mark an item as removed */
+	ulonglong	last_processed;	/* timestamp of last time this index
+					is processed by defragment thread */
+
+	btr_defragment_item_t(btr_pcur_t* pcur, os_event_t event);
+	~btr_defragment_item_t();
+};
+
 /* Work queue for defragmentation. */
 typedef std::list<btr_defragment_item_t*>	btr_defragment_wq_t;
 static btr_defragment_wq_t	btr_defragment_wq;
@@ -71,6 +86,21 @@ The difference between btr_defragment_count and
 btr_defragment_failures shows the amount of effort wasted. */
 Atomic_counter<ulint> btr_defragment_count;
 
+bool btr_defragment_active;
+
+struct defragment_chunk_state_t
+{
+	btr_defragment_item_t* m_item;
+};
+
+static defragment_chunk_state_t defragment_chunk_state;
+static void btr_defragment_chunk(void*);
+
+static tpool::timer* btr_defragment_timer;
+static tpool::task_group task_group(1);
+static tpool::task btr_defragment_task(btr_defragment_chunk, 0, &task_group);
+static void btr_defragment_start();
+
 /******************************************************************//**
 Constructor for btr_defragment_item_t. */
 btr_defragment_item_t::btr_defragment_item_t(
@@ -94,6 +124,11 @@ btr_defragment_item_t::~btr_defragment_item_t() {
 	}
 }
 
+static void submit_defragment_task(void* arg = 0)
+{
+	srv_thread_pool->submit_task(&btr_defragment_task);
+}
+
 /******************************************************************//**
 Initialize defragmentation. */
 void
@@ -101,6 +136,9 @@ btr_defragment_init()
 {
 	srv_defragment_interval = 1000000000ULL / srv_defragment_frequency;
 	mutex_create(LATCH_ID_BTR_DEFRAGMENT_MUTEX, &btr_defragment_mutex);
+	defragment_chunk_state.m_item = 0;
+	btr_defragment_timer = srv_thread_pool->create_timer(submit_defragment_task);
+	btr_defragment_active = true;
 }
 
 /******************************************************************//**
@@ -108,6 +146,11 @@ Shutdown defragmentation. Release all resources. */
 void
 btr_defragment_shutdown()
 {
+	if (!btr_defragment_timer)
+		return;
+	delete btr_defragment_timer;
+	btr_defragment_timer = 0;
+	task_group.cancel_pending(&btr_defragment_task);
 	mutex_enter(&btr_defragment_mutex);
 	std::list< btr_defragment_item_t* >::iterator iter = btr_defragment_wq.begin();
 	while(iter != btr_defragment_wq.end()) {
@@ -117,6 +160,7 @@ btr_defragment_shutdown()
 	}
 	mutex_exit(&btr_defragment_mutex);
 	mutex_free(&btr_defragment_mutex);
+	btr_defragment_active = false;
 }
 
 
@@ -160,11 +204,7 @@ btr_defragment_add_index(
 	*err = DB_SUCCESS;
 
 	mtr_start(&mtr);
-	// Load index rood page.
-	buf_block_t* block = btr_block_get(
-		page_id_t(index->table->space_id, index->page),
-		index->table->space->zip_size(),
-		RW_NO_LATCH, index, &mtr);
+	buf_block_t* block = btr_root_block_get(index, RW_NO_LATCH, &mtr);
 	page_t* page = NULL;
 
 	if (block) {
@@ -196,6 +236,10 @@ btr_defragment_add_index(
 	btr_defragment_item_t* item = new btr_defragment_item_t(pcur, event);
 	mutex_enter(&btr_defragment_mutex);
 	btr_defragment_wq.push_back(item);
+	if (btr_defragment_wq.size() == 1) {
+		/* Kick off defragmentation work */
+		btr_defragment_start();
+	}
 	mutex_exit(&btr_defragment_mutex);
 	return event;
 }
@@ -367,7 +411,7 @@ btr_defragment_calc_n_recs_for_size(
 Merge as many records from the from_block to the to_block.
 Delete the from_block if all records are successfully merged to to_block.
 @return the to_block to target for next merge operation. */
-UNIV_INTERN
+static
 buf_block_t*
 btr_defragment_merge_pages(
 	dict_index_t*	index,		/*!< in: index tree */
@@ -413,7 +457,7 @@ btr_defragment_merge_pages(
 	// reorganizing the page, otherwise we need to reorganize the page
 	// first to release more space.
 	if (move_size > max_ins_size) {
-		if (!btr_page_reorganize_block(false, page_zip_level,
+		if (!btr_page_reorganize_block(page_zip_level,
 					       to_block, index, mtr)) {
 			if (!dict_index_is_clust(index)
@@ -485,9 +529,7 @@ btr_defragment_merge_pages(
 		lock_update_merge_left(to_block, orig_pred,
 				       from_block);
 		btr_search_drop_page_hash_index(from_block);
-		btr_level_list_remove(
-			index->table->space_id,
-			zip_size, from_page, index, mtr);
+		btr_level_list_remove(*from_block, *index, mtr);
 		btr_page_get_father(index, from_block, mtr, &parent);
 		btr_cur_node_ptr_delete(&parent, mtr);
 		/* btr_blob_dbg_remove(from_page, index,
@@ -580,7 +622,7 @@ btr_defragment_n_pages(
 	blocks[0] = block;
 	for (uint i = 1; i <= n_pages; i++) {
 		page_t* page = buf_block_get_frame(blocks[i-1]);
-		ulint page_no = btr_page_get_next(page);
+		uint32_t page_no = btr_page_get_next(page);
 		total_data_size += page_get_data_size(page);
 		total_n_recs += page_get_n_recs(page);
 		if (page_no == FIL_NULL) {
@@ -589,9 +631,8 @@ btr_defragment_n_pages(
 			break;
 		}
 
-		blocks[i] = btr_block_get(page_id_t(index->table->space_id,
-						    page_no), zip_size,
-					  RW_X_LATCH, index, mtr);
+		blocks[i] = btr_block_get(*index, page_no, RW_X_LATCH, true,
+					  mtr);
 	}
 
 	if (n_pages == 1) {
@@ -638,8 +679,9 @@ btr_defragment_n_pages(
 		max_data_size = optimal_page_size;
 	}
 
-	reserved_space = ut_min((ulint)(optimal_page_size
-			* (1 - srv_defragment_fill_factor)),
+	reserved_space = ut_min(static_cast<ulint>(
+				static_cast<double>(optimal_page_size)
+				* (1 - srv_defragment_fill_factor)),
 		(data_size_per_rec * srv_defragment_fill_factor_n_recs));
 
 	optimal_page_size -= reserved_space;
@@ -679,14 +721,29 @@ btr_defragment_n_pages(
 	return current_block;
 }
 
-/** Whether btr_defragment_thread is active */
-bool btr_defragment_thread_active;
-/** Merge consecutive b-tree pages into fewer pages to defragment indexes */
-extern "C" UNIV_INTERN
-os_thread_ret_t
-DECLARE_THREAD(btr_defragment_thread)(void*)
+
+void btr_defragment_start() {
+	if (!srv_defragment)
+		return;
+	ut_ad(!btr_defragment_wq.empty());
+	submit_defragment_task();
+}
+
+/**
+Callback used by the defragment timer.
+
+The throttling "sleep" is implemented by rescheduling the
+threadpool timer, which, when it fires, resumes the work where
+it left off.
+
+The state (the current work item) is kept in the global
+defragment_chunk_state.
+*/
+static void btr_defragment_chunk(void*)
 {
+	defragment_chunk_state_t* state = &defragment_chunk_state;
+
 	btr_pcur_t*	pcur;
 	btr_cur_t*	cursor;
 	dict_index_t*	index;
@@ -695,37 +752,24 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
 	buf_block_t*	last_block;
 
 	while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
-		ut_ad(btr_defragment_thread_active);
-
-		/* If defragmentation is disabled, sleep before
-		checking whether it's enabled. */
-		if (!srv_defragment) {
-			os_thread_sleep(BTR_DEFRAGMENT_SLEEP_IN_USECS);
-			continue;
-		}
-
-		/* The following call won't remove the item from work queue.
-		We only get a pointer to it to work on. This will make sure
-		when user issue a kill command, all indices are in the work
-		queue to be searched. This also means that the user thread
-		cannot directly remove the item from queue (since we might be
-		using it). So user thread only marks index as removed. */
-		btr_defragment_item_t* item = btr_defragment_get_item();
-		/* If work queue is empty, sleep and check later. */
-		if (!item) {
-			os_thread_sleep(BTR_DEFRAGMENT_SLEEP_IN_USECS);
-			continue;
+		if (!state->m_item) {
+			state->m_item = btr_defragment_get_item();
 		}
 		/* If an index is marked as removed, we remove it from the work
 		queue. No other thread could be using this item at this point so
 		it's safe to remove now. */
-		if (item->removed) {
-			btr_defragment_remove_item(item);
-			continue;
+		while (state->m_item && state->m_item->removed) {
+			btr_defragment_remove_item(state->m_item);
+			state->m_item = btr_defragment_get_item();
+		}
+		if (!state->m_item) {
+			/* Queue empty */
+			return;
 		}
-		pcur = item->pcur;
+		pcur = state->m_item->pcur;
 		ulonglong now = my_interval_timer();
-		ulonglong elapsed = now - item->last_processed;
+		ulonglong elapsed = now - state->m_item->last_processed;
 
 		if (elapsed < srv_defragment_interval) {
 			/* If we see an index again before the interval
@@ -734,12 +778,13 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
 			defragmentation of all indices queue up on a single
 			thread, it's likely other indices that follow this one
 			don't need to sleep again. */
-			os_thread_sleep(static_cast<ulint>
-					((srv_defragment_interval - elapsed)
-					 / 1000));
+			int sleep_ms = (int)((srv_defragment_interval - elapsed) / 1000 / 1000);
+			if (sleep_ms) {
+				btr_defragment_timer->set_time(sleep_ms, 0);
+				return;
+			}
 		}
-
-		now = my_interval_timer();
+		log_free_check();
 		mtr_start(&mtr);
 		cursor = btr_pcur_get_btr_cur(pcur);
 		index = btr_cur_get_index(cursor);
@@ -768,7 +813,7 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
 			btr_pcur_store_position(pcur, &mtr);
 			mtr_commit(&mtr);
 			/* Update the last_processed time of this index. */
-			item->last_processed = now;
+			state->m_item->last_processed = now;
 		} else {
 			dberr_t err = DB_SUCCESS;
 			mtr_commit(&mtr);
@@ -791,11 +836,8 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
 			}
 		}
-		btr_defragment_remove_item(item);
+		btr_defragment_remove_item(state->m_item);
+		state->m_item = NULL;
 	}
-
-	btr_defragment_thread_active = false;
-	os_thread_exit();
-	OS_THREAD_DUMMY_RETURN;
 }
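The heart of the change is in btr_defragment_chunk(): where the old dedicated thread blocked in os_thread_sleep(), the task now computes the remaining pacing delay, re-arms the one-shot timer with set_time(sleep_ms, 0), and returns, giving the worker thread back to the pool. Below is a minimal, self-contained sketch of that pattern; set_timer() here is a hypothetical stand-in built on a detached std::thread, not MariaDB's tpool API.

#include <chrono>
#include <cstdio>
#include <functional>
#include <thread>

using Clock = std::chrono::steady_clock;

/* Hypothetical one-shot timer: run fn once after delay_ms milliseconds.
   A detached thread keeps this sketch small; tpool uses a real timer. */
static void set_timer(int delay_ms, std::function<void()> fn)
{
	std::thread([delay_ms, fn] {
		std::this_thread::sleep_for(
			std::chrono::milliseconds(delay_ms));
		fn();
	}).detach();
}

static const std::chrono::milliseconds interval(100);
static Clock::time_point last_processed;
static int chunks_left = 3;

/* Analogue of btr_defragment_chunk(): do one unit of work; if the pacing
   interval has not elapsed yet, re-arm the timer and return immediately,
   so no pool worker is ever blocked in a sleep. */
static void chunk()
{
	if (chunks_left == 0)
		return;
	auto elapsed = Clock::now() - last_processed;
	if (elapsed < interval) {
		int sleep_ms = (int) std::chrono::duration_cast<
			std::chrono::milliseconds>(interval - elapsed).count();
		if (sleep_ms) {
			set_timer(sleep_ms, chunk); /* resume here later */
			return;                     /* yield the worker */
		}
	}
	std::printf("chunk processed, %d left\n", --chunks_left);
	last_processed = Clock::now();
	set_timer(0, chunk); /* queue the next chunk right away */
}

int main()
{
	last_processed = Clock::now() - interval; /* let the first run proceed */
	chunk();
	/* Crude wait for the detached timers in this sketch to finish. */
	std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

Because the state lives outside the callback (chunks_left, last_processed), each timer firing resumes exactly where the previous invocation stopped, just as defragment_chunk_state carries the current work item across invocations in the patch.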
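A detail that is easy to misread in the throttling hunk: srv_defragment_interval is stored in nanoseconds (btr_defragment_init() computes 1000000000ULL / srv_defragment_frequency), so the removed code divided by 1000 once to get microseconds for os_thread_sleep(), while the new code divides by 1000 twice to get milliseconds for set_time(). A worked example, assuming srv_defragment_frequency = 40 (believed to be the innodb_defragment_frequency default, but verify against your build):

#include <cstdio>

int main()
{
	unsigned long long srv_defragment_frequency = 40; /* assumed default */

	/* As in btr_defragment_init(): the pacing interval in NANOSECONDS. */
	unsigned long long srv_defragment_interval =
		1000000000ULL / srv_defragment_frequency; /* 25,000,000 ns */

	unsigned long long elapsed = 5000000ULL; /* 5 ms since last pass, in ns */

	/* Old thread: microseconds for os_thread_sleep() -> one /1000. */
	unsigned long long sleep_us =
		(srv_defragment_interval - elapsed) / 1000; /* 20,000 us */

	/* New timer: milliseconds for set_time() -> /1000/1000. */
	int sleep_ms =
		(int)((srv_defragment_interval - elapsed) / 1000 / 1000); /* 20 ms */

	std::printf("interval=%llu ns, sleep=%llu us = %d ms\n",
		    srv_defragment_interval, sleep_us, sleep_ms);
}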
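The reserved_space hunk only makes the existing conversions explicit: the target page size is lowered by the smaller of the slack implied by srv_defragment_fill_factor and the room needed for srv_defragment_fill_factor_n_recs more records. A worked example with illustrative values (chosen to be exact in binary floating point; these are not necessarily the server defaults):

#include <algorithm>
#include <cstdio>

typedef unsigned long ulint_t; /* stand-in for InnoDB's ulint */

int main()
{
	/* Illustrative values, not the server defaults. */
	ulint_t optimal_page_size = 16000;          /* usable bytes per page */
	double  srv_defragment_fill_factor = 0.875;
	ulint_t srv_defragment_fill_factor_n_recs = 20;
	ulint_t data_size_per_rec = 120;

	/* Mirrors the patched expression: widen to double before multiplying,
	   then narrow back, so each conversion is explicit. */
	ulint_t reserved_space = std::min(
		static_cast<ulint_t>(
			static_cast<double>(optimal_page_size)
			* (1 - srv_defragment_fill_factor)),             /* 2000 */
		data_size_per_rec * srv_defragment_fill_factor_n_recs); /* 2400 */

	optimal_page_size -= reserved_space; /* 16000 - 2000 = 14000 */
	std::printf("reserved=%lu, target fill=%lu bytes\n",
		    reserved_space, optimal_page_size);
}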