/*****************************************************************************

Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved.
Copyright (C) 2013, 2015, MariaDB Corporation. All Rights Reserved.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

*****************************************************************************/

/******************************************************************//**
@file buf/buf0mtflu.cc
Multi-threaded flush method implementation

Created  06/11/2013 Dhananjoy Das DDas@fusionio.com
Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com
Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com
Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/

#include "buf0buf.h"
#include "buf0flu.h"
#include "buf0mtflu.h"
#include "buf0checksum.h"
#include "srv0start.h"
#include "srv0srv.h"
#include "page0zip.h"
#include "ut0byte.h"
#include "ut0lst.h"
#include "page0page.h"
#include "fil0fil.h"
#include "buf0lru.h"
#include "buf0rea.h"
#include "ibuf0ibuf.h"
#include "log0log.h"
#include "os0file.h"
#include "os0sync.h"
#include "trx0sys.h"
#include "srv0mon.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
#include "fil0pagecompress.h"

/** Watermark for multi-threaded compression (unused in this file). */
#define	MT_COMP_WATER_MARK	50
/** Time to wait for a message, in microseconds (5 seconds). */
#define MT_WAIT_IN_USECS 5000000

/* Work item status */
typedef enum wrk_status {
	WRK_ITEM_UNSET=0,	/*!< Work item is not set */
	WRK_ITEM_START=1,	/*!< Processing of work item has started */
	WRK_ITEM_DONE=2,	/*!< Processing is done; normally reported
				as SUCCESS/FAILED */
	WRK_ITEM_SUCCESS=2,	/*!< Work item successfully processed
				(alias of WRK_ITEM_DONE) */
	WRK_ITEM_FAILED=3,	/*!< Work item processing failed */
	WRK_ITEM_EXIT=4,	/*!< Exiting */
	WRK_ITEM_SET=5,		/*!< Work item is set */
	WRK_ITEM_STATUS_UNDEFINED
} wrk_status_t;
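
/* A work item normally moves through the states
UNSET -> SET -> START -> SUCCESS/FAILED as it travels from the work
queue through a worker thread to a completion queue; shutdown requests
are queued directly with status WRK_ITEM_EXIT. */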

/* Work item task type */
typedef enum mt_wrk_tsk {
	MT_WRK_NONE=0,		/*!< Exit queue-wait */
	MT_WRK_WRITE=1,		/*!< Flush operation */
	MT_WRK_READ=2,		/*!< Read operation  */
	MT_WRK_UNDEFINED
} mt_wrk_tsk_t;

/* Work thread status */
typedef enum wthr_status {
	WTHR_NOT_INIT=0,	/*!< Work thread not initialized */
	WTHR_INITIALIZED=1,	/*!< Work thread initialized */
	WTHR_SIG_WAITING=2,	/*!< Work thread waiting for a signal */
	WTHR_RUNNING=3,		/*!< Work thread running */
	WTHR_NO_WORK=4,		/*!< Work thread has no work */
	WTHR_KILL_IT=5,		/*!< Work thread should exit */
	WTHR_STATUS_UNDEFINED
} wthr_status_t;

/* Write work task */
typedef struct wr_tsk {
	buf_pool_t	*buf_pool;	/*!< buffer-pool instance */
	buf_flush_t	flush_type;	/*!< flush-type for buffer-pool
					flush operation */
	ulint		min;		/*!< minimum number of pages
					requested to be flushed */
	lsn_t		lsn_limit;	/*!< lsn limit for the buffer-pool
					flush operation */
} wr_tsk_t;

/* Read work task */
typedef struct rd_tsk {
	buf_pool_t	*page_pool;	/*!< list of pages to decompress */
} rd_tsk_t;

/* Work item */
typedef struct wrk_itm
{
	mt_wrk_tsk_t	tsk;		/*!< Task type. Based on the task
					type, one of the entries
					wr_tsk/rd_tsk will be used */
	wr_tsk_t	wr;		/*!< Flush page list */
	rd_tsk_t	rd;		/*!< Decompress page list */
	ulint		n_flushed;	/*!< Number of flushed pages */
	ulint		n_evicted;	/*!< Number of evicted pages */
	os_thread_id_t	id_usr;		/*!< Thread-id currently working */
	wrk_status_t	wi_status;	/*!< Work item status */
	mem_heap_t	*wheap;		/*!< Heap where memory for work
					queue nodes is allocated */
	mem_heap_t	*rheap;		/*!< Heap where memory for reply
					(completion) queue nodes is
					allocated */
} wrk_t;

typedef struct thread_data
{
	os_thread_id_t	wthread_id;	/*!< Thread identifier */
	os_thread_t	wthread;	/*!< Thread handle */
	wthr_status_t	wt_status;	/*!< Worker thread status */
} thread_data_t;

/* Thread synchronization data */
typedef struct thread_sync
{
	/* Global variables used by all threads */
	os_fast_mutex_t	thread_global_mtx; /*!< Mutex protecting the
					   variables below */
	ulint		n_threads;	/*!< Number of threads */
	ib_wqueue_t	*wq;		/*!< Work Queue */
	ib_wqueue_t	*wr_cq;		/*!< Write Completion Queue */
	ib_wqueue_t	*rd_cq;		/*!< Read Completion Queue */
	mem_heap_t*	wheap;		/*!< Work heap where work queue
					memory is allocated */
	mem_heap_t*	rheap;		/*!< Reply heap where completion
					queue memory is allocated */
	wthr_status_t	gwt_status;	/*!< Global thread status */

	/* Variables used by only one thread at a time */
	thread_data_t*	thread_data;	/*!< Thread specific data */

} thread_sync_t;
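
/* Queue topology: flush requests push work items to wq; worker threads
consume them and post completions to wr_cq (writes, and also exit
acknowledgements) or rd_cq (reads; unused in this file, since
MT_WRK_READ is not implemented). */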

/** Set to 1 once multi-threaded flush is initialized, 0 after shutdown. */
static int		mtflush_work_initialized = -1;
/** Multi-threaded flush synchronization context. */
static thread_sync_t*	mtflush_ctx = NULL;
/** Mutex serializing flush requests against the work queue. */
static os_fast_mutex_t	mtflush_mtx;

/******************************************************************//**
Set multi-threaded flush work initialized. */
static inline
void
buf_mtflu_work_init(void)
/*=====================*/
{
	mtflush_work_initialized = 1;
}

/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized */
bool
buf_mtflu_init_done(void)
/*=====================*/
{
	return(mtflush_work_initialized == 1);
}

/******************************************************************//**
Flush a buffer pool instance.
@return number of flushed pages, or 0 if an error happened
*/
static
ulint
buf_mtflu_flush_pool_instance(
/*==========================*/
	wrk_t	*work_item)	/*!< inout: work item to be flushed */
{
	flush_counters_t	n;
	ut_a(work_item != NULL);
	ut_a(work_item->wr.buf_pool != NULL);

	if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) {
		/* We have two choices here. If lsn_limit was
		specified, then skipping an instance of the buffer
		pool means we cannot guarantee that all pages up to
		lsn_limit have been flushed. We can return right now
		with failure, or we can try to flush the remaining
		buffer pools up to the lsn_limit. We attempt to flush
		the other buffer pools on the assumption that it will
		help the retry which will follow the failure. */
#ifdef UNIV_MTFLUSH_DEBUG
		fprintf(stderr, "InnoDB: Note: buf flush start failed; there is already an active flush for this buffer pool.\n");
#endif
		return 0;
	}

	memset(&n, 0, sizeof(flush_counters_t));

	if (work_item->wr.flush_type == BUF_FLUSH_LRU) {
		/* srv_LRU_scan_depth can be an arbitrarily large value.
		Cap it with the current LRU size. */
		buf_pool_mutex_enter(work_item->wr.buf_pool);
		work_item->wr.min = UT_LIST_GET_LEN(work_item->wr.buf_pool->LRU);
		buf_pool_mutex_exit(work_item->wr.buf_pool);
		work_item->wr.min = ut_min(srv_LRU_scan_depth, work_item->wr.min);
	}

	buf_flush_batch(work_item->wr.buf_pool,
		work_item->wr.flush_type,
		work_item->wr.min,
		work_item->wr.lsn_limit,
		&n);

	buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
	buf_flush_common(work_item->wr.flush_type, n.flushed);
	work_item->n_flushed = n.flushed;
	work_item->n_evicted = n.evicted;

	return work_item->n_flushed;
}

/******************************************************************//**
Worker function that waits for work items, processes them, and
sends the reply back.
*/
static
void
mtflush_service_io(
/*===============*/
	thread_sync_t*	mtflush_io,	/*!< inout: multi-threaded flush
					synchronization data */
	thread_data_t*	thread_data)	/*!< inout: thread status data */
{
	wrk_t		*work_item = NULL;
	ulint		n_flushed=0;

	ut_a(mtflush_io != NULL);
	ut_a(thread_data != NULL);

	thread_data->wt_status = WTHR_SIG_WAITING;

	work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);

	if (work_item == NULL) {
		work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
	}

	if (work_item) {
		thread_data->wt_status = WTHR_RUNNING;
	} else {
		/* Thread did not get any work */
		thread_data->wt_status = WTHR_NO_WORK;
		return;
	}

	if (work_item->wi_status != WRK_ITEM_EXIT) {
		work_item->wi_status = WRK_ITEM_SET;
	}

#ifdef UNIV_MTFLUSH_DEBUG
	ut_a(work_item->id_usr == 0);
#endif
	work_item->id_usr = os_thread_get_curr_id();

	/* This works as a producer/consumer model, where tasks are
	inserted into the work queue (wq) and completions are posted
	based on the type of operation performed: WRITE/compression/
	flush completions are posted to wr_cq, and READ/decompress
	completions are posted to rd_cq. In the future we may have
	others. */
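	/* Note that exit acknowledgements (MT_WRK_NONE) are also posted
	to wr_cq; buf_mtflu_io_thread_exit() collects them from there. */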

	switch(work_item->tsk) {
	case MT_WRK_NONE:
		ut_a(work_item->wi_status == WRK_ITEM_EXIT);
		work_item->wi_status = WRK_ITEM_EXIT;
		ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
		thread_data->wt_status = WTHR_KILL_IT;
		break;

	case MT_WRK_WRITE:
		ut_a(work_item->wi_status == WRK_ITEM_SET);
		work_item->wi_status = WRK_ITEM_START;
		/* Process work item */
		if (0 == (n_flushed = buf_mtflu_flush_pool_instance(work_item))) {
			work_item->wi_status = WRK_ITEM_FAILED;
		} else {
			work_item->wi_status = WRK_ITEM_SUCCESS;
		}
		ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
		break;

	case MT_WRK_READ:
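		/* Read/decompress tasks are not implemented yet;
		reaching this case is a bug. */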
		ut_a(0);
		break;

	default:
		/* No task types other than write/read are planned */
		ut_a(0);
		break;
	}
}

/******************************************************************//**
Thread used to flush dirty pages when multi-threaded flush is
used.
@return a dummy parameter */
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(mtflush_io_thread)(
/*==============================*/
	void * arg)
{
	thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
	thread_data_t *this_thread_data = NULL;
	ulint i;

	/* Find correct slot for this thread */
	os_fast_mutex_lock(&(mtflush_io->thread_global_mtx));
	for (i = 0; i < mtflush_io->n_threads; i++) {
		if (mtflush_io->thread_data[i].wthread_id == os_thread_get_curr_id()) {
			break;
		}
	}

	/* The current thread must be one of the worker threads */
	ut_a(i < mtflush_io->n_threads);
	this_thread_data = &mtflush_io->thread_data[i];
	os_fast_mutex_unlock(&(mtflush_io->thread_global_mtx));

	while (TRUE) {

#ifdef UNIV_MTFLUSH_DEBUG
 		fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
 					os_thread_get_curr_id(),
 					ib_wqueue_len(mtflush_io->wq),
 					ib_wqueue_len(mtflush_io->wr_cq));
#endif /* UNIV_MTFLUSH_DEBUG */

		mtflush_service_io(mtflush_io, this_thread_data);
		if (this_thread_data->wt_status == WTHR_KILL_IT) {
			break;
		}
	}

	os_thread_exit(NULL);
	OS_THREAD_DUMMY_RETURN;
}

/******************************************************************//**
Add an exit work item per thread to the work queue to signal
multi-threaded flush threads that they should exit.
*/
void
buf_mtflu_io_thread_exit(void)
/*==========================*/
{
	ulint i;
	thread_sync_t* mtflush_io = mtflush_ctx;
	wrk_t* work_item = NULL;

	ut_a(mtflush_io != NULL);

	/* If an io-thread kill is already in progress, bail out */
	if (mtflush_io->gwt_status == WTHR_KILL_IT) {
		return;
	}

	/* Allocate work items for the shutdown message */
	work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap,
		sizeof(wrk_t) * srv_mtflush_threads);

	mtflush_io->gwt_status = WTHR_KILL_IT;

	/* This lock is to safeguard against a timing bug: a flush
	request takes this mutex before sending work items to be
	processed by the flush threads. Inside a flush thread we assume
	that the work queue contains only a constant number of items.
	Thus, we may not install the new work items below before all
	previous ones have been processed. The flush request releases
	this mutex after all work items sent to the flush threads have
	been processed. Thus, we can get this mutex if and only if the
	work queue is empty. */

	os_fast_mutex_lock(&mtflush_mtx);

	/* Make sure the work queue is empty */
	ut_a(ib_wqueue_is_empty(mtflush_io->wq));

	/* Send one exit work item per thread */
	for (i = 0; i < (ulint)srv_mtflush_threads; i++) {
		work_item[i].tsk = MT_WRK_NONE;
		work_item[i].wi_status = WRK_ITEM_EXIT;
		work_item[i].wheap = mtflush_io->wheap;
		work_item[i].rheap = mtflush_io->rheap;
		work_item[i].id_usr = 0;

		ib_wqueue_add(mtflush_io->wq,
			(void *)&(work_item[i]),
			mtflush_io->wheap);
	}

	/* Requests sent */
	os_fast_mutex_unlock(&mtflush_mtx);

	/* Wait until all work items on the work queue have been
	processed */
	while (!ib_wqueue_is_empty(mtflush_io->wq)) {
		/* Wait */
		os_thread_sleep(MT_WAIT_IN_USECS);
	}

	ut_a(ib_wqueue_is_empty(mtflush_io->wq));

	/* Collect all work done items */
	for (i = 0; i < (ulint)srv_mtflush_threads;) {
		wrk_t* done_wi = NULL;

		done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS);

		/* If we receive a reply to a work item and its status is
		exit, the thread has processed this message and exited */
		if (done_wi && done_wi->wi_status == WRK_ITEM_EXIT) {
			i++;
		}
	}

	/* Wait (MT_WAIT_IN_USECS, i.e. 5 seconds) to allow the threads
	to really exit */
	os_thread_sleep(MT_WAIT_IN_USECS);

	/* Make sure that the work queue is empty */
	while (!ib_wqueue_is_empty(mtflush_io->wq)) {
		ib_wqueue_nowait(mtflush_io->wq);
	}

	os_fast_mutex_lock(&mtflush_mtx);

	ut_a(ib_wqueue_is_empty(mtflush_io->wq));
	ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
	ut_a(ib_wqueue_is_empty(mtflush_io->rd_cq));

	/* Free all queues */
	ib_wqueue_free(mtflush_io->wq);
	ib_wqueue_free(mtflush_io->wr_cq);
	ib_wqueue_free(mtflush_io->rd_cq);

	mtflush_io->wq = NULL;
	mtflush_io->wr_cq = NULL;
	mtflush_io->rd_cq = NULL;
	mtflush_work_initialized = 0;

	/* Free heap */
	mem_heap_free(mtflush_io->wheap);
	mem_heap_free(mtflush_io->rheap);

	os_fast_mutex_unlock(&mtflush_mtx);
	os_fast_mutex_free(&mtflush_mtx);
	os_fast_mutex_free(&mtflush_io->thread_global_mtx);
}

/******************************************************************//**
Initialize multi-threaded flush thread synchronization data.
@return initialized multi-threaded flush thread synchronization data */
void*
buf_mtflu_handler_init(
/*===================*/
	ulint n_threads,	/*!< in: Number of threads to create */
	ulint wrk_cnt)		/*!< in: Number of work items */
{
	ulint		i;
	mem_heap_t*	mtflush_heap;
	mem_heap_t*	mtflush_heap2;

	/* Create heap, work queue, write completion queue, read
	completion queue for multi-threaded flush, and init
	handler. */
	mtflush_heap = mem_heap_create(0);
	ut_a(mtflush_heap != NULL);
	mtflush_heap2 = mem_heap_create(0);
	ut_a(mtflush_heap2 != NULL);

	mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
				sizeof(thread_sync_t));
	ut_a(mtflush_ctx != NULL);
	memset(mtflush_ctx, 0, sizeof(thread_sync_t));
	mtflush_ctx->thread_data = (thread_data_t*)mem_heap_alloc(
		mtflush_heap, sizeof(thread_data_t) * n_threads);
	ut_a(mtflush_ctx->thread_data);
	memset(mtflush_ctx->thread_data, 0, sizeof(thread_data_t) * n_threads);

	mtflush_ctx->n_threads = n_threads;
	mtflush_ctx->wq = ib_wqueue_create();
	ut_a(mtflush_ctx->wq);
	mtflush_ctx->wr_cq = ib_wqueue_create();
	ut_a(mtflush_ctx->wr_cq);
	mtflush_ctx->rd_cq = ib_wqueue_create();
	ut_a(mtflush_ctx->rd_cq);
	mtflush_ctx->wheap = mtflush_heap;
	mtflush_ctx->rheap = mtflush_heap2;

	os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_ctx->thread_global_mtx);
	os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);

	/* Create threads for page-compression-flush */
	for (i = 0; i < n_threads; i++) {
		os_thread_id_t new_thread_id;

		mtflush_ctx->thread_data[i].wt_status = WTHR_INITIALIZED;

		mtflush_ctx->thread_data[i].wthread = os_thread_create(
			mtflush_io_thread,
			((void *) mtflush_ctx),
			&new_thread_id);

		mtflush_ctx->thread_data[i].wthread_id = new_thread_id;
	}

	buf_mtflu_work_init();

	return((void *)mtflush_ctx);
}
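
/* A minimal initialization sketch (illustrative only; the real call
site lives in server startup code and is not shown here):

	os_thread_id_t tids[MTFLUSH_MAX_WORKER];
	void* ctx = buf_mtflu_handler_init(srv_mtflush_threads,
					   srv_buf_pool_instances);
	buf_mtflu_set_thread_ids(srv_mtflush_threads, ctx, tids);
*/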

/******************************************************************//**
Flush buffer pool instances.
@return number of pages flushed. */
ulint
buf_mtflu_flush_work_items(
/*=======================*/
	ulint buf_pool_inst,		/*!< in: Number of buffer pool instances */
	flush_counters_t *per_pool_cnt,	/*!< out: Number of pages
					flushed or evicted per instance */
	buf_flush_t flush_type,		/*!< in: Type of flush */
	ulint min_n,			/*!< in: Wished minimum number of
					blocks to be flushed */
	lsn_t lsn_limit)		/*!< in: All blocks whose
					oldest_modification is smaller than
					this should be flushed (if their
					number does not exceed min_n) */
{
	ulint n_flushed=0, i;
	mem_heap_t* work_heap;
	mem_heap_t* reply_heap;
	wrk_t work_item[MTFLUSH_MAX_WORKER];

	if (mtflush_ctx->gwt_status == WTHR_KILL_IT) {
		return 0;
	}

	/* Allocate the heaps where all work items and queue
	nodes are allocated */
	work_heap = mem_heap_create(0);
	reply_heap = mem_heap_create(0);


	for (i = 0; i < buf_pool_inst; i++) {
		work_item[i].tsk = MT_WRK_WRITE;
		work_item[i].wr.buf_pool = buf_pool_from_array(i);
		work_item[i].wr.flush_type = flush_type;
		work_item[i].wr.min = min_n;
		work_item[i].wr.lsn_limit = lsn_limit;
		work_item[i].wi_status = WRK_ITEM_UNSET;
		work_item[i].wheap = work_heap;
		work_item[i].rheap = reply_heap;
		work_item[i].n_flushed = 0;
		work_item[i].n_evicted = 0;
		work_item[i].id_usr = 0;

		ib_wqueue_add(mtflush_ctx->wq,
			(void *)(work_item + i),
			work_heap);
	}

	/* Wait for the completions to arrive. Note that completions may
	arrive in any order, so per_pool_cnt is filled in arrival order,
	not by buffer pool instance. */
	for (i = 0; i < buf_pool_inst;) {
		wrk_t *done_wi = NULL;
		done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq);

		if (done_wi != NULL) {
			per_pool_cnt[i].flushed = done_wi->n_flushed;
			per_pool_cnt[i].evicted = done_wi->n_evicted;

#ifdef UNIV_MTFLUSH_DEBUG
			if((int)done_wi->id_usr == 0 &&
				(done_wi->wi_status == WRK_ITEM_SET ||
					done_wi->wi_status == WRK_ITEM_UNSET)) {
				fprintf(stderr,
					"**Set/Unused work_item[%lu] flush_type=%d\n",
					i,
					done_wi->wr.flush_type);
				ut_a(0);
			}
#endif

			n_flushed += done_wi->n_flushed + done_wi->n_evicted;
			i++;
		}
	}

	/* Release used work_items and queue nodes */
	mem_heap_free(work_heap);
	mem_heap_free(reply_heap);

	return(n_flushed);
}

/*******************************************************************//**
Multi-threaded version of buf_flush_list.
@return always true */
bool
buf_mtflu_flush_list(
/*=================*/
	ulint		min_n,		/*!< in: wished minimum number of blocks
					flushed (it is not guaranteed that the
					actual number is that big, though) */
	lsn_t		lsn_limit,	/*!< in: in the case of BUF_FLUSH_LIST,
					all blocks whose oldest_modification is
					smaller than this should be flushed
					(if their number does not exceed
					min_n), otherwise ignored */
	ulint*		n_processed)	/*!< out: the number of pages
					which were processed is passed
					back to caller. Ignored if NULL */

{
	ulint				i;
	bool				success = true;
	flush_counters_t		cnt[MTFLUSH_MAX_WORKER];

	if (n_processed) {
		*n_processed = 0;
	}

	if (min_n != ULINT_MAX) {
		/* Ensure that flushing is spread evenly amongst the
		buffer pool instances. When min_n is ULINT_MAX
		we need to flush everything up to the lsn limit
		so no limit here. */
		min_n = (min_n + srv_buf_pool_instances - 1)
			 / srv_buf_pool_instances;
	}
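	/* For example, min_n = 1000 with 8 buffer pool instances gives
	(1000 + 8 - 1) / 8 = 125 pages requested per instance. */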

	/* This lock is to safeguard against re-entry if any. */
	os_fast_mutex_lock(&mtflush_mtx);
	buf_mtflu_flush_work_items(srv_buf_pool_instances,
		cnt, BUF_FLUSH_LIST,
		min_n, lsn_limit);
	os_fast_mutex_unlock(&mtflush_mtx);

	for (i = 0; i < srv_buf_pool_instances; i++) {
		if (n_processed) {
			*n_processed += cnt[i].flushed + cnt[i].evicted;
		}

		if (cnt[i].flushed) {
			MONITOR_INC_VALUE_CUMULATIVE(
				MONITOR_FLUSH_BATCH_TOTAL_PAGE,
				MONITOR_FLUSH_BATCH_COUNT,
				MONITOR_FLUSH_BATCH_PAGES,
				cnt[i].flushed);
		}

		if (cnt[i].evicted) {
			MONITOR_INC_VALUE_CUMULATIVE(
				MONITOR_LRU_BATCH_EVICT_TOTAL_PAGE,
				MONITOR_LRU_BATCH_EVICT_COUNT,
				MONITOR_LRU_BATCH_EVICT_PAGES,
				cnt[i].evicted);
		}
	}
#ifdef UNIV_MTFLUSH_DEBUG
	fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu ]\n",
		__FUNCTION__, (min_n * srv_buf_pool_instances),
		n_processed ? *n_processed : 0);
#endif
	return(success);
}

/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail(void)
/*==========================*/
{
	ulint	total_flushed=0, i;
	flush_counters_t	cnt[MTFLUSH_MAX_WORKER];

	ut_a(buf_mtflu_init_done());

	/* At shutdown do not send requests anymore */
	if (!mtflush_ctx || mtflush_ctx->gwt_status == WTHR_KILL_IT) {
		return (total_flushed);
	}

	/* This lock is to safeguard against re-entry if any */
	os_fast_mutex_lock(&mtflush_mtx);
	buf_mtflu_flush_work_items(srv_buf_pool_instances,
		cnt, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
	os_fast_mutex_unlock(&mtflush_mtx);

	for (i = 0; i < srv_buf_pool_instances; i++) {
		total_flushed += cnt[i].flushed + cnt[i].evicted;

		if (cnt[i].flushed) {
			MONITOR_INC_VALUE_CUMULATIVE(
				MONITOR_LRU_BATCH_FLUSH_TOTAL_PAGE,
				MONITOR_LRU_BATCH_FLUSH_COUNT,
				MONITOR_LRU_BATCH_FLUSH_PAGES,
				cnt[i].flushed);
		}

		if (cnt[i].evicted) {
			MONITOR_INC_VALUE_CUMULATIVE(
				MONITOR_LRU_BATCH_EVICT_TOTAL_PAGE,
				MONITOR_LRU_BATCH_EVICT_COUNT,
				MONITOR_LRU_BATCH_EVICT_PAGES,
				cnt[i].evicted);
		}
	}

#ifdef UNIV_MTFLUSH_DEBUG
	fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu ]\n",
		(srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed);
#endif

	return(total_flushed);
}

/*********************************************************************//**
Fill the given thread id array with the identifiers of the io threads,
based on the information we have. */
void
buf_mtflu_set_thread_ids(
/*=====================*/
	ulint		n_threads,	/*!< in: Number of threads to fill */
	void*		ctx,		/*!< in: thread context */
	os_thread_id_t*	thread_ids)	/*!< out: thread id array */
{
	thread_sync_t *mtflush_io = ((thread_sync_t *)ctx);
	ulint i;
	ut_a(mtflush_io != NULL);
	ut_a(thread_ids != NULL);

	for (i = 0; i < n_threads; i++) {
		thread_ids[i] = mtflush_io->thread_data[i].wthread_id;
	}
}