summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/lsm/lsm_merge.c
blob: 2276631af1e734e88423d66458d5856c3fe1251c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
/*-
 * Copyright (c) 2014-2016 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

static int __lsm_merge_span(
    WT_SESSION_IMPL *, WT_LSM_TREE *, u_int , u_int *, u_int *, uint64_t *);

/*
 * __wt_lsm_merge_update_tree --
 *	Merge a set of chunks and populate a new one.
 *	Must be called with the LSM lock held.
 */
int
__wt_lsm_merge_update_tree(WT_SESSION_IMPL *session,
    WT_LSM_TREE *lsm_tree, u_int start_chunk, u_int nchunks,
    WT_LSM_CHUNK *chunk)
{
	size_t chunks_after_merge;

	WT_RET(__wt_lsm_tree_retire_chunks(
	    session, lsm_tree, start_chunk, nchunks));

	/* Update the current chunk list. */
	chunks_after_merge = lsm_tree->nchunks - (nchunks + start_chunk);
	memmove(lsm_tree->chunk + start_chunk + 1,
	    lsm_tree->chunk + start_chunk + nchunks,
	    chunks_after_merge * sizeof(*lsm_tree->chunk));
	lsm_tree->nchunks -= nchunks - 1;
	memset(lsm_tree->chunk + lsm_tree->nchunks, 0,
	    (nchunks - 1) * sizeof(*lsm_tree->chunk));
	lsm_tree->chunk[start_chunk] = chunk;

	return (0);
}

/*
 * __lsm_merge_aggressive_clear --
 *	We found a merge to do - clear the aggressive timer.
 */
static void
__lsm_merge_aggressive_clear(WT_LSM_TREE *lsm_tree)
{
	F_CLR(lsm_tree, WT_LSM_TREE_AGGRESSIVE_TIMER);
	lsm_tree->merge_aggressiveness = 0;
}

/*
 * __lsm_merge_aggressive_update --
 *	Update the merge aggressiveness for an LSM tree.
 */
static int
__lsm_merge_aggressive_update(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
{
	struct timespec now;
	uint64_t msec_since_last_merge, msec_to_create_merge;
	uint32_t new_aggressive;

	new_aggressive = 0;

	WT_ASSERT(session, lsm_tree->merge_min != 0);
	/*
	 * If the tree is open read-only or we are compacting, be very
	 * aggressive. Otherwise, we can spend a long time waiting for merges
	 * to start in read-only applications.
	 */
	if (!lsm_tree->modified ||
	    F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) {
		lsm_tree->merge_aggressiveness = 10;
		return (0);
	}

	/*
	 * Only get aggressive if a reasonable number of flushes have been
	 * completed since opening the tree.
	 */
	if (lsm_tree->chunks_flushed <= lsm_tree->merge_min) {
		__lsm_merge_aggressive_clear(lsm_tree);
		return (0);
	}

	/*
	 * Start the timer if it isn't running. Use a flag to define whether
	 * the timer is running - since clearing and checking a special
	 * timer value isn't simple.
	 */
	if (!F_ISSET(lsm_tree, WT_LSM_TREE_AGGRESSIVE_TIMER)) {
		F_SET(lsm_tree, WT_LSM_TREE_AGGRESSIVE_TIMER);
		return (__wt_epoch(session, &lsm_tree->merge_aggressive_ts));
	}

	WT_RET(__wt_epoch(session, &now));
	msec_since_last_merge =
	    WT_TIMEDIFF_MS(now, lsm_tree->merge_aggressive_ts);

	/*
	 * If there is no estimate for how long it's taking to fill chunks
	 * pick 10 seconds.
	 */
	msec_to_create_merge = lsm_tree->merge_min *
	    (lsm_tree->chunk_fill_ms == 0 ? 10000 : lsm_tree->chunk_fill_ms);

	/*
	 * Don't consider getting aggressive until enough time has passed that
	 * we should have created enough chunks to trigger a new merge. We
	 * track average chunk-creation time - hence the "should"; the average
	 * fill time may not reflect the actual state if an application
	 * generates a variable load.
	 */
	if (msec_since_last_merge < msec_to_create_merge)
		return (0);

	/*
	 * Bump how aggressively we look for merges based on how long since
	 * the last merge complete. The aggressive setting only increases
	 * slowly - triggering merges across generations of chunks isn't
	 * an efficient use of resources.
	 */
	while ((msec_since_last_merge /= msec_to_create_merge) > 1)
		++new_aggressive;

	if (new_aggressive > lsm_tree->merge_aggressiveness) {
		__wt_verbose(session, WT_VERB_LSM,
		    "LSM merge %s got aggressive "
		    "(old %" PRIu32 " new %" PRIu32 "), "
		    "merge_min %u, %" PRIu64 " / %" PRIu64,
		    lsm_tree->name, lsm_tree->merge_aggressiveness,
		    new_aggressive, lsm_tree->merge_min,
		    msec_since_last_merge, lsm_tree->chunk_fill_ms);
		lsm_tree->merge_aggressiveness = new_aggressive;
	}
	return (0);
}

/*
 * __lsm_merge_span --
 *	Figure out the best span of chunks to merge. Return an error if
 *	there is no need to do any merges.  Called with the LSM tree
 *	locked.
 */
static int
__lsm_merge_span(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree,
    u_int id, u_int *start, u_int *end, uint64_t *records)
{
	WT_LSM_CHUNK *chunk, *previous, *youngest;
	uint32_t aggressive, max_gap, max_gen, max_level;
	uint64_t record_count, chunk_size;
	u_int end_chunk, i, merge_max, merge_min, nchunks, start_chunk;
	u_int oldest_gen, youngest_gen;

	/* Clear the return parameters */
	*start = *end = 0;
	*records = 0;

	chunk_size = 0;
	chunk = youngest = NULL;

	aggressive = lsm_tree->merge_aggressiveness;
	merge_max = (aggressive > WT_LSM_AGGRESSIVE_THRESHOLD) ?
	    100 : lsm_tree->merge_max;
	merge_min = (aggressive > WT_LSM_AGGRESSIVE_THRESHOLD) ?
	    2 : lsm_tree->merge_min;
	max_gap = (aggressive + 4) / 5;
	max_level = (lsm_tree->merge_throttle > 0) ? 0 : id + aggressive;

	/*
	 * If there aren't any chunks to merge, or some of the chunks aren't
	 * yet written, we're done.  A non-zero error indicates that the worker
	 * should assume there is no work to do: if there are unwritten chunks,
	 * the worker should write them immediately.
	 */
	if (lsm_tree->nchunks < merge_min)
		return (WT_NOTFOUND);

	/*
	 * Only include chunks that already have a Bloom filter or are the
	 * result of a merge and not involved in a merge.
	 */
	for (end_chunk = lsm_tree->nchunks - 1; end_chunk > 0; --end_chunk) {
		chunk = lsm_tree->chunk[end_chunk];
		WT_ASSERT(session, chunk != NULL);
		if (F_ISSET(chunk, WT_LSM_CHUNK_MERGING))
			continue;
		if (F_ISSET(chunk, WT_LSM_CHUNK_BLOOM) || chunk->generation > 0)
			break;
		else if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OFF) &&
		    F_ISSET(chunk, WT_LSM_CHUNK_ONDISK))
			break;
	}

	/*
	 * Give up immediately if there aren't enough on disk chunks in the
	 * tree for a merge.
	 */
	if (end_chunk < merge_min - 1)
		return (WT_NOTFOUND);

	/*
	 * Look for the most efficient merge we can do.  We define efficiency
	 * as collapsing as many levels as possible while processing the
	 * smallest number of rows.
	 *
	 * We make a distinction between "major" and "minor" merges.  The
	 * difference is whether the oldest chunk is involved: if it is, we can
	 * discard tombstones, because there can be no older record to marked
	 * deleted.
	 *
	 * Respect the configured limit on the number of chunks to merge: start
	 * with the most recent set of chunks and work backwards until going
	 * further becomes significantly less efficient.
	 */
retry_find:
	oldest_gen = youngest_gen = lsm_tree->chunk[end_chunk]->generation;
	for (record_count = 0,
	    start_chunk = end_chunk + 1; start_chunk > 0;) {
		chunk = lsm_tree->chunk[start_chunk - 1];
		youngest = lsm_tree->chunk[end_chunk];
		nchunks = (end_chunk + 1) - start_chunk;

		/*
		 * If the chunk is already involved in a merge or a Bloom
		 * filter is being built for it, stop.
		 */
		if (F_ISSET(chunk, WT_LSM_CHUNK_MERGING) || chunk->bloom_busy)
			break;

		/*
		 * Look for small merges before trying a big one: some threads
		 * should stay in low levels until we get more aggressive.
		 */
		if (chunk->generation > max_level)
			break;

		/*
		 * If the size of the chunks selected so far exceeds the
		 * configured maximum chunk size, stop.  Keep going if we can
		 * slide the window further into the tree: we don't want to
		 * leave small chunks in the middle.
		 */
		if ((chunk_size += chunk->size) > lsm_tree->chunk_max)
			if (nchunks < merge_min ||
			    (chunk->generation > youngest->generation &&
			    chunk_size - youngest->size > lsm_tree->chunk_max))
				break;

		/* Track chunk generations seen while looking for a merge */
		if (chunk->generation < youngest_gen)
			youngest_gen = chunk->generation;
		else if (chunk->generation > oldest_gen)
			oldest_gen = chunk->generation;

		if (oldest_gen - youngest_gen > max_gap)
			break;

		/*
		 * If we have enough chunks for a merge and the next chunk is
		 * in too high a generation, stop.
		 */
		if (nchunks >= merge_min) {
			previous = lsm_tree->chunk[start_chunk];
			max_gen = youngest->generation + max_gap;
			if (previous->generation <= max_gen &&
			    chunk->generation > max_gen)
				break;
		}

		F_SET(chunk, WT_LSM_CHUNK_MERGING);
		record_count += chunk->count;
		--start_chunk;

		/*
		 * If the merge would be too big, or we have a full window
		 * and we could include an older chunk if the window wasn't
		 * full, remove the youngest chunk.
		 */
		if (chunk_size > lsm_tree->chunk_max ||
		    (nchunks == merge_max && start_chunk > 0 &&
		     chunk->generation ==
		     lsm_tree->chunk[start_chunk - 1]->generation)) {
			WT_ASSERT(session,
			    F_ISSET(youngest, WT_LSM_CHUNK_MERGING));
			F_CLR(youngest, WT_LSM_CHUNK_MERGING);
			record_count -= youngest->count;
			chunk_size -= youngest->size;
			--end_chunk;
		} else if (nchunks == merge_max)
			/* We've found the best full merge we can */
			break;
	}
	nchunks = (end_chunk + 1) - start_chunk;

	/* Be paranoid, check that we setup the merge properly. */
	WT_ASSERT(session, start_chunk + nchunks <= lsm_tree->nchunks);
#ifdef	HAVE_DIAGNOSTIC
	for (i = 0; i < nchunks; i++) {
		chunk = lsm_tree->chunk[start_chunk + i];
		WT_ASSERT(session,
		    F_ISSET(chunk, WT_LSM_CHUNK_MERGING));
	}
#endif

	WT_ASSERT(session, nchunks == 0 || (chunk != NULL && youngest != NULL));

	/*
	 * Don't do merges that are too small or across too many generations.
	 */
	if (nchunks < merge_min || oldest_gen - youngest_gen > max_gap) {
		for (i = 0; i < nchunks; i++) {
			chunk = lsm_tree->chunk[start_chunk + i];
			WT_ASSERT(session,
			    F_ISSET(chunk, WT_LSM_CHUNK_MERGING));
			F_CLR(chunk, WT_LSM_CHUNK_MERGING);
		}
		/*
		 * If we didn't find a merge with appropriate gaps, try again
		 * with a smaller range.
		 */
		if (end_chunk > lsm_tree->merge_min &&
		    oldest_gen - youngest_gen > max_gap) {
			--end_chunk;
			goto retry_find;
		}
		/* Consider getting aggressive if no merge was found */
		WT_RET(__lsm_merge_aggressive_update(session, lsm_tree));
		return (WT_NOTFOUND);
	}

	__lsm_merge_aggressive_clear(lsm_tree);
	*records = record_count;
	*start = start_chunk;
	*end = end_chunk;
	return (0);
}

/*
 * __wt_lsm_merge --
 *	Merge a set of chunks of an LSM tree.
 */
int
__wt_lsm_merge(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, u_int id)
{
	WT_BLOOM *bloom;
	WT_CURSOR *dest, *src;
	WT_DECL_RET;
	WT_ITEM key, value;
	WT_LSM_CHUNK *chunk;
	uint32_t generation;
	uint64_t insert_count, record_count;
	u_int dest_id, end_chunk, i, nchunks, start_chunk, start_id, verb;
	int tret;
	bool created_chunk, create_bloom, locked, in_sync;
	const char *cfg[3];
	const char *drop_cfg[] =
	    { WT_CONFIG_BASE(session, WT_SESSION_drop), "force", NULL };

	bloom = NULL;
	chunk = NULL;
	dest = src = NULL;
	created_chunk = create_bloom = locked = in_sync = false;

	/* Fast path if it's obvious no merges could be done. */
	if (lsm_tree->nchunks < lsm_tree->merge_min &&
	    lsm_tree->merge_aggressiveness < WT_LSM_AGGRESSIVE_THRESHOLD)
		return (WT_NOTFOUND);

	/*
	 * Use the lsm_tree lock to read the chunks (so no switches occur), but
	 * avoid holding it while the merge is in progress: that may take a
	 * long time.
	 */
	__wt_lsm_tree_writelock(session, lsm_tree);
	locked = true;

	WT_ERR(__lsm_merge_span(session,
	    lsm_tree, id, &start_chunk, &end_chunk, &record_count));
	nchunks = (end_chunk + 1) - start_chunk;

	WT_ASSERT(session, nchunks > 0);
	start_id = lsm_tree->chunk[start_chunk]->id;

	/* Find the merge generation. */
	for (generation = 0, i = 0; i < nchunks; i++)
		generation = WT_MAX(generation,
		    lsm_tree->chunk[start_chunk + i]->generation + 1);

	__wt_lsm_tree_writeunlock(session, lsm_tree);
	locked = false;

	/* Allocate an ID for the merge. */
	dest_id = __wt_atomic_add32(&lsm_tree->last, 1);

	/*
	 * We only want to do the chunk loop if we're running with verbose,
	 * so we wrap these statements in the conditional.  Avoid the loop
	 * in the normal path.
	 */
	if (WT_VERBOSE_ISSET(session, WT_VERB_LSM)) {
		__wt_verbose(session, WT_VERB_LSM,
		    "Merging %s chunks %u-%u into %u (%" PRIu64 " records)"
		    ", generation %" PRIu32,
		    lsm_tree->name,
		    start_chunk, end_chunk, dest_id, record_count, generation);
		for (verb = start_chunk; verb < end_chunk + 1; verb++)
			__wt_verbose(session, WT_VERB_LSM,
			    "Merging %s: Chunk[%u] id %" PRIu32
			    ", gen: %" PRIu32
			    ", size: %" PRIu64 ", records: %" PRIu64,
			    lsm_tree->name, verb, lsm_tree->chunk[verb]->id,
			    lsm_tree->chunk[verb]->generation,
			    lsm_tree->chunk[verb]->size,
			    lsm_tree->chunk[verb]->count);
	}

	WT_ERR(__wt_calloc_one(session, &chunk));
	created_chunk = true;
	chunk->id = dest_id;

	if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_MERGED) &&
	    (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OLDEST) ||
	    start_chunk > 0) && record_count > 0)
		create_bloom = true;

	/*
	 * Special setup for the merge cursor:
	 * first, reset to open the dependent cursors;
	 * then restrict the cursor to a specific number of chunks;
	 * then set MERGE so the cursor doesn't track updates to the tree.
	 */
	WT_ERR(__wt_open_cursor(session, lsm_tree->name, NULL, NULL, &src));
	F_SET(src, WT_CURSTD_RAW);
	WT_ERR(__wt_clsm_init_merge(src, start_chunk, start_id, nchunks));

	WT_WITH_SCHEMA_LOCK(session, ret,
	    ret = __wt_lsm_tree_setup_chunk(session, lsm_tree, chunk));
	WT_ERR(ret);
	if (create_bloom) {
		WT_ERR(__wt_lsm_tree_setup_bloom(session, lsm_tree, chunk));

		WT_ERR(__wt_bloom_create(session, chunk->bloom_uri,
		    lsm_tree->bloom_config,
		    record_count, lsm_tree->bloom_bit_count,
		    lsm_tree->bloom_hash_count, &bloom));
	}

	/* Discard pages we read as soon as we're done with them. */
	F_SET(session, WT_SESSION_NO_CACHE);

	cfg[0] = WT_CONFIG_BASE(session, WT_SESSION_open_cursor);
	cfg[1] = "bulk,raw,skip_sort_check";
	cfg[2] = NULL;
	WT_ERR(__wt_open_cursor(session, chunk->uri, NULL, cfg, &dest));

#define	LSM_MERGE_CHECK_INTERVAL	WT_THOUSAND
	for (insert_count = 0; (ret = src->next(src)) == 0; insert_count++) {
		if (insert_count % LSM_MERGE_CHECK_INTERVAL == 0) {
			if (!lsm_tree->active)
				WT_ERR(EINTR);

			WT_STAT_FAST_CONN_INCRV(session,
			    lsm_rows_merged, LSM_MERGE_CHECK_INTERVAL);
			++lsm_tree->merge_progressing;
		}

		WT_ERR(src->get_key(src, &key));
		dest->set_key(dest, &key);
		WT_ERR(src->get_value(src, &value));
		dest->set_value(dest, &value);
		WT_ERR(dest->insert(dest));
		if (create_bloom)
			__wt_bloom_insert(bloom, &key);
	}
	WT_ERR_NOTFOUND_OK(ret);

	WT_STAT_FAST_CONN_INCRV(session,
	    lsm_rows_merged, insert_count % LSM_MERGE_CHECK_INTERVAL);
	++lsm_tree->merge_progressing;
	__wt_verbose(session, WT_VERB_LSM,
	    "Bloom size for %" PRIu64 " has %" PRIu64 " items inserted",
	    record_count, insert_count);

	/*
	 * Closing and syncing the files can take a while.  Set the
	 * merge_syncing field so that compact knows it is still in
	 * progress.
	 */
	(void)__wt_atomic_add32(&lsm_tree->merge_syncing, 1);
	in_sync = true;
	/*
	 * We've successfully created the new chunk.  Now install it.  We need
	 * to ensure that the NO_CACHE flag is cleared and the bloom filter
	 * is closed (even if a step fails), so track errors but don't return
	 * until we've cleaned up.
	 */
	WT_TRET(src->close(src));
	WT_TRET(dest->close(dest));
	src = dest = NULL;

	F_CLR(session, WT_SESSION_NO_CACHE);

	/*
	 * We're doing advisory reads to fault the new trees into cache.
	 * Don't block if the cache is full: our next unit of work may be to
	 * discard some trees to free space.
	 */
	F_SET(session, WT_SESSION_NO_EVICTION);

	if (create_bloom) {
		if (ret == 0)
			WT_TRET(__wt_bloom_finalize(bloom));

		/*
		 * Read in a key to make sure the Bloom filters btree handle is
		 * open before it becomes visible to application threads.
		 * Otherwise application threads will stall while it is opened
		 * and internal pages are read into cache.
		 */
		if (ret == 0) {
			WT_CLEAR(key);
			WT_TRET_NOTFOUND_OK(__wt_bloom_get(bloom, &key));
		}

		WT_TRET(__wt_bloom_close(bloom));
		bloom = NULL;
	}
	WT_ERR(ret);

	/*
	 * Open a handle on the new chunk before application threads attempt
	 * to access it, opening it pre-loads internal pages into the file
	 * system cache.
	 */
	cfg[1] = "checkpoint=" WT_CHECKPOINT;
	WT_ERR(__wt_open_cursor(session, chunk->uri, NULL, cfg, &dest));
	WT_TRET(dest->close(dest));
	dest = NULL;
	++lsm_tree->merge_progressing;
	(void)__wt_atomic_sub32(&lsm_tree->merge_syncing, 1);
	in_sync = false;
	WT_ERR_NOTFOUND_OK(ret);

	WT_ERR(__wt_lsm_tree_set_chunk_size(session, chunk));
	__wt_lsm_tree_writelock(session, lsm_tree);
	locked = true;

	/*
	 * Check whether we raced with another merge, and adjust the chunk
	 * array offset as necessary.
	 */
	if (start_chunk >= lsm_tree->nchunks ||
	    lsm_tree->chunk[start_chunk]->id != start_id)
		for (start_chunk = 0;
		    start_chunk < lsm_tree->nchunks;
		    start_chunk++)
			if (lsm_tree->chunk[start_chunk]->id == start_id)
				break;

	/*
	 * It is safe to error out here - since the update can only fail
	 * prior to making updates to the tree.
	 */
	WT_ERR(__wt_lsm_merge_update_tree(
	    session, lsm_tree, start_chunk, nchunks, chunk));

	if (create_bloom)
		F_SET(chunk, WT_LSM_CHUNK_BLOOM);
	chunk->count = insert_count;
	chunk->generation = generation;
	F_SET(chunk, WT_LSM_CHUNK_ONDISK);

	/*
	 * We have no current way of continuing if the metadata update fails,
	 * so we will panic in that case.  Put some effort into cleaning up
	 * after ourselves here - so things have a chance of shutting down.
	 *
	 * Any errors that happened after the tree was locked are
	 * fatal - we can't guarantee the state of the tree.
	 */
	if ((ret = __wt_lsm_meta_write(session, lsm_tree)) != 0)
		WT_PANIC_ERR(session, ret, "Failed finalizing LSM merge");

	lsm_tree->dsk_gen++;

	/* Update the throttling while holding the tree lock. */
	__wt_lsm_tree_throttle(session, lsm_tree, true);

	/* Schedule a pass to discard old chunks */
	WT_ERR(__wt_lsm_manager_push_entry(
	    session, WT_LSM_WORK_DROP, 0, lsm_tree));

err:	if (locked)
		__wt_lsm_tree_writeunlock(session, lsm_tree);
	if (in_sync)
		(void)__wt_atomic_sub32(&lsm_tree->merge_syncing, 1);
	if (src != NULL)
		WT_TRET(src->close(src));
	if (dest != NULL)
		WT_TRET(dest->close(dest));
	if (bloom != NULL)
		WT_TRET(__wt_bloom_close(bloom));
	if (ret != 0 && created_chunk) {
		/* Drop the newly-created files on error. */
		if (chunk->uri != NULL) {
			WT_WITH_SCHEMA_LOCK(session, tret,
			    tret = __wt_schema_drop(
			    session, chunk->uri, drop_cfg));
			WT_TRET(tret);
		}
		if (create_bloom && chunk->bloom_uri != NULL) {
			WT_WITH_SCHEMA_LOCK(session, tret,
			    tret = __wt_schema_drop(
			    session, chunk->bloom_uri, drop_cfg));
			WT_TRET(tret);
		}
		__wt_free(session, chunk->bloom_uri);
		__wt_free(session, chunk->uri);
		__wt_free(session, chunk);

		if (ret == EINTR)
			__wt_verbose(session, WT_VERB_LSM,
			    "Merge aborted due to close");
		else
			__wt_verbose(session, WT_VERB_LSM,
			    "Merge failed with %s",
			   __wt_strerror(session, ret, NULL, 0));
	}
	F_CLR(session, WT_SESSION_NO_CACHE | WT_SESSION_NO_EVICTION);
	return (ret);
}