diff options
Diffstat (limited to 'storage/pbxt/src/tabcache_xt.cc')
-rw-r--r-- | storage/pbxt/src/tabcache_xt.cc | 114 |
1 files changed, 70 insertions, 44 deletions
diff --git a/storage/pbxt/src/tabcache_xt.cc b/storage/pbxt/src/tabcache_xt.cc index b9f9ccd37e1..cba89a39cc2 100644 --- a/storage/pbxt/src/tabcache_xt.cc +++ b/storage/pbxt/src/tabcache_xt.cc @@ -26,6 +26,10 @@ #include "xt_config.h" +#ifdef DRIZZLED +#include <bitset> +#endif + #include <signal.h> #include "pthread_xt.h" @@ -63,7 +67,7 @@ xtPublic void xt_tc_init(XTThreadPtr self, size_t cache_size) for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) { xt_tab_cache.tcm_segment[i].tcs_cache_in_use = 0; xt_tab_cache.tcm_segment[i].tcs_hash_table = (XTTabCachePagePtr *) xt_calloc(self, xt_tab_cache.tcm_hash_size * sizeof(XTTabCachePagePtr)); - xt_rwmutex_init_with_autoname(self, &xt_tab_cache.tcm_segment[i].tcs_lock); + TAB_CAC_INIT_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock); } xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_lock); @@ -97,7 +101,7 @@ xtPublic void xt_tc_exit(XTThreadPtr self) xt_free(self, xt_tab_cache.tcm_segment[i].tcs_hash_table); xt_tab_cache.tcm_segment[i].tcs_hash_table = NULL; - xt_rwmutex_free(self, &xt_tab_cache.tcm_segment[i].tcs_lock); + TAB_CAC_FREE_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock); } } @@ -213,7 +217,7 @@ xtBool XTTabCache::xt_tc_write(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t page->tcp_dirty = TRUE; ASSERT_NS(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id); *op_seq = tci_table->tab_seq.ts_set_op_seq(page); - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); return OK; } @@ -269,21 +273,36 @@ xtBool XTTabCache::xt_tc_write_cond(XTThreadPtr self, XT_ROW_REC_FILE_PTR file, page->tcp_dirty = TRUE; ASSERT(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id); *op_seq = tci_table->tab_seq.ts_set_op_seq(page); - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); return TRUE; no_change: - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); return FALSE; } xtBool XTTabCache::xt_tc_read(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread) { +#ifdef XT_USE_ROW_REC_MMAP_FILES return tc_read_direct(file, ref_id, size, data, thread); +#else + size_t offset; + XTTabCachePagePtr page; + XTTabCacheSegPtr seg; + + if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread)) + return FAILED; + /* A read must be completely on a page: */ + ASSERT_NS(offset + size <= tci_page_size); + memcpy(data, page->tcp_data + offset, size); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); + return OK; +#endif } xtBool XTTabCache::xt_tc_read_4(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord4 *value, XTThreadPtr thread) { +#ifdef XT_USE_ROW_REC_MMAP_FILES register u_int page_idx; register XTTabCachePagePtr page; register XTTabCacheSegPtr seg; @@ -300,7 +319,7 @@ xtBool XTTabCache::xt_tc_read_4(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK]; hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size; - xt_rwmutex_slock(&seg->tcs_lock, thread->t_id); + TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id); page = seg->tcs_hash_table[hash_idx]; while (page) { if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) { @@ -311,53 +330,60 @@ xtBool XTTabCache::xt_tc_read_4(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord ASSERT_NS(offset + 4 <= this->tci_page_size); buffer = page->tcp_data + offset; *value = XT_GET_DISK_4(buffer); - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); return OK; } page = page->tcp_next; } - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); -#ifdef XT_USE_ROW_REC_MMAP_FILES return xt_pread_fmap_4(file, address, value, &thread->st_statistics.st_rec, thread); #else - xtWord1 data[4]; + size_t offset; + XTTabCachePagePtr page; + XTTabCacheSegPtr seg; + xtWord1 *data; - if (!XT_PREAD_RR_FILE(file, address, 4, 4, data, NULL, &thread->st_statistics.st_rec, thread)) + if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread)) return FAILED; + /* A read must be completely on a page: */ + ASSERT_NS(offset + 4 <= tci_page_size); + data = page->tcp_data + offset; *value = XT_GET_DISK_4(data); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); return OK; #endif } -xtBool XTTabCache::xt_tc_get_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread) +xtBool XTTabCache::xt_tc_get_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtBool load, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread) { XTTabCachePagePtr page; XTTabCacheSegPtr seg; -#ifdef XT_SEQ_SCAN_FROM_MEMORY - if (!tc_fetch_direct(file, ref_id, &seg, &page, offset, thread)) - return FAILED; - if (!seg) { - *ret_page = NULL; - return OK; + if (load) { + if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread)) + return FAILED; + } + else { + if (!tc_fetch_direct(file, ref_id, &seg, &page, offset, thread)) + return FAILED; + if (!seg) { + *ret_page = NULL; + return OK; + } } -#else - if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread)) - return FAILED; -#endif page->tcp_lock_count++; - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); *ret_page = page; return OK; } -void XTTabCache::xt_tc_release_page(XT_ROW_REC_FILE_PTR file __attribute__((unused)), XTTabCachePagePtr page, XTThreadPtr thread) +void XTTabCache::xt_tc_release_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, XTThreadPtr thread) { XTTabCacheSegPtr seg; seg = &xt_tab_cache.tcm_segment[page->tcp_seg]; - xt_rwmutex_xlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id); #ifdef DEBUG XTTabCachePagePtr lpage, ppage; @@ -379,7 +405,7 @@ void XTTabCache::xt_tc_release_page(XT_ROW_REC_FILE_PTR file __attribute__((unus if (page->tcp_lock_count > 0) page->tcp_lock_count--; - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); } xtBool XTTabCache::xt_tc_read_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 *data, XTThreadPtr thread) @@ -412,7 +438,7 @@ xtBool XTTabCache::tc_read_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK]; hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size; - xt_rwmutex_slock(&seg->tcs_lock, thread->t_id); + TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id); page = seg->tcs_hash_table[hash_idx]; while (page) { if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) { @@ -421,12 +447,12 @@ xtBool XTTabCache::tc_read_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size; ASSERT_NS(offset + size <= this->tci_page_size); memcpy(data, page->tcp_data + offset, size); - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); return OK; } page = page->tcp_next; } - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); if (!XT_PREAD_RR_FILE(file, address, size, 0, data, &red_size, &thread->st_statistics.st_rec, thread)) return FAILED; memset(data + red_size, 0, size - red_size); @@ -450,7 +476,7 @@ xtBool XTTabCache::tc_fetch_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTT seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK]; hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size; - xt_rwmutex_xlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id); page = seg->tcs_hash_table[hash_idx]; while (page) { if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) { @@ -460,7 +486,7 @@ xtBool XTTabCache::tc_fetch_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTT } page = page->tcp_next; } - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); *ret_seg = NULL; *ret_page = NULL; return OK; @@ -492,7 +518,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK]; hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size; - xt_rwmutex_slock(&seg->tcs_lock, thread->t_id); + TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id); page = seg->tcs_hash_table[hash_idx]; while (page) { if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) { @@ -528,7 +554,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache } page = page->tcp_next; } - xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); /* Page not found, allocate a new page: */ size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size; @@ -674,7 +700,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache #endif /* Add the page to the cache! */ - xt_rwmutex_xlock(&seg->tcs_lock, thread->t_id); + TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id); page = seg->tcs_hash_table[hash_idx]; while (page) { if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) { @@ -898,11 +924,11 @@ static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc) } seg = &dcg->tcm_segment[page->tcp_seg]; - xt_rwmutex_xlock(&seg->tcs_lock, self->t_id); + TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id); if (page->tcp_dirty) { if (!was_dirty) { - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); goto retry_2; } @@ -923,7 +949,7 @@ static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc) XTDatabaseHPtr db = tab->tab_db; rewait: - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); /* Flush the log, in case this is holding up the * writer! @@ -963,7 +989,7 @@ static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc) db->db_wr_freeer_waiting = FALSE; freer_(); // xt_unlock_mutex(&db->db_wr_lock) - xt_rwmutex_xlock(&seg->tcs_lock, self->t_id); + TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id); if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq)) goto rewait; } @@ -988,11 +1014,11 @@ static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc) */ if ((page = page->tcp_mr_used)) { page_cnt++; - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); goto retry_2; } } - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); dcg->tcm_free_try_count++; /* Starting to spin, free the threads: */ @@ -1047,7 +1073,7 @@ static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc) seg->tcs_cache_in_use -= freed_space; xt_free_ns(page); - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); self->st_statistics.st_rec_cache_frees++; dcg->tcm_free_try_count = 0; return freed_space; @@ -1156,7 +1182,7 @@ static void *tabc_fr_run_thread(XTThreadPtr self) return NULL; } -static void tabc_fr_free_thread(XTThreadPtr self, void *data __attribute__((unused))) +static void tabc_fr_free_thread(XTThreadPtr self, void *XT_UNUSED(data)) { if (xt_tab_cache.tcm_freeer_thread) { xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock); @@ -1238,7 +1264,7 @@ xtPublic void xt_load_pages(XTThreadPtr self, XTOpenTablePtr ot) while (rec_id<tab->tab_row_eof_id) { if (!tab->tab_rows.tc_fetch(ot->ot_row_file, rec_id, &seg, &page, &poffset, TRUE, self)) xt_throw(self); - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); rec_id += tab->tab_rows.tci_rows_per_page; } @@ -1246,7 +1272,7 @@ xtPublic void xt_load_pages(XTThreadPtr self, XTOpenTablePtr ot) while (rec_id<tab->tab_rec_eof_id) { if (!tab->tab_recs.tc_fetch(ot->ot_rec_file, rec_id, &seg, &page, &poffset, TRUE, self)) xt_throw(self); - xt_rwmutex_unlock(&seg->tcs_lock, self->t_id); + TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); rec_id += tab->tab_recs.tci_rows_per_page; } } |