summaryrefslogtreecommitdiff
path: root/storage/innobase/btr/btr0pcur.cc
blob: 25a39e9646bb004cb5048f8b23ee0ba7e19398af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
/*****************************************************************************

Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2016, 2019, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0pcur.cc
The index tree persistent cursor

Created 2/23/1996 Heikki Tuuri
*******************************************************/

#include "btr0pcur.h"
#include "ut0byte.h"
#include "rem0cmp.h"
#include "trx0trx.h"

/**************************************************************//**
Creates a persistent cursor object: obtains memory for it with
ut_malloc_nokey() and initializes it with btr_pcur_init(). The object is
owned by the caller; btr_pcur_free_for_mysql() is the matching release
function in this file.
@return own: persistent cursor */
btr_pcur_t*
btr_pcur_create_for_mysql(void)
/*============================*/
{
	DBUG_ENTER("btr_pcur_create_for_mysql");

	btr_pcur_t*	new_cursor = static_cast<btr_pcur_t*>(
		ut_malloc_nokey(sizeof(btr_pcur_t)));

	/* Clear the index pointer explicitly before the generic
	initialization runs. */
	new_cursor->btr_cur.index = NULL;
	btr_pcur_init(new_cursor);

	DBUG_PRINT("btr_pcur_create_for_mysql", ("pcur: %p", new_cursor));
	DBUG_RETURN(new_cursor);
}

/**************************************************************//**
Brings a persistent cursor object back to its initial state: ::old_rec_buf
is freed if it is allocated, and every other member touched here is set to
its initial value (no stored position, no index, no latches, not
positioned). */
void
btr_pcur_reset(
/*===========*/
	btr_pcur_t*	cursor)	/*!< in, out: persistent cursor */
{
	/* Release the stored-record buffer before its pointer is
	overwritten below. */
	btr_pcur_free(cursor);

	/* Forget the stored position. */
	cursor->old_stored = false;
	cursor->old_n_fields = 0;
	cursor->old_rec = NULL;
	cursor->old_rec_buf = NULL;

	/* Detach the tree cursor from the index and from any record. */
	cursor->btr_cur.page_cur.rec = NULL;
	cursor->btr_cur.index = NULL;

	/* No latches are held and the cursor is not positioned. */
	cursor->pos_state = BTR_PCUR_NOT_POSITIONED;
	cursor->latch_mode = BTR_NO_LATCHES;
}

/**************************************************************//**
Destroys a persistent cursor object: first the resources held inside the
cursor are released with btr_pcur_free(), then the memory of the object
itself is returned with ut_free(). The pointer must not be used after this
call. */
void
btr_pcur_free_for_mysql(
/*====================*/
	btr_pcur_t*	cursor)	/*!< in, own: persistent cursor */
{
	DBUG_ENTER("btr_pcur_free_for_mysql");
	DBUG_PRINT("btr_pcur_free_for_mysql", ("pcur: %p", cursor));

	/* Inner resources first ... */
	btr_pcur_free(cursor);

	/* ... then the cursor object itself. */
	ut_free(cursor);

	DBUG_VOID_RETURN;
}

/**************************************************************//**
Stores the position of the cursor. This is done by taking an initial
segment of the record the cursor is positioned on, before, or after, and
copying it to the cursor data structure. If the cursor is before the first
or after the last record in an EMPTY tree, only a flag (rel_pos) is set
and no record is copied. NOTE that the page where the cursor is positioned
must not be empty if the index tree is not totally empty! */
void
btr_pcur_store_position(
/*====================*/
	btr_pcur_t*	cursor, /*!< in: persistent cursor */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ut_ad(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
	ut_ad(cursor->latch_mode != BTR_NO_LATCHES);

	buf_block_t*	block = btr_pcur_get_block(cursor);
	dict_index_t*	index = btr_cur_get_index(
		btr_pcur_get_btr_cur(cursor));
	rec_t*		rec = page_cur_get_rec(btr_pcur_get_page_cur(cursor));

	/* Byte offset of the record within its page frame; used below
	to recognize the infimum and supremum pseudo-records. */
	const ulint	offs = rec - block->frame;

	ut_ad(block->page.id.page_no() == page_get_page_no(block->frame));
	ut_ad(block->page.buf_fix_count);
	/* Normally the page must be S- or X-fixed in this mtr. For a
	spatial index, positioning on a parent buffer might be done
	without page latches; then the index tree lock must be held
	instead, so that the page cannot change. */
	ut_ad(mtr_memo_contains_flagged(mtr, block,
					MTR_MEMO_PAGE_S_FIX
					| MTR_MEMO_PAGE_X_FIX)
	      || (dict_index_is_spatial(index)
		  && mtr_memo_contains_flagged(
			  mtr, dict_index_get_lock(index),
			  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK)));

	cursor->old_stored = true;

	if (page_is_empty(block->frame)) {
		/* An empty page is only allowed when the whole index
		tree is empty, i.e. this is the root leaf page. The
		modify_clock is not stored in this case; restoring the
		position always performs a search. */

		ut_a(!page_has_siblings(block->frame));
		ut_ad(page_is_leaf(block->frame));
		ut_ad(block->page.id.page_no() == index->page);

		cursor->rel_pos = page_rec_is_supremum_low(offs)
			? BTR_PCUR_AFTER_LAST_IN_TREE
			: BTR_PCUR_BEFORE_FIRST_IN_TREE;

		return;
	}

	/* On a pseudo-record, remember the neighbouring record and
	on which side of it the cursor stands. */
	if (page_rec_is_supremum_low(offs)) {

		cursor->rel_pos = BTR_PCUR_AFTER;

		rec = page_rec_get_prev(rec);

	} else if (page_rec_is_infimum_low(offs)) {

		cursor->rel_pos = BTR_PCUR_BEFORE;

		rec = page_rec_get_next(rec);
	} else {
		cursor->rel_pos = BTR_PCUR_ON;
	}

	/* Copy the ordering prefix of the chosen record into the
	buffer owned by the cursor. */
	cursor->old_rec = dict_index_copy_rec_order_prefix(
		index, rec, &cursor->old_n_fields,
		&cursor->old_rec_buf, &cursor->buf_size);

	/* Remember the block and the two clock values that
	btr_pcur_restore_position_func() consults before it attempts an
	optimistic restore. */
	cursor->block_when_stored = block;
	cursor->modify_clock = buf_block_get_modify_clock(block);
	cursor->withdraw_clock = buf_withdraw_clock;
}

/**************************************************************//**
Copies the stored position of a pcur to another pcur. The whole cursor
structure is copied; the stored record buffer is duplicated so that the two
cursors never share (and never double-free) ::old_rec_buf. Copying a cursor
onto itself is a no-op. */
void
btr_pcur_copy_stored_position(
/*==========================*/
	btr_pcur_t*	pcur_receive,	/*!< in/out: pcur which will receive
					the position info; its previous
					old_rec_buf is freed */
	btr_pcur_t*	pcur_donate)	/*!< in: pcur from which the info is
					copied */
{
	if (pcur_receive == pcur_donate) {
		/* Self-copy: without this guard the record buffer would
		be freed, the structure would be copied onto itself with
		overlapping memcpy(), and the freshly allocated buffer
		would then be "copied" from itself, leaving the stored
		record uninitialized. */
		return;
	}

	ut_free(pcur_receive->old_rec_buf);

	/* Shallow copy of every member, including old_n_fields. After
	this the receiver's old_rec_buf and old_rec alias the donor's
	buffer; that is corrected below. */
	ut_memcpy(pcur_receive, pcur_donate, sizeof(btr_pcur_t));

	if (pcur_donate->old_rec_buf) {

		/* Give the receiver a private copy of the buffer ... */
		pcur_receive->old_rec_buf = (byte*)
			ut_malloc_nokey(pcur_donate->buf_size);

		ut_memcpy(pcur_receive->old_rec_buf, pcur_donate->old_rec_buf,
			  pcur_donate->buf_size);

		/* ... and re-base old_rec so that it points to the same
		offset inside the new buffer. */
		pcur_receive->old_rec = pcur_receive->old_rec_buf
			+ (pcur_donate->old_rec - pcur_donate->old_rec_buf);
	}
}

/**************************************************************//**
Restores the stored position of a persistent cursor, buffer-fixing the page
and obtaining the specified latches. If the cursor position was saved when the
(1) cursor was positioned on a user record: this function restores the position
to the last record LESS OR EQUAL to the stored record;
(2) cursor was positioned on a page infimum record: restores the position to
the last record LESS than the user record which was the successor of the page
infimum;
(3) cursor was positioned on the page supremum: restores to the first record
GREATER than the user record which was the predecessor of the supremum.
(4) cursor was positioned before the first or after the last in an empty tree:
restores to before first or after the last in the tree.

Two strategies are used. For the leaf-level latch modes an optimistic
restoration is tried first, using the block, modify_clock and withdraw_clock
remembered in the cursor. If that is not attempted or does not succeed, the
cursor is opened anew by searching for a tuple built from the stored record
prefix.
@return TRUE if the cursor position was stored when it was on a user
record and it can be restored on a user record whose ordering fields
are identical to the ones of the original user record */
ibool
btr_pcur_restore_position_func(
/*===========================*/
	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
	btr_pcur_t*	cursor,		/*!< in: detached persistent cursor */
	const char*	file,		/*!< in: file name */
	unsigned	line,		/*!< in: line where called */
	mtr_t*		mtr)		/*!< in: mtr */
{
	dict_index_t*	index;
	dtuple_t*	tuple;
	page_cur_mode_t	mode;
	page_cur_mode_t	old_mode;
	mem_heap_t*	heap;

	ut_ad(mtr->is_active());
	//ut_ad(cursor->old_stored);
	ut_ad(cursor->pos_state == BTR_PCUR_WAS_POSITIONED
	      || cursor->pos_state == BTR_PCUR_IS_POSITIONED);

	index = btr_cur_get_index(btr_pcur_get_btr_cur(cursor));

	/* Case (4): the position was stored in an empty tree, so there
	is no stored record to search for. */
	if (UNIV_UNLIKELY
	    (cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
	     || cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE)) {
		dberr_t err = DB_SUCCESS;

		/* In these cases we do not try an optimistic restoration,
		but always do a search */

		err = btr_cur_open_at_index_side(
			cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE,
			index, latch_mode,
			btr_pcur_get_btr_cur(cursor), 0, mtr);

		/* NOTE: a failure is only logged here. The cursor is
		still marked as positioned below and the caller gets no
		error indication other than the FALSE return value. */
		if (err != DB_SUCCESS) {
			ib::warn() << " Error code: " << err
				   << " btr_pcur_restore_position_func "
				   << " called from file: "
				   << file << " line: " << line
				   << " table: " << index->table->name
				   << " index: " << index->name;
		}

		cursor->latch_mode =
			BTR_LATCH_MODE_WITHOUT_INTENTION(latch_mode);
		cursor->pos_state = BTR_PCUR_IS_POSITIONED;
		cursor->block_when_stored = btr_pcur_get_block(cursor);

		/* The position was not stored on a user record. */
		return(FALSE);
	}

	/* From here on a stored record prefix is required. */
	ut_a(cursor->old_rec);
	ut_a(cursor->old_n_fields);

	/* Only the four latch modes listed below attempt the optimistic
	path; with any other latch_mode, or when the attempt fails,
	control continues with the search after the switch. */
	switch (latch_mode) {
	case BTR_SEARCH_LEAF:
	case BTR_MODIFY_LEAF:
	case BTR_SEARCH_PREV:
	case BTR_MODIFY_PREV:
		/* Try optimistic restoration. */

		if (!buf_pool_is_obsolete(cursor->withdraw_clock)
		    && btr_cur_optimistic_latch_leaves(
			cursor->block_when_stored, cursor->modify_clock,
			&latch_mode, btr_pcur_get_btr_cur(cursor),
			file, line, mtr)) {

			cursor->pos_state = BTR_PCUR_IS_POSITIONED;
			/* latch_mode was passed by address above, so the
			value recorded here is whatever the callee left
			in it. */
			cursor->latch_mode = latch_mode;

			buf_block_dbg_add_level(
				btr_pcur_get_block(cursor),
				dict_index_is_ibuf(index)
				? SYNC_IBUF_TREE_NODE : SYNC_TREE_NODE);

			if (cursor->rel_pos == BTR_PCUR_ON) {
#ifdef UNIV_DEBUG
				/* Debug builds verify that the record
				under the cursor compares equal to the
				stored record on the first old_n_fields
				fields. */
				const rec_t*	rec;
				offset_t	offsets1_[REC_OFFS_NORMAL_SIZE];
				offset_t	offsets2_[REC_OFFS_NORMAL_SIZE];
				offset_t*	offsets1 = offsets1_;
				offset_t*	offsets2 = offsets2_;
				rec = btr_pcur_get_rec(cursor);

				rec_offs_init(offsets1_);
				rec_offs_init(offsets2_);

				heap = mem_heap_create(256);
				offsets1 = rec_get_offsets(
					cursor->old_rec, index, offsets1, true,
					cursor->old_n_fields, &heap);
				offsets2 = rec_get_offsets(
					rec, index, offsets2, true,
					cursor->old_n_fields, &heap);

				ut_ad(!cmp_rec_rec(cursor->old_rec,
						   rec, offsets1, offsets2,
						   index));
				mem_heap_free(heap);
#endif /* UNIV_DEBUG */
				return(TRUE);
			}
			/* This is the same record as stored,
			may need to be adjusted for BTR_PCUR_BEFORE/AFTER,
			depending on search mode and direction. */
			if (btr_pcur_is_on_user_rec(cursor)) {
				cursor->pos_state
					= BTR_PCUR_IS_POSITIONED_OPTIMISTIC;
			}
			return(FALSE);
		}
	}

	/* If optimistic restoration did not succeed, open the cursor anew */

	heap = mem_heap_create(256);

	/* Search key: the stored record prefix converted to a tuple. */
	tuple = dict_index_build_data_tuple(cursor->old_rec, index, true,
					    cursor->old_n_fields, heap);

	/* Save the old search mode of the cursor */
	old_mode = cursor->search_mode;

	/* Choose the search mode that implements cases (1)-(3) of the
	function comment. */
	switch (cursor->rel_pos) {
	case BTR_PCUR_ON:
		mode = PAGE_CUR_LE;
		break;
	case BTR_PCUR_AFTER:
		mode = PAGE_CUR_G;
		break;
	case BTR_PCUR_BEFORE:
		mode = PAGE_CUR_L;
		break;
	default:
		ut_error;
		/* NOTE(review): presumably this assignment exists only
		so that 'mode' is initialized on every path for the
		compiler - confirm that ut_error does not return. */
		mode = PAGE_CUR_UNSUPP;
	}

	btr_pcur_open_with_no_init_func(index, tuple, mode, latch_mode,
					cursor, 0, file, line, mtr);

	/* Restore the old search mode */
	cursor->search_mode = old_mode;

	ut_ad(cursor->rel_pos == BTR_PCUR_ON
	      || cursor->rel_pos == BTR_PCUR_BEFORE
	      || cursor->rel_pos == BTR_PCUR_AFTER);
	/* Offsets array on the stack; rec_get_offsets() is also given
	&heap, which is freed on both exit paths below. */
	offset_t offsets[REC_OFFS_NORMAL_SIZE];
	rec_offs_init(offsets);
	/* Return TRUE only if the position was stored ON a user record
	and the search landed on a user record that compares equal to
	the search tuple. */
	if (cursor->rel_pos == BTR_PCUR_ON
	    && btr_pcur_is_on_user_rec(cursor)
	    && !cmp_dtuple_rec(tuple, btr_pcur_get_rec(cursor),
			       rec_get_offsets(btr_pcur_get_rec(cursor),
					       index, offsets, true,
					       ULINT_UNDEFINED, &heap))) {

		/* We have to store the NEW value for the modify clock,
		since the cursor can now be on a different page!
		But we can retain the value of old_rec */

		cursor->block_when_stored = btr_pcur_get_block(cursor);
		cursor->modify_clock = buf_block_get_modify_clock(
						cursor->block_when_stored);
		cursor->old_stored = true;
		cursor->withdraw_clock = buf_withdraw_clock;

		mem_heap_free(heap);

		return(TRUE);
	}

	mem_heap_free(heap);

	/* We have to store new position information, modify_clock etc.,
	to the cursor because it can now be on a different page, the record
	under it may have been removed, etc. */

	btr_pcur_store_position(cursor, mtr);

	return(FALSE);
}

/*********************************************************//**
Moves the persistent cursor to before the first record on the next page.
The next page is acquired first and only then is the current page released
with btr_leaf_page_release(). Note that there must not be modifications on
the current page, as then the x-latch can be released only in mtr_commit.
If the current page or the next block cannot be obtained, the function
returns early and the cursor is left where it was (old_stored has already
been cleared at that point). */
void
btr_pcur_move_to_next_page(
/*=======================*/
	btr_pcur_t*	cursor,	/*!< in: persistent cursor; must be on the
				last record of the current page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		next_page;

	ut_ad(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
	ut_ad(cursor->latch_mode != BTR_NO_LATCHES);
	ut_ad(btr_pcur_is_after_last_on_page(cursor));

	/* The cursor is about to move: any stored position is stale. */
	cursor->old_stored = false;

	page_t*	cur_page = btr_pcur_get_page(cursor);

	if (UNIV_UNLIKELY(cur_page == NULL)) {
		return;
	}

	const ulint	next_no = btr_page_get_next(cur_page);

	ut_ad(next_no != FIL_NULL);

	/* The sibling page is latched with a leaf mode: the two tree
	modes are mapped to their leaf counterparts, every other mode is
	used unchanged. */
	ulint	leaf_mode = cursor->latch_mode;

	if (leaf_mode == BTR_SEARCH_TREE) {
		leaf_mode = BTR_SEARCH_LEAF;
	} else if (leaf_mode == BTR_MODIFY_TREE) {
		leaf_mode = BTR_MODIFY_LEAF;
	}

	buf_block_t*	cur_block = btr_pcur_get_block(cursor);

	buf_block_t*	next_block = btr_block_get(
		page_id_t(cur_block->page.id.space(), next_no),
		cur_block->page.size, leaf_mode,
		btr_pcur_get_btr_cur(cursor)->index, mtr);

	if (UNIV_UNLIKELY(next_block == NULL)) {
		return;
	}

	next_page = buf_block_get_frame(next_block);
#ifdef UNIV_BTR_DEBUG
	/* The sibling must have the same row format and must point back
	to the page we are leaving. */
	ut_a(page_is_comp(next_page) == page_is_comp(cur_page));
	ut_a(btr_page_get_prev(next_page)
	     == cur_block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */

	/* The next page is held now; let go of the current one. */
	btr_leaf_page_release(cur_block, leaf_mode, mtr);

	page_cur_set_before_first(next_block, btr_pcur_get_page_cur(cursor));

	ut_d(page_check_dir(next_page));
}

/*********************************************************//**
Moves the persistent cursor backward if it is on the first record of the page.
Commits mtr and starts it again. To prevent a possible deadlock, the
function first stores the position of the cursor, commits mtr, and then
restores the cursor position with a latch mode that also covers the previous
page (BTR_SEARCH_PREV or BTR_MODIFY_PREV). The alphabetical position of the
cursor is guaranteed to be sensible on return, but it may happen that the
cursor is not positioned on the last record of any page, because the
structure of the tree may have changed during the time when the cursor had
no latches. Only BTR_SEARCH_LEAF and BTR_MODIFY_LEAF cursors are supported;
any other latch mode ends in ut_error. */
static
void
btr_pcur_move_backward_from_page(
/*=============================*/
	btr_pcur_t*	cursor,	/*!< in: persistent cursor, must be on the first
				record of the current page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ut_ad(cursor->latch_mode != BTR_NO_LATCHES);
	ut_ad(btr_pcur_is_before_first_on_page(cursor));
	ut_ad(!btr_pcur_is_before_first_in_tree(cursor, mtr));

	const ulint	leaf_latch = cursor->latch_mode;
	ulint		prev_latch = 0; /* To eliminate compiler warning */

	switch (leaf_latch) {
	case BTR_SEARCH_LEAF:
		prev_latch = BTR_SEARCH_PREV;
		break;
	case BTR_MODIFY_LEAF:
		prev_latch = BTR_MODIFY_PREV;
		break;
	default:
		ut_error;
	}

	/* Remember where we are, give up all latches, and come back
	with the latch mode chosen above. */
	btr_pcur_store_position(cursor, mtr);

	mtr_commit(mtr);

	mtr_start(mtr);

	btr_pcur_restore_position(prev_latch, cursor, mtr);

	if (btr_page_get_prev(btr_pcur_get_page(cursor)) != FIL_NULL) {

		buf_block_t*	left_block
			= btr_pcur_get_btr_cur(cursor)->left_block;

		if (btr_pcur_is_before_first_on_page(cursor)) {

			/* Still on the page infimum: step over to the
			end of the previous page and release the
			current page. */

			btr_leaf_page_release(btr_pcur_get_block(cursor),
					      leaf_latch, mtr);

			page_cur_set_after_last(
				left_block, btr_pcur_get_page_cur(cursor));
		} else {

			/* The repositioned cursor did not end on an
			infimum record on a page. Cursor repositioning
			acquired a latch also on the previous page, but
			we do not need the latch: release it. */

			btr_leaf_page_release(left_block, leaf_latch, mtr);
		}
	}

	/* Restore the latch mode the cursor had on entry and mark any
	stored position as stale. */
	cursor->latch_mode = leaf_latch;
	cursor->old_stored = false;
}

/*********************************************************//**
Moves the persistent cursor one record backward in the tree. Within a page
this is a plain step on the page; at the start of a page the cursor is moved
to the previous page with btr_pcur_move_backward_from_page(). If the cursor
is already before the first record in the tree, it is not moved.
@return TRUE if the cursor was not before first in tree */
ibool
btr_pcur_move_to_prev(
/*==================*/
	btr_pcur_t*	cursor,	/*!< in: persistent cursor; NOTE that the
				function may release the page latch */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ut_ad(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
	ut_ad(cursor->latch_mode != BTR_NO_LATCHES);

	/* Any stored position is stale from now on, including when the
	cursor turns out to be before the first record in the tree. */
	cursor->old_stored = false;

	if (!btr_pcur_is_before_first_on_page(cursor)) {

		/* Common case: stay on the same page. */
		btr_pcur_move_to_prev_on_page(cursor);

		return(TRUE);
	}

	if (btr_pcur_is_before_first_in_tree(cursor, mtr)) {

		/* Nothing to the left of the cursor. */
		return(FALSE);
	}

	btr_pcur_move_backward_from_page(cursor, mtr);

	return(TRUE);
}

/**************************************************************//**
Opens a persistent cursor on a user record. If mode is PAGE_CUR_G or
PAGE_CUR_GE, the cursor is opened with btr_pcur_open_low() and, if it ended
up after the last record on its page, it is advanced with
btr_pcur_move_to_next_user_rec(). The intended contract for PAGE_CUR_L or
PAGE_CUR_LE is to open the cursor on the last user record satisfying the
search condition, but that case is NOT implemented: the function ends in
ut_error for any mode other than PAGE_CUR_G and PAGE_CUR_GE. The latching
mode must be BTR_SEARCH_LEAF or BTR_MODIFY_LEAF. */
void
btr_pcur_open_on_user_rec_func(
/*===========================*/
	dict_index_t*	index,		/*!< in: index */
	const dtuple_t*	tuple,		/*!< in: tuple on which search done */
	page_cur_mode_t	mode,		/*!< in: PAGE_CUR_L, ... */
	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF or
					BTR_MODIFY_LEAF */
	btr_pcur_t*	cursor,		/*!< in: memory buffer for persistent
					cursor */
	const char*	file,		/*!< in: file name */
	unsigned	line,		/*!< in: line where called */
	mtr_t*		mtr)		/*!< in: mtr */
{
	btr_pcur_open_low(index, 0, tuple, mode, latch_mode, cursor,
			  file, line, 0, mtr);

	switch (mode) {
	case PAGE_CUR_GE:
	case PAGE_CUR_G:
		/* The search may have stopped on the page supremum;
		step forward from there. */
		if (btr_pcur_is_after_last_on_page(cursor)) {

			btr_pcur_move_to_next_user_rec(cursor, mtr);
		}
		break;
	default:
		ut_ad((mode == PAGE_CUR_LE) || (mode == PAGE_CUR_L));

		/* Not implemented yet */

		ut_error;
	}
}