summaryrefslogtreecommitdiff
path: root/storage/xtradb/buf/buf0mtflu.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/xtradb/buf/buf0mtflu.cc')
-rw-r--r--storage/xtradb/buf/buf0mtflu.cc733
1 files changed, 733 insertions, 0 deletions
diff --git a/storage/xtradb/buf/buf0mtflu.cc b/storage/xtradb/buf/buf0mtflu.cc
new file mode 100644
index 00000000000..223edab2e9c
--- /dev/null
+++ b/storage/xtradb/buf/buf0mtflu.cc
@@ -0,0 +1,733 @@
+/*****************************************************************************
+
+Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved.
+Copyright (C) 2013, 2015, MariaDB Corporation. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file buf/buf0mtflu.cc
+Multi-threaded flush method implementation
+
+Created 06/11/2013 Dhananjoy Das DDas@fusionio.com
+Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com
+Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com
+Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "buf0mtflu.h"
+#include "buf0checksum.h"
+#include "srv0start.h"
+#include "srv0srv.h"
+#include "page0zip.h"
+#include "ut0byte.h"
+#include "ut0lst.h"
+#include "page0page.h"
+#include "fil0fil.h"
+#include "buf0lru.h"
+#include "buf0rea.h"
+#include "ibuf0ibuf.h"
+#include "log0log.h"
+#include "os0file.h"
+#include "os0sync.h"
+#include "trx0sys.h"
+#include "srv0mon.h"
+#include "mysql/plugin.h"
+#include "mysql/service_thd_wait.h"
+#include "fil0pagecompress.h"
+
+#define MT_COMP_WATER_MARK 50
+/** Time to wait for a message. */
+#define MT_WAIT_IN_USECS 5000000
+
+/* Work item status */
+typedef enum wrk_status {
+ WRK_ITEM_UNSET=0, /*!< Work item is not set */
+ WRK_ITEM_START=1, /*!< Processing of work item has started */
+ WRK_ITEM_DONE=2, /*!< Processing is done usually set to
+ SUCCESS/FAILED */
+ WRK_ITEM_SUCCESS=2, /*!< Work item successfully processed */
+ WRK_ITEM_FAILED=3, /*!< Work item process failed */
+ WRK_ITEM_EXIT=4, /*!< Exiting */
+ WRK_ITEM_SET=5, /*!< Work item is set */
+ WRK_ITEM_STATUS_UNDEFINED
+} wrk_status_t;
+
+/* Work item task type */
+typedef enum mt_wrk_tsk {
+ MT_WRK_NONE=0, /*!< Exit queue-wait */
+ MT_WRK_WRITE=1, /*!< Flush operation */
+ MT_WRK_READ=2, /*!< Read operation */
+ MT_WRK_UNDEFINED
+} mt_wrk_tsk_t;
+
+/* Work thread status */
+typedef enum wthr_status {
+ WTHR_NOT_INIT=0, /*!< Work thread not initialized */
+ WTHR_INITIALIZED=1, /*!< Work thread initialized */
+ WTHR_SIG_WAITING=2, /*!< Work thread wating signal */
+ WTHR_RUNNING=3, /*!< Work thread running */
+ WTHR_NO_WORK=4, /*!< Work thread has no work */
+ WTHR_KILL_IT=5, /*!< Work thread should exit */
+ WTHR_STATUS_UNDEFINED
+} wthr_status_t;
+
+/* Write work task */
+typedef struct wr_tsk {
+ buf_pool_t *buf_pool; /*!< buffer-pool instance */
+ buf_flush_t flush_type; /*!< flush-type for buffer-pool
+ flush operation */
+ ulint min; /*!< minimum number of pages
+ requested to be flushed */
+ lsn_t lsn_limit; /*!< lsn limit for the buffer-pool
+ flush operation */
+} wr_tsk_t;
+
+/* Read work task */
+typedef struct rd_tsk {
+ buf_pool_t *page_pool; /*!< list of pages to decompress; */
+} rd_tsk_t;
+
+/* Work item */
+typedef struct wrk_itm
+{
+ mt_wrk_tsk_t tsk; /*!< Task type. Based on task-type
+ one of the entries wr_tsk/rd_tsk
+ will be used */
+ wr_tsk_t wr; /*!< Flush page list */
+ rd_tsk_t rd; /*!< Decompress page list */
+ ulint n_flushed; /*!< Flushed pages count */
+ os_thread_id_t id_usr; /*!< Thread-id currently working */
+ wrk_status_t wi_status; /*!< Work item status */
+ mem_heap_t *wheap; /*!< Heap were to allocate memory
+ for queue nodes */
+ mem_heap_t *rheap;
+} wrk_t;
+
+typedef struct thread_data
+{
+ os_thread_id_t wthread_id; /*!< Identifier */
+ os_thread_t wthread; /*!< Thread id */
+ wthr_status_t wt_status; /*!< Worker thread status */
+} thread_data_t;
+
+/* Thread syncronization data */
+typedef struct thread_sync
+{
+ /* Global variables used by all threads */
+ os_fast_mutex_t thread_global_mtx; /*!< Mutex used protecting below
+ variables */
+ ulint n_threads; /*!< Number of threads */
+ ib_wqueue_t *wq; /*!< Work Queue */
+ ib_wqueue_t *wr_cq; /*!< Write Completion Queue */
+ ib_wqueue_t *rd_cq; /*!< Read Completion Queue */
+ mem_heap_t* wheap; /*!< Work heap where memory
+ is allocated */
+ mem_heap_t* rheap; /*!< Work heap where memory
+ is allocated */
+ wthr_status_t gwt_status; /*!< Global thread status */
+
+ /* Variables used by only one thread at a time */
+ thread_data_t* thread_data; /*!< Thread specific data */
+
+} thread_sync_t;
+
+static int mtflush_work_initialized = -1;
+static thread_sync_t* mtflush_ctx=NULL;
+static os_fast_mutex_t mtflush_mtx;
+
+/******************************************************************//**
+Set multi-threaded flush work initialized. */
+static inline
+void
+buf_mtflu_work_init(void)
+/*=====================*/
+{
+ mtflush_work_initialized = 1;
+}
+
+/******************************************************************//**
+Return true if multi-threaded flush is initialized
+@return true if initialized */
+bool
+buf_mtflu_init_done(void)
+/*=====================*/
+{
+ return(mtflush_work_initialized == 1);
+}
+
+/******************************************************************//**
+Fush buffer pool instance.
+@return number of flushed pages, or 0 if error happened
+*/
+static
+ulint
+buf_mtflu_flush_pool_instance(
+/*==========================*/
+ wrk_t *work_item) /*!< inout: work item to be flushed */
+{
+ flush_counters_t n;
+ ut_a(work_item != NULL);
+ ut_a(work_item->wr.buf_pool != NULL);
+
+ if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) {
+ /* We have two choices here. If lsn_limit was
+ specified then skipping an instance of buffer
+ pool means we cannot guarantee that all pages
+ up to lsn_limit has been flushed. We can
+ return right now with failure or we can try
+ to flush remaining buffer pools up to the
+ lsn_limit. We attempt to flush other buffer
+ pools based on the assumption that it will
+ help in the retry which will follow the
+ failure. */
+#ifdef UNIV_MTFLUSH_DEBUG
+ fprintf(stderr, "InnoDB: Note: buf flush start failed there is already active flush for this buffer pool.\n");
+#endif
+ return 0;
+ }
+
+ memset(&n, 0, sizeof(flush_counters_t));
+
+ if (work_item->wr.flush_type == BUF_FLUSH_LRU) {
+ /* srv_LRU_scan_depth can be arbitrarily large value.
+ * We cap it with current LRU size.
+ */
+ buf_pool_mutex_enter(work_item->wr.buf_pool);
+ work_item->wr.min = UT_LIST_GET_LEN(work_item->wr.buf_pool->LRU);
+ buf_pool_mutex_exit(work_item->wr.buf_pool);
+ work_item->wr.min = ut_min(srv_LRU_scan_depth,work_item->wr.min);
+ }
+
+ buf_flush_batch(work_item->wr.buf_pool,
+ work_item->wr.flush_type,
+ work_item->wr.min,
+ work_item->wr.lsn_limit,
+ false,
+ &n);
+
+ work_item->n_flushed = n.flushed;
+ buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
+ buf_flush_common(work_item->wr.flush_type, work_item->n_flushed);
+
+ return work_item->n_flushed;
+}
+
+/******************************************************************//**
+Worker function to wait for work items and processing them and
+sending reply back.
+*/
+static
+void
+mtflush_service_io(
+/*===============*/
+ thread_sync_t* mtflush_io, /*!< inout: multi-threaded flush
+ syncronization data */
+ thread_data_t* thread_data) /* Thread status data */
+{
+ wrk_t *work_item = NULL;
+ ulint n_flushed=0;
+
+ ut_a(mtflush_io != NULL);
+ ut_a(thread_data != NULL);
+
+ thread_data->wt_status = WTHR_SIG_WAITING;
+
+ work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
+
+ if (work_item == NULL) {
+ work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
+ }
+
+ if (work_item) {
+ thread_data->wt_status = WTHR_RUNNING;
+ } else {
+ /* Thread did not get any work */
+ thread_data->wt_status = WTHR_NO_WORK;
+ return;
+ }
+
+ if (work_item->wi_status != WRK_ITEM_EXIT) {
+ work_item->wi_status = WRK_ITEM_SET;
+ }
+
+#ifdef UNIV_MTFLUSH_DEBUG
+ ut_a(work_item->id_usr == 0);
+#endif
+ work_item->id_usr = os_thread_get_curr_id();
+
+ /* This works as a producer/consumer model, where in tasks are
+ * inserted into the work-queue (wq) and completions are based
+ * on the type of operations performed and as a result the WRITE/
+ * compression/flush operation completions get posted to wr_cq.
+ * And READ/decompress operations completions get posted to rd_cq.
+ * in future we may have others.
+ */
+
+ switch(work_item->tsk) {
+ case MT_WRK_NONE:
+ ut_a(work_item->wi_status == WRK_ITEM_EXIT);
+ work_item->wi_status = WRK_ITEM_EXIT;
+ ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
+ thread_data->wt_status = WTHR_KILL_IT;
+ break;
+
+ case MT_WRK_WRITE:
+ ut_a(work_item->wi_status == WRK_ITEM_SET);
+ work_item->wi_status = WRK_ITEM_START;
+ /* Process work item */
+ if (0 == (n_flushed = buf_mtflu_flush_pool_instance(work_item))) {
+ work_item->wi_status = WRK_ITEM_FAILED;
+ }
+ work_item->wi_status = WRK_ITEM_SUCCESS;
+ ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
+ break;
+
+ case MT_WRK_READ:
+ ut_a(0);
+ break;
+
+ default:
+ /* None other than Write/Read handling planned */
+ ut_a(0);
+ break;
+ }
+}
+
+/******************************************************************//**
+Thead used to flush dirty pages when multi-threaded flush is
+used.
+@return a dummy parameter*/
+extern "C" UNIV_INTERN
+os_thread_ret_t
+DECLARE_THREAD(mtflush_io_thread)(
+/*==============================*/
+ void * arg)
+{
+ thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
+ thread_data_t *this_thread_data = NULL;
+ ulint i;
+
+ /* Find correct slot for this thread */
+ os_fast_mutex_lock(&(mtflush_io->thread_global_mtx));
+ for(i=0; i < mtflush_io->n_threads; i ++) {
+ if (mtflush_io->thread_data[i].wthread_id == os_thread_get_curr_id()) {
+ break;
+ }
+ }
+
+ ut_a(i <= mtflush_io->n_threads);
+ this_thread_data = &mtflush_io->thread_data[i];
+ os_fast_mutex_unlock(&(mtflush_io->thread_global_mtx));
+
+ while (TRUE) {
+
+#ifdef UNIV_MTFLUSH_DEBUG
+ fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
+ os_thread_get_curr_id(),
+ ib_wqueue_len(mtflush_io->wq),
+ ib_wqueue_len(mtflush_io->wr_cq));
+#endif /* UNIV_MTFLUSH_DEBUG */
+
+ mtflush_service_io(mtflush_io, this_thread_data);
+
+
+ if (this_thread_data->wt_status == WTHR_KILL_IT) {
+ break;
+ }
+ }
+
+ os_thread_exit(NULL);
+ OS_THREAD_DUMMY_RETURN;
+}
+
+/******************************************************************//**
+Add exit work item to work queue to signal multi-threded flush
+threads that they should exit.
+*/
+void
+buf_mtflu_io_thread_exit(void)
+/*==========================*/
+{
+ ulint i;
+ thread_sync_t* mtflush_io = mtflush_ctx;
+ wrk_t* work_item = NULL;
+
+ ut_a(mtflush_io != NULL);
+
+ /* Allocate work items for shutdown message */
+ work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads);
+
+ /* Confirm if the io-thread KILL is in progress, bailout */
+ if (mtflush_io->gwt_status == WTHR_KILL_IT) {
+ return;
+ }
+
+ mtflush_io->gwt_status = WTHR_KILL_IT;
+
+ /* This lock is to safequard against timing bug: flush request take
+ this mutex before sending work items to be processed by flush
+ threads. Inside flush thread we assume that work queue contains only
+ a constant number of items. Thus, we may not install new work items
+ below before all previous ones are processed. This mutex is released
+ by flush request after all work items sent to flush threads have
+ been processed. Thus, we can get this mutex if and only if work
+ queue is empty. */
+
+ os_fast_mutex_lock(&mtflush_mtx);
+
+ /* Make sure the work queue is empty */
+ ut_a(ib_wqueue_is_empty(mtflush_io->wq));
+
+ /* Send one exit work item/thread */
+ for (i=0; i < (ulint)srv_mtflush_threads; i++) {
+ work_item[i].tsk = MT_WRK_NONE;
+ work_item[i].wi_status = WRK_ITEM_EXIT;
+ work_item[i].wheap = mtflush_io->wheap;
+ work_item[i].rheap = mtflush_io->rheap;
+ work_item[i].id_usr = 0;
+
+ ib_wqueue_add(mtflush_io->wq,
+ (void *)&(work_item[i]),
+ mtflush_io->wheap);
+ }
+
+ /* Requests sent */
+ os_fast_mutex_unlock(&mtflush_mtx);
+
+ /* Wait until all work items on a work queue are processed */
+ while(!ib_wqueue_is_empty(mtflush_io->wq)) {
+ /* Wait */
+ os_thread_sleep(MT_WAIT_IN_USECS);
+ }
+
+ ut_a(ib_wqueue_is_empty(mtflush_io->wq));
+
+ /* Collect all work done items */
+ for (i=0; i < (ulint)srv_mtflush_threads;) {
+ wrk_t* work_item = NULL;
+
+ work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS);
+
+ /* If we receive reply to work item and it's status is exit,
+ thead has processed this message and existed */
+ if (work_item && work_item->wi_status == WRK_ITEM_EXIT) {
+ i++;
+ }
+ }
+
+ /* Wait about 1/2 sec to allow threads really exit */
+ os_thread_sleep(MT_WAIT_IN_USECS);
+
+ /* Make sure that work queue is empty */
+ while(!ib_wqueue_is_empty(mtflush_io->wq))
+ {
+ ib_wqueue_nowait(mtflush_io->wq);
+ }
+
+ os_fast_mutex_lock(&mtflush_mtx);
+
+ ut_a(ib_wqueue_is_empty(mtflush_io->wq));
+ ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
+ ut_a(ib_wqueue_is_empty(mtflush_io->rd_cq));
+
+ /* Free all queues */
+ ib_wqueue_free(mtflush_io->wq);
+ ib_wqueue_free(mtflush_io->wr_cq);
+ ib_wqueue_free(mtflush_io->rd_cq);
+
+ mtflush_io->wq = NULL;
+ mtflush_io->wr_cq = NULL;
+ mtflush_io->rd_cq = NULL;
+ mtflush_work_initialized = 0;
+
+ /* Free heap */
+ mem_heap_free(mtflush_io->wheap);
+ mem_heap_free(mtflush_io->rheap);
+
+ os_fast_mutex_unlock(&mtflush_mtx);
+ os_fast_mutex_free(&mtflush_mtx);
+ os_fast_mutex_free(&mtflush_io->thread_global_mtx);
+}
+
+/******************************************************************//**
+Initialize multi-threaded flush thread syncronization data.
+@return Initialized multi-threaded flush thread syncroniztion data. */
+void*
+buf_mtflu_handler_init(
+/*===================*/
+ ulint n_threads, /*!< in: Number of threads to create */
+ ulint wrk_cnt) /*!< in: Number of work items */
+{
+ ulint i;
+ mem_heap_t* mtflush_heap;
+ mem_heap_t* mtflush_heap2;
+
+ /* Create heap, work queue, write completion queue, read
+ completion queue for multi-threaded flush, and init
+ handler. */
+ mtflush_heap = mem_heap_create(0);
+ ut_a(mtflush_heap != NULL);
+ mtflush_heap2 = mem_heap_create(0);
+ ut_a(mtflush_heap2 != NULL);
+
+ mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
+ sizeof(thread_sync_t));
+ memset(mtflush_ctx, 0, sizeof(thread_sync_t));
+ ut_a(mtflush_ctx != NULL);
+ mtflush_ctx->thread_data = (thread_data_t*)mem_heap_alloc(
+ mtflush_heap, sizeof(thread_data_t) * n_threads);
+ ut_a(mtflush_ctx->thread_data);
+ memset(mtflush_ctx->thread_data, 0, sizeof(thread_data_t) * n_threads);
+
+ mtflush_ctx->n_threads = n_threads;
+ mtflush_ctx->wq = ib_wqueue_create();
+ ut_a(mtflush_ctx->wq);
+ mtflush_ctx->wr_cq = ib_wqueue_create();
+ ut_a(mtflush_ctx->wr_cq);
+ mtflush_ctx->rd_cq = ib_wqueue_create();
+ ut_a(mtflush_ctx->rd_cq);
+ mtflush_ctx->wheap = mtflush_heap;
+ mtflush_ctx->rheap = mtflush_heap2;
+
+ os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_ctx->thread_global_mtx);
+ os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
+
+ /* Create threads for page-compression-flush */
+ for(i=0; i < n_threads; i++) {
+ os_thread_id_t new_thread_id;
+
+ mtflush_ctx->thread_data[i].wt_status = WTHR_INITIALIZED;
+
+ mtflush_ctx->thread_data[i].wthread = os_thread_create(
+ mtflush_io_thread,
+ ((void *) mtflush_ctx),
+ &new_thread_id);
+
+ mtflush_ctx->thread_data[i].wthread_id = new_thread_id;
+ }
+
+ buf_mtflu_work_init();
+
+ return((void *)mtflush_ctx);
+}
+
+/******************************************************************//**
+Flush buffer pool instances.
+@return number of pages flushed. */
+ulint
+buf_mtflu_flush_work_items(
+/*=======================*/
+ ulint buf_pool_inst, /*!< in: Number of buffer pool instances */
+ ulint *per_pool_pages_flushed, /*!< out: Number of pages
+ flushed/instance */
+ buf_flush_t flush_type, /*!< in: Type of flush */
+ ulint min_n, /*!< in: Wished minimum number of
+ blocks to be flushed */
+ lsn_t lsn_limit) /*!< in: All blocks whose
+ oldest_modification is smaller than
+ this should be flushed (if their
+ number does not exceed min_n) */
+{
+ ulint n_flushed=0, i;
+ mem_heap_t* work_heap;
+ mem_heap_t* reply_heap;
+ wrk_t work_item[MTFLUSH_MAX_WORKER];
+
+ if (mtflush_ctx->gwt_status == WTHR_KILL_IT) {
+ return 0;
+ }
+
+ /* Allocate heap where all work items used and queue
+ node items areallocated */
+ work_heap = mem_heap_create(0);
+ reply_heap = mem_heap_create(0);
+
+
+ for(i=0;i<buf_pool_inst; i++) {
+ work_item[i].tsk = MT_WRK_WRITE;
+ work_item[i].wr.buf_pool = buf_pool_from_array(i);
+ work_item[i].wr.flush_type = flush_type;
+ work_item[i].wr.min = min_n;
+ work_item[i].wr.lsn_limit = lsn_limit;
+ work_item[i].wi_status = WRK_ITEM_UNSET;
+ work_item[i].wheap = work_heap;
+ work_item[i].rheap = reply_heap;
+ work_item[i].n_flushed = 0;
+ work_item[i].id_usr = 0;
+
+ ib_wqueue_add(mtflush_ctx->wq,
+ (void *)(work_item + i),
+ work_heap);
+ }
+
+ /* wait on the completion to arrive */
+ for(i=0; i< buf_pool_inst;) {
+ wrk_t *done_wi = NULL;
+ done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq);
+
+ if (done_wi != NULL) {
+ per_pool_pages_flushed[i] = done_wi->n_flushed;
+
+#ifdef UNIV_MTFLUSH_DEBUG
+ if((int)done_wi->id_usr == 0 &&
+ (done_wi->wi_status == WRK_ITEM_SET ||
+ done_wi->wi_status == WRK_ITEM_UNSET)) {
+ fprintf(stderr,
+ "**Set/Unused work_item[%lu] flush_type=%d\n",
+ i,
+ done_wi->wr.flush_type);
+ ut_a(0);
+ }
+#endif
+
+ n_flushed+= done_wi->n_flushed;
+ i++;
+ }
+ }
+
+ /* Release used work_items and queue nodes */
+ mem_heap_free(work_heap);
+ mem_heap_free(reply_heap);
+
+ return(n_flushed);
+}
+
+/*******************************************************************//**
+Multi-threaded version of buf_flush_list
+*/
+bool
+buf_mtflu_flush_list(
+/*=================*/
+ ulint min_n, /*!< in: wished minimum mumber of blocks
+ flushed (it is not guaranteed that the
+ actual number is that big, though) */
+ lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all
+ blocks whose oldest_modification is
+ smaller than this should be flushed
+ (if their number does not exceed
+ min_n), otherwise ignored */
+ ulint* n_processed) /*!< out: the number of pages
+ which were processed is passed
+ back to caller. Ignored if NULL */
+
+{
+ ulint i;
+ bool success = true;
+ ulint cnt_flush[MTFLUSH_MAX_WORKER];
+
+ if (n_processed) {
+ *n_processed = 0;
+ }
+
+ if (min_n != ULINT_MAX) {
+ /* Ensure that flushing is spread evenly amongst the
+ buffer pool instances. When min_n is ULINT_MAX
+ we need to flush everything up to the lsn limit
+ so no limit here. */
+ min_n = (min_n + srv_buf_pool_instances - 1)
+ / srv_buf_pool_instances;
+ }
+
+ /* This lock is to safequard against re-entry if any. */
+ os_fast_mutex_lock(&mtflush_mtx);
+ buf_mtflu_flush_work_items(srv_buf_pool_instances,
+ cnt_flush, BUF_FLUSH_LIST,
+ min_n, lsn_limit);
+ os_fast_mutex_unlock(&mtflush_mtx);
+
+ for (i = 0; i < srv_buf_pool_instances; i++) {
+ if (n_processed) {
+ *n_processed += cnt_flush[i];
+ }
+ if (cnt_flush[i]) {
+ MONITOR_INC_VALUE_CUMULATIVE(
+ MONITOR_FLUSH_BATCH_TOTAL_PAGE,
+ MONITOR_FLUSH_BATCH_COUNT,
+ MONITOR_FLUSH_BATCH_PAGES,
+ cnt_flush[i]);
+ }
+ }
+#ifdef UNIV_MTFLUSH_DEBUG
+ fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu ]\n",
+ __FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed);
+#endif
+ return(success);
+}
+
+/*********************************************************************//**
+Clears up tail of the LRU lists:
+* Put replaceable pages at the tail of LRU to the free list
+* Flush dirty pages at the tail of LRU to the disk
+The depth to which we scan each buffer pool is controlled by dynamic
+config parameter innodb_LRU_scan_depth.
+@return total pages flushed */
+UNIV_INTERN
+ulint
+buf_mtflu_flush_LRU_tail(void)
+/*==========================*/
+{
+ ulint total_flushed=0, i;
+ ulint cnt_flush[MTFLUSH_MAX_WORKER];
+
+ ut_a(buf_mtflu_init_done());
+
+ /* This lock is to safeguard against re-entry if any */
+ os_fast_mutex_lock(&mtflush_mtx);
+ buf_mtflu_flush_work_items(srv_buf_pool_instances,
+ cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
+ os_fast_mutex_unlock(&mtflush_mtx);
+
+ for (i = 0; i < srv_buf_pool_instances; i++) {
+ if (cnt_flush[i]) {
+ total_flushed += cnt_flush[i];
+
+ MONITOR_INC_VALUE_CUMULATIVE(
+ MONITOR_LRU_BATCH_TOTAL_PAGE,
+ MONITOR_LRU_BATCH_COUNT,
+ MONITOR_LRU_BATCH_PAGES,
+ cnt_flush[i]);
+ }
+ }
+
+#if UNIV_MTFLUSH_DEBUG
+ fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu ]\n", (
+ srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed);
+#endif
+
+ return(total_flushed);
+}
+
+/*********************************************************************//**
+Set correct thread identifiers to io thread array based on
+information we have. */
+void
+buf_mtflu_set_thread_ids(
+/*=====================*/
+ ulint n_threads, /*!<in: Number of threads to fill */
+ void* ctx, /*!<in: thread context */
+ os_thread_id_t* thread_ids) /*!<in: thread id array */
+{
+ thread_sync_t *mtflush_io = ((thread_sync_t *)ctx);
+ ulint i;
+ ut_a(mtflush_io != NULL);
+ ut_a(thread_ids != NULL);
+
+ for(i = 0; i < n_threads; i++) {
+ thread_ids[i] = mtflush_io->thread_data[i].wthread_id;
+ }
+}