/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: #ident "$Id$" /*====== This file is part of PerconaFT. Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. PerconaFT is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2, as published by the Free Software Foundation. PerconaFT is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with PerconaFT. If not, see . ---------------------------------------- PerconaFT is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License, version 3, as published by the Free Software Foundation. PerconaFT is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with PerconaFT. If not, see . ======= */ #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." #include "portability/toku_atomic.h" #include "ft/cachetable/cachetable.h" #include "ft/ft.h" #include "ft/ft-internal.h" #include "ft/node.h" #include "ft/logger/log-internal.h" #include "ft/txn/rollback.h" #include "ft/serialize/block_allocator.h" #include "ft/serialize/block_table.h" #include "ft/serialize/compress.h" #include "ft/serialize/ft_node-serialize.h" #include "ft/serialize/sub_block.h" #include "util/sort.h" #include "util/threadpool.h" #include "util/status.h" #include "util/scoped_malloc.h" static FT_UPGRADE_STATUS_S ft_upgrade_status; #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ft_upgrade_status, k, c, t, "ft upgrade: " l, inc) static void status_init(void) { // Note, this function initializes the keyname, type, and legend fields. // Value fields are initialized to zero by compiler. STATUS_INIT(FT_UPGRADE_FOOTPRINT, nullptr, UINT64, "footprint", TOKU_ENGINE_STATUS); ft_upgrade_status.initialized = true; } #undef STATUS_INIT #define UPGRADE_STATUS_VALUE(x) ft_upgrade_status.status[x].value.num void toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s) { if (!ft_upgrade_status.initialized) { status_init(); } UPGRADE_STATUS_VALUE(FT_UPGRADE_FOOTPRINT) = toku_log_upgrade_get_footprint(); *s = ft_upgrade_status; } static int num_cores = 0; // cache the number of cores for the parallelization static struct toku_thread_pool *ft_pool = NULL; bool toku_serialize_in_parallel; int get_num_cores(void) { return num_cores; } struct toku_thread_pool *get_ft_pool(void) { return ft_pool; } void toku_serialize_set_parallel(bool in_parallel) { toku_unsafe_set(&toku_serialize_in_parallel, in_parallel); } void toku_ft_serialize_layer_init(void) { num_cores = toku_os_get_number_active_processors(); int r = toku_thread_pool_create(&ft_pool, num_cores); lazy_assert_zero(r); toku_serialize_in_parallel = false; } void toku_ft_serialize_layer_destroy(void) { toku_thread_pool_destroy(&ft_pool); } enum { FILE_CHANGE_INCREMENT = (16 << 20) }; static inline uint64_t alignup64(uint64_t a, uint64_t b) { return ((a+b-1)/b)*b; } // safe_file_size_lock must be held. void toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_sizep) // Effect: If file size >= SIZE+32MiB, reduce file size. // (32 instead of 16.. hysteresis). // Return 0 on success, otherwise an error number. { int64_t file_size; { int r = toku_os_get_file_size(fd, &file_size); lazy_assert_zero(r); invariant(file_size >= 0); } invariant(expected_size == (uint64_t)file_size); // If file space is overallocated by at least 32M if ((uint64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) { toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used. invariant(new_size < file_size); invariant(new_size >= 0); int r = ftruncate(fd, new_size); lazy_assert_zero(r); *new_sizep = new_size; } else { *new_sizep = file_size; } return; } static int64_t min64(int64_t a, int64_t b) { if (a= 0); invariant(expected_size == file_size); // We want to double the size of the file, or add 16MiB, whichever is less. // We emulate calling this function repeatedly until it satisfies the request. int64_t to_write = 0; if (file_size == 0) { // Prevent infinite loop by starting with stripe_width as a base case. to_write = stripe_width; } while (file_size + to_write < size) { to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width); } if (to_write > 0) { assert(to_write%512==0); toku::scoped_malloc_aligned wbuf_aligned(to_write, 512); char *wbuf = reinterpret_cast(wbuf_aligned.get()); memset(wbuf, 0, to_write); toku_off_t start_write = alignup64(file_size, stripe_width); invariant(start_write >= file_size); toku_os_full_pwrite(fd, wbuf, to_write, start_write); *new_size = start_write + to_write; } else { *new_size = file_size; } } // Don't include the sub_block header // Overhead calculated in same order fields are written to wbuf enum { node_header_overhead = (8+ // magic "tokunode" or "tokuleaf" or "tokuroll" 4+ // layout_version 4+ // layout_version_original 4), // build_id }; // uncompressed header offsets enum { uncompressed_magic_offset = 0, uncompressed_version_offset = 8, }; static uint32_t serialize_node_header_size(FTNODE node) { uint32_t retval = 0; retval += 8; // magic retval += sizeof(node->layout_version); retval += sizeof(node->layout_version_original); retval += 4; // BUILD_ID retval += 4; // n_children retval += node->n_children*8; // encode start offset and length of each partition retval += 4; // checksum return retval; } static void serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) { if (node->height == 0) wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8); else wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8); paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION); wbuf_nocrc_int(wbuf, node->layout_version); wbuf_nocrc_int(wbuf, node->layout_version_original); wbuf_nocrc_uint(wbuf, BUILD_ID); wbuf_nocrc_int (wbuf, node->n_children); for (int i=0; in_children; i++) { assert(BP_SIZE(ndd,i)>0); wbuf_nocrc_int(wbuf, BP_START(ndd, i)); // save the beginning of the partition wbuf_nocrc_int(wbuf, BP_SIZE (ndd, i)); // and the size } // checksum the header uint32_t end_to_end_checksum = toku_x1764_memory(wbuf->buf, wbuf_get_woffset(wbuf)); wbuf_nocrc_int(wbuf, end_to_end_checksum); invariant(wbuf->ndone == wbuf->size); } static uint32_t serialize_ftnode_partition_size (FTNODE node, int i) { uint32_t result = 0; paranoid_invariant(node->bp[i].state == PT_AVAIL); result++; // Byte that states what the partition is if (node->height > 0) { NONLEAF_CHILDINFO bnc = BNC(node, i); // number of messages (4 bytes) plus size of the buffer result += (4 + toku_bnc_nbytesinbuf(bnc)); // number of offsets (4 bytes) plus an array of 4 byte offsets, for each message tree result += (4 + (4 * bnc->fresh_message_tree.size())); result += (4 + (4 * bnc->stale_message_tree.size())); result += (4 + (4 * bnc->broadcast_list.size())); } else { result += 4 + bn_data::HEADER_LENGTH; // n_entries in buffer table + basement header result += BLB_NBYTESINDATA(node, i); } result += 4; // checksum return result; } #define FTNODE_PARTITION_DMT_LEAVES 0xaa #define FTNODE_PARTITION_MSG_BUFFER 0xbb UU() static int assert_fresh(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) { bool is_fresh = msg_buffer->get_freshness(offset); assert(is_fresh); return 0; } UU() static int assert_stale(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) { bool is_fresh = msg_buffer->get_freshness(offset); assert(!is_fresh); return 0; } static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) { #ifdef TOKU_DEBUG_PARANOID bnc->fresh_message_tree.iterate(&bnc->msg_buffer); bnc->stale_message_tree.iterate(&bnc->msg_buffer); #endif } static int wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *const wb) { wbuf_nocrc_int(wb, offset); return 0; } static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) { unsigned char ch = FTNODE_PARTITION_MSG_BUFFER; wbuf_nocrc_char(wb, ch); // serialize the message buffer bnc->msg_buffer.serialize_to_wbuf(wb); // serialize the message trees (num entries, offsets array): // first, verify their contents are consistent with the message buffer bnc_verify_message_trees(bnc); // fresh wbuf_nocrc_int(wb, bnc->fresh_message_tree.size()); bnc->fresh_message_tree.iterate(wb); // stale wbuf_nocrc_int(wb, bnc->stale_message_tree.size()); bnc->stale_message_tree.iterate(wb); // broadcast wbuf_nocrc_int(wb, bnc->broadcast_list.size()); bnc->broadcast_list.iterate(wb); } // // Serialize the i'th partition of node into sb // For leaf nodes, this would be the i'th basement node // For internal nodes, this would be the i'th internal node // static void serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) { // Caller should have allocated memory. invariant_notnull(sb->uncompressed_ptr); invariant(sb->uncompressed_size > 0); paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i)); // // Now put the data into sb->uncompressed_ptr // struct wbuf wb; wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size); if (node->height > 0) { // TODO: (Zardosht) possibly exit early if there are no messages serialize_child_buffer(BNC(node, i), &wb); } else { unsigned char ch = FTNODE_PARTITION_DMT_LEAVES; bn_data* bd = BLB_DATA(node, i); wbuf_nocrc_char(&wb, ch); wbuf_nocrc_uint(&wb, bd->num_klpairs()); bd->serialize_to_wbuf(&wb); } uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb)); wbuf_nocrc_int(&wb, end_to_end_checksum); invariant(wb.ndone == wb.size); invariant(sb->uncompressed_size==wb.ndone); } // // Takes the data in sb->uncompressed_ptr, and compresses it // into a newly allocated buffer sb->compressed_ptr // static void compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) { invariant(sb->compressed_ptr != nullptr); invariant(sb->compressed_size_bound > 0); paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size)); // // This probably seems a bit complicated. Here is what is going on. // In PerconaFT 5.0, sub_blocks were compressed and the compressed data // was checksummed. The checksum did NOT include the size of the compressed data // and the size of the uncompressed data. The fields of sub_block only reference the // compressed data, and it is the responsibility of the user of the sub_block // to write the length // // For Dr. No, we want the checksum to also include the size of the compressed data, and the // size of the decompressed data, because this data // may be read off of disk alone, so it must be verifiable alone. // // So, we pass in a buffer to compress_nocrc_sub_block that starts 8 bytes after the beginning // of sb->compressed_ptr, so we have space to put in the sizes, and then run the checksum. // sb->compressed_size = compress_nocrc_sub_block( sb, (char *)sb->compressed_ptr + 8, sb->compressed_size_bound, method ); uint32_t* extra = (uint32_t *)(sb->compressed_ptr); // store the compressed and uncompressed size at the beginning extra[0] = toku_htod32(sb->compressed_size); extra[1] = toku_htod32(sb->uncompressed_size); // now checksum the entire thing sb->compressed_size += 8; // now add the eight bytes that we saved for the sizes sb->xsum = toku_x1764_memory(sb->compressed_ptr,sb->compressed_size); // // This is the end result for Dr. No and forward. For ftnodes, sb->compressed_ptr contains // two integers at the beginning, the size and uncompressed size, and then the compressed // data. sb->xsum contains the checksum of this entire thing. // // In PerconaFT 5.0, sb->compressed_ptr only contained the compressed data, sb->xsum // checksummed only the compressed data, and the checksumming of the sizes were not // done here. // } // // Returns the size needed to serialize the ftnode info // Does not include header information that is common with rollback logs // such as the magic, layout_version, and build_id // Includes only node specific info such as pivot information, n_children, and so on // static uint32_t serialize_ftnode_info_size(FTNODE node) { uint32_t retval = 0; retval += 8; // max_msn_applied_to_node_on_disk retval += 4; // nodesize retval += 4; // flags retval += 4; // height; retval += 8; // oldest_referenced_xid_known retval += node->pivotkeys.serialized_size(); retval += (node->n_children-1)*4; // encode length of each pivot if (node->height > 0) { retval += node->n_children*8; // child blocknum's } retval += 4; // checksum return retval; } static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) { // Memory must have been allocated by our caller. invariant(sb->uncompressed_size > 0); invariant_notnull(sb->uncompressed_ptr); paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node)); struct wbuf wb; wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size); wbuf_MSN(&wb, node->max_msn_applied_to_node_on_disk); wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be wbuf_nocrc_uint(&wb, node->flags); wbuf_nocrc_int (&wb, node->height); wbuf_TXNID(&wb, node->oldest_referenced_xid_known); node->pivotkeys.serialize_to_wbuf(&wb); // child blocks, only for internal nodes if (node->height > 0) { for (int i = 0; i < node->n_children; i++) { wbuf_nocrc_BLOCKNUM(&wb, BP_BLOCKNUM(node,i)); } } uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb)); wbuf_nocrc_int(&wb, end_to_end_checksum); invariant(wb.ndone == wb.size); invariant(sb->uncompressed_size==wb.ndone); } // This is the size of the uncompressed data, not including the compression headers unsigned int toku_serialize_ftnode_size (FTNODE node) { unsigned int result = 0; // // As of now, this seems to be called if and only if the entire node is supposed // to be in memory, so we will assert it. // toku_ftnode_assert_fully_in_memory(node); result += serialize_node_header_size(node); result += serialize_ftnode_info_size(node); for (int i = 0; i < node->n_children; i++) { result += serialize_ftnode_partition_size(node,i); } return result; } struct serialize_times { tokutime_t serialize_time; tokutime_t compress_time; }; static void serialize_and_compress_partition(FTNODE node, int childnum, enum toku_compression_method compression_method, SUB_BLOCK sb, struct serialize_times *st) { // serialize, compress, update status tokutime_t t0 = toku_time_now(); serialize_ftnode_partition(node, childnum, sb); tokutime_t t1 = toku_time_now(); compress_ftnode_sub_block(sb, compression_method); tokutime_t t2 = toku_time_now(); st->serialize_time += t1 - t0; st->compress_time += t2 - t1; } void toku_create_compressed_partition_from_available( FTNODE node, int childnum, enum toku_compression_method compression_method, SUB_BLOCK sb ) { tokutime_t t0 = toku_time_now(); // serialize sb->uncompressed_size = serialize_ftnode_partition_size(node, childnum); toku::scoped_malloc uncompressed_buf(sb->uncompressed_size); sb->uncompressed_ptr = uncompressed_buf.get(); serialize_ftnode_partition(node, childnum, sb); tokutime_t t1 = toku_time_now(); // compress. no need to pad with extra bytes for sizes/xsum - we're not storing them set_compressed_size_bound(sb, compression_method); sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound); sb->compressed_size = compress_nocrc_sub_block( sb, sb->compressed_ptr, sb->compressed_size_bound, compression_method ); sb->uncompressed_ptr = NULL; tokutime_t t2 = toku_time_now(); toku_ft_status_update_serialize_times(node, t1 - t0, t2 - t1); } static void serialize_and_compress_serially(FTNODE node, int npartitions, enum toku_compression_method compression_method, struct sub_block sb[], struct serialize_times *st) { for (int i = 0; i < npartitions; i++) { serialize_and_compress_partition(node, i, compression_method, &sb[i], st); } } struct serialize_compress_work { struct work base; FTNODE node; int i; enum toku_compression_method compression_method; struct sub_block *sb; struct serialize_times st; }; static void * serialize_and_compress_worker(void *arg) { struct workset *ws = (struct workset *) arg; while (1) { struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws); if (w == NULL) break; int i = w->i; serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st); } workset_release_ref(ws); return arg; } static void serialize_and_compress_in_parallel(FTNODE node, int npartitions, enum toku_compression_method compression_method, struct sub_block sb[], struct serialize_times *st) { if (npartitions == 1) { serialize_and_compress_partition(node, 0, compression_method, &sb[0], st); } else { int T = num_cores; if (T > npartitions) T = npartitions; if (T > 0) T = T - 1; struct workset ws; ZERO_STRUCT(ws); workset_init(&ws); struct serialize_compress_work work[npartitions]; workset_lock(&ws); for (int i = 0; i < npartitions; i++) { work[i] = (struct serialize_compress_work) { .base = {{NULL, NULL}}, .node = node, .i = i, .compression_method = compression_method, .sb = sb, .st = { .serialize_time = 0, .compress_time = 0} }; workset_put_locked(&ws, &work[i].base); } workset_unlock(&ws); toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws); workset_add_ref(&ws, T); serialize_and_compress_worker(&ws); workset_join(&ws); workset_destroy(&ws); // gather up the statistics from each thread's work item for (int i = 0; i < npartitions; i++) { st->serialize_time += work[i].st.serialize_time; st->compress_time += work[i].st.compress_time; } } } static void serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb, enum toku_compression_method compression_method, struct serialize_times *st) { // serialize, compress, update serialize times. tokutime_t t0 = toku_time_now(); serialize_ftnode_info(node, sb); tokutime_t t1 = toku_time_now(); compress_ftnode_sub_block(sb, compression_method); tokutime_t t2 = toku_time_now(); st->serialize_time += t1 - t0; st->compress_time += t2 - t1; } int toku_serialize_ftnode_to_memory(FTNODE node, FTNODE_DISK_DATA* ndd, unsigned int basementnodesize, enum toku_compression_method compression_method, bool do_rebalancing, bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false /*out*/ size_t *n_bytes_to_write, /*out*/ size_t *n_uncompressed_bytes, /*out*/ char **bytes_to_write) // Effect: Writes out each child to a separate malloc'd buffer, then compresses // all of them, and writes the uncompressed header, to bytes_to_write, // which is malloc'd. // // The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed). // 512-byte padding is for O_DIRECT to work. { toku_ftnode_assert_fully_in_memory(node); if (do_rebalancing && node->height == 0) { toku_ftnode_leaf_rebalance(node, basementnodesize); } const int npartitions = node->n_children; // Each partition represents a compressed sub block // For internal nodes, a sub block is a message buffer // For leaf nodes, a sub block is a basement node toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions); struct sub_block *sb = reinterpret_cast(sb_buf.get()); XREALLOC_N(npartitions, *ndd); // // First, let's serialize and compress the individual sub blocks // // determine how large our serialization and compression buffers need to be. size_t serialize_buf_size = 0, compression_buf_size = 0; for (int i = 0; i < node->n_children; i++) { sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i); sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size); serialize_buf_size += sb[i].uncompressed_size; compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size } // give each sub block a base pointer to enough buffer space for serialization and compression toku::scoped_malloc serialize_buf(serialize_buf_size); toku::scoped_malloc compression_buf(compression_buf_size); for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) { sb[i].uncompressed_ptr = reinterpret_cast(serialize_buf.get()) + uncompressed_offset; sb[i].compressed_ptr = reinterpret_cast(compression_buf.get()) + compressed_offset; uncompressed_offset += sb[i].uncompressed_size; compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size invariant(uncompressed_offset <= serialize_buf_size); invariant(compressed_offset <= compression_buf_size); } // do the actual serialization now that we have buffer space struct serialize_times st = { 0, 0 }; if (in_parallel) { serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st); } else { serialize_and_compress_serially(node, npartitions, compression_method, sb, &st); } // // Now lets create a sub-block that has the common node information, // This does NOT include the header // // determine how large our serialization and copmression buffers need to be struct sub_block sb_node_info; sub_block_init(&sb_node_info); size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node); size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size); toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size); toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size sb_node_info.uncompressed_size = sb_node_info_uncompressed_size; sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get(); sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound; sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get(); // do the actual serialization now that we have buffer space serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st); // // At this point, we have compressed each of our pieces into individual sub_blocks, // we can put the header and all the subblocks into a single buffer and return it. // // update the serialize times, ignore the header for simplicity. we captured all // of the partitions' serialize times so that's probably good enough. toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time); // The total size of the node is: // size of header + disk size of the n+1 sub_block's created above uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header + sb_node_info.compressed_size // compressed nodeinfo (without its checksum) + 4); // nodeinfo's checksum uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header + sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum) + 4); // nodeinfo's checksum // store the BP_SIZESs for (int i = 0; i < node->n_children; i++) { uint32_t len = sb[i].compressed_size + 4; // data and checksum BP_SIZE (*ndd,i) = len; BP_START(*ndd,i) = total_node_size; total_node_size += sb[i].compressed_size + 4; total_uncompressed_size += sb[i].uncompressed_size + 4; } // now create the final serialized node uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes. char *XMALLOC_N_ALIGNED(512, total_buffer_size, data); char *curr_ptr = data; // write the header struct wbuf wb; wbuf_init(&wb, curr_ptr, serialize_node_header_size(node)); serialize_node_header(node, *ndd, &wb); assert(wb.ndone == wb.size); curr_ptr += serialize_node_header_size(node); // now write sb_node_info memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size); curr_ptr += sb_node_info.compressed_size; // write the checksum *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum); curr_ptr += sizeof(sb_node_info.xsum); for (int i = 0; i < npartitions; i++) { memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size); curr_ptr += sb[i].compressed_size; // write the checksum *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum); curr_ptr += sizeof(sb[i].xsum); } // Zero the rest of the buffer memset(data + total_node_size, 0, total_buffer_size - total_node_size); assert((uint32_t) (curr_ptr - data) == total_node_size); *bytes_to_write = data; *n_bytes_to_write = total_buffer_size; *n_uncompressed_bytes = total_uncompressed_size; invariant(*n_bytes_to_write % 512 == 0); invariant(reinterpret_cast(*bytes_to_write) % 512 == 0); return 0; } int toku_serialize_ftnode_to(int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA *ndd, bool do_rebalancing, FT ft, bool for_checkpoint) { size_t n_to_write; size_t n_uncompressed_bytes; char *compressed_buf = nullptr; // because toku_serialize_ftnode_to is only called for // in toku_ftnode_flush_callback, we pass false // for in_parallel. The reasoning is that when we write // nodes to disk via toku_ftnode_flush_callback, we // assume that it is being done on a non-critical // background thread (probably for checkpointing), and therefore // should not hog CPU, // // Should the above facts change, we may want to revisit // passing false for in_parallel here // // alternatively, we could have made in_parallel a parameter // for toku_serialize_ftnode_to, but instead we did this. int r = toku_serialize_ftnode_to_memory( node, ndd, ft->h->basementnodesize, ft->h->compression_method, do_rebalancing, toku_unsafe_fetch(&toku_serialize_in_parallel), &n_to_write, &n_uncompressed_bytes, &compressed_buf); if (r != 0) { return r; } // If the node has never been written, then write the whole buffer, // including the zeros invariant(blocknum.b >= 0); DISKOFF offset; // Dirties the ft ft->blocktable.realloc_on_disk( blocknum, n_to_write, &offset, ft, fd, for_checkpoint); tokutime_t t0 = toku_time_now(); toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset); tokutime_t t1 = toku_time_now(); tokutime_t io_time = t1 - t0; toku_ft_status_update_flush_reason( node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint); toku_free(compressed_buf); node->dirty = 0; // See #1957. Must set the node to be clean after // serializing it so that it doesn't get written again on // the next checkpoint or eviction. if (node->height == 0) { for (int i = 0; i < node->n_children; i++) { if (BP_STATE(node, i) == PT_AVAIL) { BLB_LRD(node, i) = 0; } } } return 0; } static void sort_and_steal_offset_arrays(NONLEAF_CHILDINFO bnc, const toku::comparator &cmp, int32_t **fresh_offsets, int32_t nfresh, int32_t **stale_offsets, int32_t nstale, int32_t **broadcast_offsets, int32_t nbroadcast) { // We always have fresh / broadcast offsets (even if they are empty) // but we may not have stale offsets, in the case of v13 upgrade. invariant(fresh_offsets != nullptr); invariant(broadcast_offsets != nullptr); invariant(cmp.valid()); typedef toku::sort msn_sort; const int32_t n_in_this_buffer = nfresh + nstale + nbroadcast; struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer); msn_sort::mergesort_r(*fresh_offsets, nfresh, extra); bnc->fresh_message_tree.destroy(); bnc->fresh_message_tree.create_steal_sorted_array(fresh_offsets, nfresh, n_in_this_buffer); if (stale_offsets) { msn_sort::mergesort_r(*stale_offsets, nstale, extra); bnc->stale_message_tree.destroy(); bnc->stale_message_tree.create_steal_sorted_array(stale_offsets, nstale, n_in_this_buffer); } bnc->broadcast_list.destroy(); bnc->broadcast_list.create_steal_sorted_array(broadcast_offsets, nbroadcast, n_in_this_buffer); } static MSN deserialize_child_buffer_v13(FT ft, NONLEAF_CHILDINFO bnc, struct rbuf *rb) { // We skip 'stale' offsets for upgraded nodes. int32_t nfresh = 0, nbroadcast = 0; int32_t *fresh_offsets = nullptr, *broadcast_offsets = nullptr; // Only sort buffers if we have a valid comparison function. In certain scenarios, // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes // for simple inspection and don't actually require that the message buffers are // properly sorted. This is very ugly, but correct. const bool sort = ft->cmp.valid(); MSN highest_msn_in_this_buffer = bnc->msg_buffer.deserialize_from_rbuf_v13(rb, &ft->h->highest_unused_msn_for_upgrade, sort ? &fresh_offsets : nullptr, &nfresh, sort ? &broadcast_offsets : nullptr, &nbroadcast); if (sort) { sort_and_steal_offset_arrays(bnc, ft->cmp, &fresh_offsets, nfresh, nullptr, 0, // no stale offsets &broadcast_offsets, nbroadcast); } return highest_msn_in_this_buffer; } static void deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rb, const toku::comparator &cmp) { int32_t nfresh = 0, nstale = 0, nbroadcast = 0; int32_t *fresh_offsets, *stale_offsets, *broadcast_offsets; // Only sort buffers if we have a valid comparison function. In certain scenarios, // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes // for simple inspection and don't actually require that the message buffers are // properly sorted. This is very ugly, but correct. const bool sort = cmp.valid(); // read in the message buffer bnc->msg_buffer.deserialize_from_rbuf(rb, sort ? &fresh_offsets : nullptr, &nfresh, sort ? &stale_offsets : nullptr, &nstale, sort ? &broadcast_offsets : nullptr, &nbroadcast); if (sort) { sort_and_steal_offset_arrays(bnc, cmp, &fresh_offsets, nfresh, &stale_offsets, nstale, &broadcast_offsets, nbroadcast); } } static void deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rb) { // read in the message buffer bnc->msg_buffer.deserialize_from_rbuf(rb, nullptr, nullptr, // fresh_offsets, nfresh, nullptr, nullptr, // stale_offsets, nstale, nullptr, nullptr); // broadcast_offsets, nbroadcast // read in each message tree (fresh, stale, broadcast) int32_t nfresh = rbuf_int(rb); int32_t *XMALLOC_N(nfresh, fresh_offsets); for (int i = 0; i < nfresh; i++) { fresh_offsets[i] = rbuf_int(rb); } int32_t nstale = rbuf_int(rb); int32_t *XMALLOC_N(nstale, stale_offsets); for (int i = 0; i < nstale; i++) { stale_offsets[i] = rbuf_int(rb); } int32_t nbroadcast = rbuf_int(rb); int32_t *XMALLOC_N(nbroadcast, broadcast_offsets); for (int i = 0; i < nbroadcast; i++) { broadcast_offsets[i] = rbuf_int(rb); } // build OMTs out of each offset array bnc->fresh_message_tree.destroy(); bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, nfresh); bnc->stale_message_tree.destroy(); bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, nstale); bnc->broadcast_list.destroy(); bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, nbroadcast); } // dump a buffer to stderr // no locking around this for now void dump_bad_block(unsigned char *vp, uint64_t size) { const uint64_t linesize = 64; uint64_t n = size / linesize; for (uint64_t i = 0; i < n; i++) { fprintf(stderr, "%p: ", vp); for (uint64_t j = 0; j < linesize; j++) { unsigned char c = vp[j]; fprintf(stderr, "%2.2X", c); } fprintf(stderr, "\n"); vp += linesize; } size = size % linesize; for (uint64_t i=0; idata_buffer.initialize_empty(); return bn; } BASEMENTNODE toku_clone_bn(BASEMENTNODE orig_bn) { BASEMENTNODE bn = toku_create_empty_bn_no_buffer(); bn->max_msn_applied = orig_bn->max_msn_applied; bn->seqinsert = orig_bn->seqinsert; bn->stale_ancestor_messages_applied = orig_bn->stale_ancestor_messages_applied; bn->stat64_delta = orig_bn->stat64_delta; bn->logical_rows_delta = orig_bn->logical_rows_delta; bn->data_buffer.clone(&orig_bn->data_buffer); return bn; } BASEMENTNODE toku_create_empty_bn_no_buffer(void) { BASEMENTNODE XMALLOC(bn); bn->max_msn_applied.msn = 0; bn->seqinsert = 0; bn->stale_ancestor_messages_applied = false; bn->stat64_delta = ZEROSTATS; bn->logical_rows_delta = 0; bn->data_buffer.init_zero(); return bn; } NONLEAF_CHILDINFO toku_create_empty_nl(void) { NONLEAF_CHILDINFO XMALLOC(cn); cn->msg_buffer.create(); cn->fresh_message_tree.create_no_array(); cn->stale_message_tree.create_no_array(); cn->broadcast_list.create_no_array(); memset(cn->flow, 0, sizeof cn->flow); return cn; } // must clone the OMTs, since we serialize them along with the message buffer NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) { NONLEAF_CHILDINFO XMALLOC(cn); cn->msg_buffer.clone(&orig_childinfo->msg_buffer); cn->fresh_message_tree.create_no_array(); cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree); cn->stale_message_tree.create_no_array(); cn->stale_message_tree.clone(orig_childinfo->stale_message_tree); cn->broadcast_list.create_no_array(); cn->broadcast_list.clone(orig_childinfo->broadcast_list); memset(cn->flow, 0, sizeof cn->flow); return cn; } void destroy_basement_node (BASEMENTNODE bn) { bn->data_buffer.destroy(); toku_free(bn); } void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl) { nl->msg_buffer.destroy(); nl->fresh_message_tree.destroy(); nl->stale_message_tree.destroy(); nl->broadcast_list.destroy(); toku_free(nl); } void read_block_from_fd_into_rbuf( int fd, BLOCKNUM blocknum, FT ft, struct rbuf *rb ) { // get the file offset and block size for the block DISKOFF offset, size; ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size); DISKOFF size_aligned = roundup_to_multiple(512, size); uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block); rbuf_init(rb, raw_block, size); // read the block ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset); assert((DISKOFF)rlen >= size); assert((DISKOFF)rlen <= size_aligned); } static const int read_header_heuristic_max = 32*1024; #ifndef MIN #define MIN(a,b) (((a)>(b)) ? (b) : (a)) #endif // Effect: If the header part of the node is small enough, then read it into the rbuf. The rbuf will be allocated to be big enough in any case. static void read_ftnode_header_from_fd_into_rbuf_if_small_enough(int fd, BLOCKNUM blocknum, FT ft, struct rbuf *rb, ftnode_fetch_extra *bfe) { DISKOFF offset, size; ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size); DISKOFF read_size = roundup_to_multiple(512, MIN(read_header_heuristic_max, size)); uint8_t *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, size), raw_block); rbuf_init(rb, raw_block, read_size); // read the block tokutime_t t0 = toku_time_now(); ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset); tokutime_t t1 = toku_time_now(); assert(rlen >= 0); rbuf_init(rb, raw_block, rlen); bfe->bytes_read = rlen; bfe->io_time = t1 - t0; toku_ft_status_update_pivot_fetch_reason(bfe); } // // read the compressed partition into the sub_block, // validate the checksum of the compressed data // int read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb) { int r = 0; sb->compressed_size = rbuf_int(rb); sb->uncompressed_size = rbuf_int(rb); const void **cp = (const void **) &sb->compressed_ptr; rbuf_literal_bytes(rb, cp, sb->compressed_size); sb->xsum = rbuf_int(rb); // let's check the checksum uint32_t actual_xsum = toku_x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size); if (sb->xsum != actual_xsum) { r = TOKUDB_BAD_CHECKSUM; } return r; } static int read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb) { int r = 0; r = read_compressed_sub_block(rb, sb); if (r != 0) { goto exit; } just_decompress_sub_block(sb); exit: return r; } // Allocates space for the sub-block and de-compresses the data from // the supplied compressed pointer.. void just_decompress_sub_block(struct sub_block *sb) { // TODO: Add assert that the subblock was read in. sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size); toku_decompress( (Bytef *) sb->uncompressed_ptr, sb->uncompressed_size, (Bytef *) sb->compressed_ptr, sb->compressed_size ); } // verify the checksum int verify_ftnode_sub_block(struct sub_block *sb, const char *fname, BLOCKNUM blocknum) { int r = 0; // first verify the checksum uint32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end uint32_t stored_xsum = toku_dtoh32(*((uint32_t *)((char *)sb->uncompressed_ptr + data_size))); uint32_t actual_xsum = toku_x1764_memory(sb->uncompressed_ptr, data_size); if (stored_xsum != actual_xsum) { fprintf( stderr, "%s:%d:verify_ftnode_sub_block - " "file[%s], blocknum[%lld], stored_xsum[%u] != actual_xsum[%u]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, stored_xsum, actual_xsum); dump_bad_block((Bytef *) sb->uncompressed_ptr, sb->uncompressed_size); r = TOKUDB_BAD_CHECKSUM; } return r; } // This function deserializes the data stored by serialize_ftnode_info static int deserialize_ftnode_info(struct sub_block *sb, FTNODE node) { // sb_node_info->uncompressed_ptr stores the serialized node information // this function puts that information into node // first verify the checksum int r = 0; const char *fname = toku_ftnode_get_cachefile_fname_in_env(node); r = verify_ftnode_sub_block(sb, fname, node->blocknum); if (r != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_info - " "file[%s], blocknum[%lld], verify_ftnode_sub_block failed with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, r); dump_bad_block(static_cast(sb->uncompressed_ptr), sb->uncompressed_size); goto exit; } uint32_t data_size; data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end // now with the data verified, we can read the information into the node struct rbuf rb; rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size); node->max_msn_applied_to_node_on_disk = rbuf_MSN(&rb); (void)rbuf_int(&rb); node->flags = rbuf_int(&rb); node->height = rbuf_int(&rb); if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) { (void) rbuf_int(&rb); // optimized_for_upgrade } if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) { rbuf_TXNID(&rb, &node->oldest_referenced_xid_known); } // now create the basement nodes or childinfos, depending on whether this is a // leaf node or internal node // now the subtree_estimates // n_children is now in the header, nd the allocatio of the node->bp is in deserialize_ftnode_from_rbuf. // now the pivots if (node->n_children > 1) { node->pivotkeys.deserialize_from_rbuf(&rb, node->n_children - 1); } else { node->pivotkeys.create_empty(); } // if this is an internal node, unpack the block nums, and fill in necessary fields // of childinfo if (node->height > 0) { for (int i = 0; i < node->n_children; i++) { BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb); BP_WORKDONE(node, i) = 0; } } // make sure that all the data was read if (data_size != rb.ndone) { fprintf( stderr, "%s:%d:deserialize_ftnode_info - " "file[%s], blocknum[%lld], data_size[%d] != rb.ndone[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, data_size, rb.ndone); dump_bad_block(rb.buf, rb.size); abort(); } exit: return r; } static void setup_available_ftnode_partition(FTNODE node, int i) { if (node->height == 0) { set_BLB(node, i, toku_create_empty_bn()); BLB_MAX_MSN_APPLIED(node,i) = node->max_msn_applied_to_node_on_disk; } else { set_BNC(node, i, toku_create_empty_nl()); } } // Assign the child_to_read member of the bfe from the given ftnode // that has been brought into memory. static void update_bfe_using_ftnode(FTNODE node, ftnode_fetch_extra *bfe) { if (bfe->type == ftnode_fetch_subset && bfe->search != NULL) { // we do not take into account prefetching yet // as of now, if we need a subset, the only thing // we can possibly require is a single basement node // we find out what basement node the query cares about // and check if it is available bfe->child_to_read = toku_ft_search_which_child( bfe->ft->cmp, node, bfe->search ); } else if (bfe->type == ftnode_fetch_keymatch) { // we do not take into account prefetching yet // as of now, if we need a subset, the only thing // we can possibly require is a single basement node // we find out what basement node the query cares about // and check if it is available if (node->height == 0) { int left_child = bfe->leftmost_child_wanted(node); int right_child = bfe->rightmost_child_wanted(node); if (left_child == right_child) { bfe->child_to_read = left_child; } } } } // Using the search parameters in the bfe, this function will // initialize all of the given ftnode's partitions. static void setup_partitions_using_bfe(FTNODE node, ftnode_fetch_extra *bfe, bool data_in_memory) { // Leftmost and Rightmost Child bounds. int lc, rc; if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch) { lc = bfe->leftmost_child_wanted(node); rc = bfe->rightmost_child_wanted(node); } else { lc = -1; rc = -1; } // // setup memory needed for the node // //printf("node height %d, blocknum %" PRId64 ", type %d lc %d rc %d\n", node->height, node->blocknum.b, bfe->type, lc, rc); for (int i = 0; i < node->n_children; i++) { BP_INIT_UNTOUCHED_CLOCK(node,i); if (data_in_memory) { BP_STATE(node, i) = ((bfe->wants_child_available(i) || (lc <= i && i <= rc)) ? PT_AVAIL : PT_COMPRESSED); } else { BP_STATE(node, i) = PT_ON_DISK; } BP_WORKDONE(node,i) = 0; switch (BP_STATE(node,i)) { case PT_AVAIL: setup_available_ftnode_partition(node, i); BP_TOUCH_CLOCK(node,i); break; case PT_COMPRESSED: set_BSB(node, i, sub_block_creat()); break; case PT_ON_DISK: set_BNULL(node, i); break; case PT_INVALID: abort(); } } } static void setup_ftnode_partitions(FTNODE node, ftnode_fetch_extra *bfe, bool data_in_memory) // Effect: Used when reading a ftnode into main memory, this sets up the partitions. // We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations). // Arguments: Node: the node to set up. // bfe: Describes the key range needed. // data_in_memory: true if we have all the data (in which case we set the BP_STATE to be either PT_AVAIL or PT_COMPRESSED depending on the bfe. // false if we don't have the partitions in main memory (in which case we set the state to PT_ON_DISK. { // Set bfe->child_to_read. update_bfe_using_ftnode(node, bfe); // Setup the partitions. setup_partitions_using_bfe(node, bfe, data_in_memory); } /* deserialize the partition from the sub-block's uncompressed buffer * and destroy the uncompressed buffer */ static int deserialize_ftnode_partition( struct sub_block *sb, FTNODE node, int childnum, // which partition to deserialize const toku::comparator &cmp) { int r = 0; const char *fname = toku_ftnode_get_cachefile_fname_in_env(node); r = verify_ftnode_sub_block(sb, fname, node->blocknum); if (r != 0) { fprintf(stderr, "%s:%d:deserialize_ftnode_partition - " "file[%s], blocknum[%lld], " "verify_ftnode_sub_block failed with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, r); goto exit; } uint32_t data_size; data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end // now with the data verified, we can read the information into the node struct rbuf rb; rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size); unsigned char ch; ch = rbuf_char(&rb); if (node->height > 0) { if (ch != FTNODE_PARTITION_MSG_BUFFER) { fprintf(stderr, "%s:%d:deserialize_ftnode_partition - " "file[%s], blocknum[%lld], ch[%d] != " "FTNODE_PARTITION_MSG_BUFFER[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, ch, FTNODE_PARTITION_MSG_BUFFER); dump_bad_block(rb.buf, rb.size); assert(ch == FTNODE_PARTITION_MSG_BUFFER); } NONLEAF_CHILDINFO bnc = BNC(node, childnum); if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) { // Layout version <= 26 did not serialize sorted message trees to disk. deserialize_child_buffer_v26(bnc, &rb, cmp); } else { deserialize_child_buffer(bnc, &rb); } BP_WORKDONE(node, childnum) = 0; } else { if (ch != FTNODE_PARTITION_DMT_LEAVES) { fprintf(stderr, "%s:%d:deserialize_ftnode_partition - " "file[%s], blocknum[%lld], ch[%d] != " "FTNODE_PARTITION_DMT_LEAVES[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, ch, FTNODE_PARTITION_DMT_LEAVES); dump_bad_block(rb.buf, rb.size); assert(ch == FTNODE_PARTITION_DMT_LEAVES); } BLB_SEQINSERT(node, childnum) = 0; uint32_t num_entries = rbuf_int(&rb); // we are now at the first byte of first leafentry data_size -= rb.ndone; // remaining bytes of leafentry data BASEMENTNODE bn = BLB(node, childnum); bn->data_buffer.deserialize_from_rbuf( num_entries, &rb, data_size, node->layout_version_read_from_disk); } if (rb.ndone != rb.size) { fprintf(stderr, "%s:%d:deserialize_ftnode_partition - " "file[%s], blocknum[%lld], rb.ndone[%d] != rb.size[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, rb.ndone, rb.size); dump_bad_block(rb.buf, rb.size); assert(rb.ndone == rb.size); } exit: return r; } static int decompress_and_deserialize_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, FTNODE node, int child, const toku::comparator &cmp, tokutime_t *decompress_time) { int r = 0; tokutime_t t0 = toku_time_now(); r = read_and_decompress_sub_block(&curr_rbuf, &curr_sb); if (r != 0) { const char *fname = toku_ftnode_get_cachefile_fname_in_env(node); fprintf(stderr, "%s:%d:decompress_and_deserialize_worker - " "file[%s], blocknum[%lld], read_and_decompress_sub_block failed " "with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, r); dump_bad_block(curr_rbuf.buf, curr_rbuf.size); goto exit; } *decompress_time = toku_time_now() - t0; // at this point, sb->uncompressed_ptr stores the serialized node partition r = deserialize_ftnode_partition(&curr_sb, node, child, cmp); if (r != 0) { const char *fname = toku_ftnode_get_cachefile_fname_in_env(node); fprintf(stderr, "%s:%d:decompress_and_deserialize_worker - " "file[%s], blocknum[%lld], deserialize_ftnode_partition failed " "with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, r); dump_bad_block(curr_rbuf.buf, curr_rbuf.size); goto exit; } exit: toku_free(curr_sb.uncompressed_ptr); return r; } static int check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, FTNODE node, int child) { int r = 0; r = read_compressed_sub_block(&curr_rbuf, &curr_sb); if (r != 0) { goto exit; } SUB_BLOCK bp_sb; bp_sb = BSB(node, child); bp_sb->compressed_size = curr_sb.compressed_size; bp_sb->uncompressed_size = curr_sb.uncompressed_size; bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size); memcpy( bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size); exit: return r; } static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) { // Effect: Allocate an FTNODE and fill in the values that are not read from FTNODE XMALLOC(node); node->fullhash = fullhash; node->blocknum = blocknum; node->dirty = 0; node->oldest_referenced_xid_known = TXNID_NONE; node->bp = nullptr; node->ct_pair = nullptr; return node; } static int deserialize_ftnode_header_from_rbuf_if_small_enough( FTNODE *ftnode, FTNODE_DISK_DATA *ndd, BLOCKNUM blocknum, uint32_t fullhash, ftnode_fetch_extra *bfe, struct rbuf *rb, int fd) // If we have enough information in the rbuf to construct a header, then do so. // Also fetch in the basement node if needed. // Return 0 if it worked. If something goes wrong (including that we are // looking at some old data format that doesn't have partitions) then return // nonzero. { int r = 0; tokutime_t t0, t1; tokutime_t decompress_time = 0; tokutime_t deserialize_time = 0; // we must get the name from bfe and not through // toku_ftnode_get_cachefile_fname_in_env as the node is not set up yet const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); t0 = toku_time_now(); FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum); if (rb->size < 24) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], rb->size[%u] < 24\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, rb->size); dump_bad_block(rb->buf, rb->size); // TODO: What error do we return here? // Does it even matter? r = toku_db_badformat(); goto cleanup; } const void *magic; rbuf_literal_bytes(rb, &magic, 8); if (memcmp(magic, "tokuleaf", 8) != 0 && memcmp(magic, "tokunode", 8) != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], unrecognized magic number " "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, static_cast(magic)[0], static_cast(magic)[1], static_cast(magic)[2], static_cast(magic)[3], static_cast(magic)[4], static_cast(magic)[5], static_cast(magic)[6], static_cast(magic)[7]); dump_bad_block(rb->buf, rb->size); r = toku_db_badformat(); goto cleanup; } node->layout_version_read_from_disk = rbuf_int(rb); if (node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], node->layout_version_read_from_disk[%d] " "< FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, node->layout_version_read_from_disk, FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES); dump_bad_block(rb->buf, rb->size); // This code path doesn't have to worry about upgrade. r = toku_db_badformat(); goto cleanup; } // If we get here, we know the node is at least // FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES. We haven't changed // the serialization format since then (this comment is correct as of // version 20, which is Deadshot) so we can go ahead and say the // layout version is current (it will be as soon as we finish // deserializing). // TODO(leif): remove node->layout_version (#5174) node->layout_version = FT_LAYOUT_VERSION; node->layout_version_original = rbuf_int(rb); node->build_id = rbuf_int(rb); node->n_children = rbuf_int(rb); // Guaranteed to be have been able to read up to here. If n_children // is too big, we may have a problem, so check that we won't overflow // while reading the partition locations. unsigned int nhsize; // we can do this because n_children is filled in. nhsize = serialize_node_header_size(node); unsigned int needed_size; // we need 12 more so that we can read the compressed block size information // that follows for the nodeinfo. needed_size = nhsize + 12; if (needed_size > rb->size) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], needed_size[%d] > rb->size[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, needed_size, rb->size); dump_bad_block(rb->buf, rb->size); r = toku_db_badformat(); goto cleanup; } XMALLOC_N(node->n_children, node->bp); XMALLOC_N(node->n_children, *ndd); // read the partition locations for (int i=0; in_children; i++) { BP_START(*ndd,i) = rbuf_int(rb); BP_SIZE (*ndd,i) = rbuf_int(rb); } uint32_t checksum; checksum = toku_x1764_memory(rb->buf, rb->ndone); uint32_t stored_checksum; stored_checksum = rbuf_int(rb); if (stored_checksum != checksum) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], stored_checksum[%d] != checksum[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, stored_checksum, checksum); dump_bad_block(rb->buf, rb->size); r = TOKUDB_BAD_CHECKSUM; goto cleanup; } // Now we want to read the pivot information. struct sub_block sb_node_info; sub_block_init(&sb_node_info); // we'll be able to read these because we checked the size earlier. sb_node_info.compressed_size = rbuf_int(rb); sb_node_info.uncompressed_size = rbuf_int(rb); if (rb->size - rb->ndone < sb_node_info.compressed_size + 8) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], rb->size[%d] - rb->ndone[%d] < " "sb_node_info.compressed_size[%d] + 8\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, rb->size, rb->ndone, sb_node_info.compressed_size); dump_bad_block(rb->buf, rb->size); r = toku_db_badformat(); goto cleanup; } // Finish reading compressed the sub_block const void **cp; cp = (const void **) &sb_node_info.compressed_ptr; rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size); sb_node_info.xsum = rbuf_int(rb); // let's check the checksum uint32_t actual_xsum; actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr - 8, 8 + sb_node_info.compressed_size); if (sb_node_info.xsum != actual_xsum) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], sb_node_info.xsum[%d] != actual_xsum[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, sb_node_info.xsum, actual_xsum); dump_bad_block(rb->buf, rb->size); r = TOKUDB_BAD_CHECKSUM; goto cleanup; } // Now decompress the subblock { toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size); sb_node_info.uncompressed_ptr = sb_node_info_buf.get(); tokutime_t decompress_t0 = toku_time_now(); toku_decompress((Bytef *)sb_node_info.uncompressed_ptr, sb_node_info.uncompressed_size, (Bytef *)sb_node_info.compressed_ptr, sb_node_info.compressed_size); tokutime_t decompress_t1 = toku_time_now(); decompress_time = decompress_t1 - decompress_t0; // at this point sb->uncompressed_ptr stores the serialized node info. r = deserialize_ftnode_info(&sb_node_info, node); if (r != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], deserialize_ftnode_info failed with " "%d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, r); dump_bad_block( static_cast(sb_node_info.uncompressed_ptr), sb_node_info.uncompressed_size); dump_bad_block(rb->buf, rb->size); goto cleanup; } } // Now we have the ftnode_info. We have a bunch more stuff in the // rbuf, so we might be able to store the compressed data for some // objects. // We can proceed to deserialize the individual subblocks. // setup the memory of the partitions // for partitions being decompressed, create either message buffer or basement node // for partitions staying compressed, create sub_block setup_ftnode_partitions(node, bfe, false); // We must capture deserialize and decompression time before // the pf_callback, otherwise we would double-count. t1 = toku_time_now(); deserialize_time = (t1 - t0) - decompress_time; // do partial fetch if necessary if (bfe->type != ftnode_fetch_none) { PAIR_ATTR attr; r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr); if (r != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " "file[%s], blocknum[%lld], toku_ftnode_pf_callback failed with " "%d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, r); dump_bad_block(rb->buf, rb->size); goto cleanup; } } // handle clock for (int i = 0; i < node->n_children; i++) { if (bfe->wants_child_available(i)) { paranoid_invariant(BP_STATE(node,i) == PT_AVAIL); BP_TOUCH_CLOCK(node,i); } } *ftnode = node; r = 0; cleanup: if (r == 0) { bfe->deserialize_time += deserialize_time; bfe->decompress_time += decompress_time; toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); } if (r != 0) { if (node) { toku_free(*ndd); toku_free(node->bp); toku_free(node); } } return r; } // This function takes a deserialized version 13 or 14 buffer and // constructs the associated internal, non-leaf ftnode object. It // also creates MSN's for older messages created in older versions // that did not generate MSN's for messages. These new MSN's are // generated from the root downwards, counting backwards from MIN_MSN // and persisted in the ft header. static int deserialize_and_upgrade_internal_node(FTNODE node, struct rbuf *rb, ftnode_fetch_extra *bfe, STAT64INFO info) { int version = node->layout_version_read_from_disk; if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) { (void) rbuf_int(rb); // 10. fingerprint } node->n_children = rbuf_int(rb); // 11. n_children // Sub-tree esitmates... for (int i = 0; i < node->n_children; ++i) { if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) { (void) rbuf_int(rb); // 12. fingerprint } uint64_t nkeys = rbuf_ulonglong(rb); // 13. nkeys uint64_t ndata = rbuf_ulonglong(rb); // 14. ndata uint64_t dsize = rbuf_ulonglong(rb); // 15. dsize (void) rbuf_char(rb); // 16. exact (char) invariant(nkeys == ndata); if (info) { // info is non-null if we're trying to upgrade old subtree // estimates to stat64info info->numrows += nkeys; info->numbytes += dsize; } } // Pivot keys node->pivotkeys.deserialize_from_rbuf(rb, node->n_children - 1); // Create space for the child node buffers (a.k.a. partitions). XMALLOC_N(node->n_children, node->bp); // Set the child blocknums. for (int i = 0; i < node->n_children; ++i) { BP_BLOCKNUM(node, i) = rbuf_blocknum(rb); // 18. blocknums BP_WORKDONE(node, i) = 0; } // Read in the child buffer maps. for (int i = 0; i < node->n_children; ++i) { // The following fields were previously used by the `sub_block_map' // They include: // - 4 byte index (void) rbuf_int(rb); // - 4 byte offset (void) rbuf_int(rb); // - 4 byte size (void) rbuf_int(rb); } // We need to setup this node's partitions, but we can't call the // existing call (setup_ftnode_paritions.) because there are // existing optimizations that would prevent us from bringing all // of this node's partitions into memory. Instead, We use the // existing bfe and node to set the bfe's child_to_search member. // Then we create a temporary bfe that needs all the nodes to make // sure we properly intitialize our partitions before filling them // in from our soon-to-be-upgraded node. update_bfe_using_ftnode(node, bfe); ftnode_fetch_extra temp_bfe; temp_bfe.create_for_full_read(nullptr); setup_partitions_using_bfe(node, &temp_bfe, true); // Cache the highest MSN generated for the message buffers. This // will be set in the ftnode. // // The way we choose MSNs for upgraded messages is delicate. The // field `highest_unused_msn_for_upgrade' in the header is always an // MSN that no message has yet. So when we have N messages that need // MSNs, we decrement it by N, and then use it and the N-1 MSNs less // than it, but we do not use the value we decremented it to. // // In the code below, we initialize `lowest' with the value of // `highest_unused_msn_for_upgrade' after it is decremented, so we // need to be sure to increment it once before we enqueue our first // message. MSN highest_msn; highest_msn.msn = 0; // Deserialize de-compressed buffers. for (int i = 0; i < node->n_children; ++i) { NONLEAF_CHILDINFO bnc = BNC(node, i); MSN highest_msn_in_this_buffer = deserialize_child_buffer_v13(bfe->ft, bnc, rb); if (highest_msn.msn == 0) { highest_msn.msn = highest_msn_in_this_buffer.msn; } } // Assign the highest msn from our upgrade message buffers node->max_msn_applied_to_node_on_disk = highest_msn; // Since we assigned MSNs to this node's messages, we need to dirty it. node->dirty = 1; // Must compute the checksum now (rather than at the end, while we // still have the pointer to the buffer). if (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM) { uint32_t expected_xsum = toku_dtoh32(*(uint32_t*)(rb->buf+rb->size-4)); // 27. checksum uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size-4); if (expected_xsum != actual_xsum) { fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n", __FUNCTION__, __LINE__, expected_xsum, actual_xsum); fprintf(stderr, "Checksum failure while reading node in file %s.\n", toku_cachefile_fname_in_env(bfe->ft->cf)); fflush(stderr); return toku_db_badformat(); } } return 0; } // This function takes a deserialized version 13 or 14 buffer and // constructs the associated leaf ftnode object. static int deserialize_and_upgrade_leaf_node(FTNODE node, struct rbuf *rb, ftnode_fetch_extra *bfe, STAT64INFO info) { int r = 0; int version = node->layout_version_read_from_disk; // This is a leaf node, so the offsets in the buffer will be // different from the internal node offsets above. uint64_t nkeys = rbuf_ulonglong(rb); // 10. nkeys uint64_t ndata = rbuf_ulonglong(rb); // 11. ndata uint64_t dsize = rbuf_ulonglong(rb); // 12. dsize invariant(nkeys == ndata); if (info) { // info is non-null if we're trying to upgrade old subtree // estimates to stat64info info->numrows += nkeys; info->numbytes += dsize; } // This is the optimized for upgrade field. if (version == FT_LAYOUT_VERSION_14) { (void) rbuf_int(rb); // 13. optimized } // npartitions - This is really the number of leaf entries in // our single basement node. There should only be 1 (ONE) // partition, so there shouldn't be any pivot key stored. This // means the loop will not iterate. We could remove the loop and // assert that the value is indeed 1. int npartitions = rbuf_int(rb); // 14. npartitions assert(npartitions == 1); // Set number of children to 1, since we will only have one // basement node. node->n_children = 1; XMALLOC_N(node->n_children, node->bp); node->pivotkeys.create_empty(); // Create one basement node to contain all the leaf entries by // setting up the single partition and updating the bfe. update_bfe_using_ftnode(node, bfe); ftnode_fetch_extra temp_bfe; temp_bfe.create_for_full_read(bfe->ft); setup_partitions_using_bfe(node, &temp_bfe, true); // 11. Deserialize the partition maps, though they are not used in the // newer versions of ftnodes. for (int i = 0; i < node->n_children; ++i) { // The following fields were previously used by the `sub_block_map' // They include: // - 4 byte index (void) rbuf_int(rb); // - 4 byte offset (void) rbuf_int(rb); // - 4 byte size (void) rbuf_int(rb); } // Copy all of the leaf entries into the single basement node. // The number of leaf entries in buffer. int n_in_buf = rbuf_int(rb); // 15. # of leaves BLB_SEQINSERT(node,0) = 0; BASEMENTNODE bn = BLB(node, 0); // Read the leaf entries from the buffer, advancing the buffer // as we go. bool has_end_to_end_checksum = (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM); if (version <= FT_LAYOUT_VERSION_13) { // Create our mempool. // Loop through for (int i = 0; i < n_in_buf; ++i) { LEAFENTRY_13 le = reinterpret_cast(&rb->buf[rb->ndone]); uint32_t disksize = leafentry_disksize_13(le); rb->ndone += disksize; // 16. leaf entry (13) invariant(rb->ndone<=rb->size); LEAFENTRY new_le; size_t new_le_size; void* key = NULL; uint32_t keylen = 0; r = toku_le_upgrade_13_14(le, &key, &keylen, &new_le_size, &new_le); assert_zero(r); // Copy the pointer value straight into the OMT LEAFENTRY new_le_in_bn = nullptr; void *maybe_free; bn->data_buffer.get_space_for_insert( i, key, keylen, new_le_size, &new_le_in_bn, &maybe_free ); if (maybe_free) { toku_free(maybe_free); } memcpy(new_le_in_bn, new_le, new_le_size); toku_free(new_le); } } else { uint32_t data_size = rb->size - rb->ndone; if (has_end_to_end_checksum) { data_size -= sizeof(uint32_t); } bn->data_buffer.deserialize_from_rbuf(n_in_buf, rb, data_size, node->layout_version_read_from_disk); } // Whatever this is must be less than the MSNs of every message above // it, so it's ok to take it here. bn->max_msn_applied = bfe->ft->h->highest_unused_msn_for_upgrade; bn->stale_ancestor_messages_applied = false; node->max_msn_applied_to_node_on_disk = bn->max_msn_applied; // Checksum (end to end) is only on version 14 if (has_end_to_end_checksum) { uint32_t expected_xsum = rbuf_int(rb); // 17. checksum uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size - 4); if (expected_xsum != actual_xsum) { fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n", __FUNCTION__, __LINE__, expected_xsum, actual_xsum); fprintf(stderr, "Checksum failure while reading node in file %s.\n", toku_cachefile_fname_in_env(bfe->ft->cf)); fflush(stderr); return toku_db_badformat(); } } // We should have read the whole block by this point. if (rb->ndone != rb->size) { // TODO: Error handling. return 1; } return r; } static int read_and_decompress_block_from_fd_into_rbuf( int fd, BLOCKNUM blocknum, DISKOFF offset, DISKOFF size, FT ft, struct rbuf *rb, /* out */ int *layout_version_p); // This function upgrades a version 14 or 13 ftnode to the current // version. NOTE: This code assumes the first field of the rbuf has // already been read from the buffer (namely the layout_version of the // ftnode.) static int deserialize_and_upgrade_ftnode(FTNODE node, FTNODE_DISK_DATA *ndd, BLOCKNUM blocknum, ftnode_fetch_extra *bfe, STAT64INFO info, int fd) { int r = 0; int version; // I. First we need to de-compress the entire node, only then can // we read the different sub-sections. // get the file offset and block size for the block DISKOFF offset, size; bfe->ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size); struct rbuf rb; r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, bfe->ft, &rb, &version); if (r != 0) { const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); fprintf(stderr, "%s:%d:deserialize_and_upgrade_ftnode - " "file[%s], blocknum[%lld], " "read_and_decompress_block_from_fd_into_rbuf failed with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, r); goto exit; } // Re-read the magic field from the previous call, since we are // restarting with a fresh rbuf. { const void *magic; rbuf_literal_bytes(&rb, &magic, 8); // 1. magic } // II. Start reading ftnode fields out of the decompressed buffer. // Copy over old version info. node->layout_version_read_from_disk = rbuf_int(&rb); // 2. layout version version = node->layout_version_read_from_disk; if (version > FT_LAYOUT_VERSION_14) { const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); fprintf(stderr, "%s:%d:deserialize_and_upgrade_ftnode - " "file[%s], blocknum[%lld], version[%d] > " "FT_LAYOUT_VERSION_14[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, version, FT_LAYOUT_VERSION_14); dump_bad_block(rb.buf, rb.size); goto exit; } assert(version <= FT_LAYOUT_VERSION_14); // Upgrade the current version number to the current version. node->layout_version = FT_LAYOUT_VERSION; node->layout_version_original = rbuf_int(&rb); // 3. original layout node->build_id = rbuf_int(&rb); // 4. build id // The remaining offsets into the rbuf do not map to the current // version, so we need to fill in the blanks and ignore older // fields. (void)rbuf_int(&rb); // 5. nodesize node->flags = rbuf_int(&rb); // 6. flags node->height = rbuf_int(&rb); // 7. height // If the version is less than 14, there are two extra ints here. // we would need to ignore them if they are there. // These are the 'fingerprints'. if (version == FT_LAYOUT_VERSION_13) { (void) rbuf_int(&rb); // 8. rand4 (void) rbuf_int(&rb); // 9. local } // The next offsets are dependent on whether this is a leaf node // or not. // III. Read in Leaf and Internal Node specific data. // Check height to determine whether this is a leaf node or not. if (node->height > 0) { r = deserialize_and_upgrade_internal_node(node, &rb, bfe, info); } else { r = deserialize_and_upgrade_leaf_node(node, &rb, bfe, info); } XMALLOC_N(node->n_children, *ndd); // Initialize the partition locations to zero, because version 14 // and below have no notion of partitions on disk. for (int i=0; in_children; i++) { BP_START(*ndd,i) = 0; BP_SIZE (*ndd,i) = 0; } toku_free(rb.buf); exit: return r; } // Effect: deserializes a ftnode that is in rb (with pointer of rb just past the // magic) into a FTNODE. static int deserialize_ftnode_from_rbuf(FTNODE *ftnode, FTNODE_DISK_DATA *ndd, BLOCKNUM blocknum, uint32_t fullhash, ftnode_fetch_extra *bfe, STAT64INFO info, struct rbuf *rb, int fd) { int r = 0; struct sub_block sb_node_info; tokutime_t t0, t1; tokutime_t decompress_time = 0; tokutime_t deserialize_time = 0; const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); t0 = toku_time_now(); FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum); // now start reading from rbuf // first thing we do is read the header information const void *magic; rbuf_literal_bytes(rb, &magic, 8); if (memcmp(magic, "tokuleaf", 8) != 0 && memcmp(magic, "tokunode", 8) != 0) { fprintf(stderr, "%s:%d:deserialize_ftnode_from_rbuf - " "file[%s], blocknum[%lld], unrecognized magic number " "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, static_cast(magic)[0], static_cast(magic)[1], static_cast(magic)[2], static_cast(magic)[3], static_cast(magic)[4], static_cast(magic)[5], static_cast(magic)[6], static_cast(magic)[7]); dump_bad_block(rb->buf, rb->size); r = toku_db_badformat(); goto cleanup; } node->layout_version_read_from_disk = rbuf_int(rb); lazy_assert(node->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION); // Check if we are reading in an older node version. if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_14) { int version = node->layout_version_read_from_disk; // Perform the upgrade. r = deserialize_and_upgrade_ftnode(node, ndd, blocknum, bfe, info, fd); if (r != 0) { fprintf(stderr, "%s:%d:deserialize_ftnode_from_rbuf - " "file[%s], blocknum[%lld], deserialize_and_upgrade_ftnode " "failed with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, r); dump_bad_block(rb->buf, rb->size); goto cleanup; } if (version <= FT_LAYOUT_VERSION_13) { // deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag node->flags &= ~TOKU_DB_VALCMP_BUILTIN_13; } // If everything is ok, just re-assign the ftnode and retrn. *ftnode = node; r = 0; goto cleanup; } // Upgrade versions after 14 to current. This upgrade is trivial, it // removes the optimized for upgrade field, which has already been // removed in the deserialization code (see // deserialize_ftnode_info()). node->layout_version = FT_LAYOUT_VERSION; node->layout_version_original = rbuf_int(rb); node->build_id = rbuf_int(rb); node->n_children = rbuf_int(rb); XMALLOC_N(node->n_children, node->bp); XMALLOC_N(node->n_children, *ndd); // read the partition locations for (int i=0; in_children; i++) { BP_START(*ndd,i) = rbuf_int(rb); BP_SIZE (*ndd,i) = rbuf_int(rb); } // verify checksum of header stored uint32_t checksum; checksum = toku_x1764_memory(rb->buf, rb->ndone); uint32_t stored_checksum; stored_checksum = rbuf_int(rb); if (stored_checksum != checksum) { fprintf( stderr, "%s:%d:deserialize_ftnode_from_rbuf - " "file[%s], blocknum[%lld], stored_checksum[%d] != checksum[%d]\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, stored_checksum, checksum); dump_bad_block(rb->buf, rb->size); invariant(stored_checksum == checksum); } // now we read and decompress the pivot and child information sub_block_init(&sb_node_info); { tokutime_t sb_decompress_t0 = toku_time_now(); r = read_and_decompress_sub_block(rb, &sb_node_info); tokutime_t sb_decompress_t1 = toku_time_now(); decompress_time += sb_decompress_t1 - sb_decompress_t0; if (r != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_from_rbuf - " "file[%s], blocknum[%lld], read_and_decompress_sub_block failed " "with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, r); dump_bad_block( static_cast(sb_node_info.uncompressed_ptr), sb_node_info.uncompressed_size); dump_bad_block(rb->buf, rb->size); goto cleanup; } } // at this point, sb->uncompressed_ptr stores the serialized node info r = deserialize_ftnode_info(&sb_node_info, node); if (r != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_from_rbuf - " "file[%s], blocknum[%lld], deserialize_ftnode_info failed with " "%d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, r); dump_bad_block(rb->buf, rb->size); goto cleanup; } toku_free(sb_node_info.uncompressed_ptr); // now that the node info has been deserialized, we can proceed to // deserialize the individual sub blocks // setup the memory of the partitions // for partitions being decompressed, create either message buffer or // basement node // for partitions staying compressed, create sub_block setup_ftnode_partitions(node, bfe, true); // This loop is parallelizeable, since we don't have a dependency on the // work done so far. for (int i = 0; i < node->n_children; i++) { uint32_t curr_offset = BP_START(*ndd, i); uint32_t curr_size = BP_SIZE(*ndd, i); // the compressed, serialized partitions start at where rb is currently // pointing, which would be rb->buf + rb->ndone // we need to intialize curr_rbuf to point to this place struct rbuf curr_rbuf = {.buf = nullptr, .size = 0, .ndone = 0}; rbuf_init(&curr_rbuf, rb->buf + curr_offset, curr_size); // // now we are at the point where we have: // - read the entire compressed node off of disk, // - decompressed the pivot and offset information, // - have arrived at the individual partitions. // // Based on the information in bfe, we want to decompress a subset of // of the compressed partitions (also possibly none or possibly all) // The partitions that we want to decompress and make available // to the node, we do, the rest we simply copy in compressed // form into the node, and set the state of the partition to // PT_COMPRESSED // struct sub_block curr_sb; sub_block_init(&curr_sb); // curr_rbuf is passed by value to decompress_and_deserialize_worker, // so there's no ugly race condition. // This would be more obvious if curr_rbuf were an array. // deserialize_ftnode_info figures out what the state // should be and sets up the memory so that we are ready to use it switch (BP_STATE(node, i)) { case PT_AVAIL: { // case where we read and decompress the partition tokutime_t partition_decompress_time; r = decompress_and_deserialize_worker( curr_rbuf, curr_sb, node, i, bfe->ft->cmp, &partition_decompress_time); decompress_time += partition_decompress_time; if (r != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_from_rbuf - " "file[%s], blocknum[%lld], childnum[%d], " "decompress_and_deserialize_worker failed with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, i, r); dump_bad_block(rb->buf, rb->size); goto cleanup; } break; } case PT_COMPRESSED: // case where we leave the partition in the compressed state r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i); if (r != 0) { fprintf( stderr, "%s:%d:deserialize_ftnode_from_rbuf - " "file[%s], blocknum[%lld], childnum[%d], " "check_and_copy_compressed_sub_block_worker failed with " "%d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, i, r); dump_bad_block(rb->buf, rb->size); goto cleanup; } break; case PT_INVALID: // this is really bad case PT_ON_DISK: // it's supposed to be in memory. abort(); } } *ftnode = node; r = 0; cleanup: if (r == 0) { t1 = toku_time_now(); deserialize_time = (t1 - t0) - decompress_time; bfe->deserialize_time += deserialize_time; bfe->decompress_time += decompress_time; toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); } if (r != 0) { // NOTE: Right now, callers higher in the stack will assert on // failure, so this is OK for production. However, if we // create tools that use this function to search for errors in // the FT, then we will leak memory. if (node) { toku_free(node); } } return r; } int toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, ftnode_fetch_extra *bfe) { int r = 0; assert(BP_STATE(node,childnum) == PT_ON_DISK); assert(node->bp[childnum].ptr.tag == BCT_NULL); // // setup the partition // setup_available_ftnode_partition(node, childnum); BP_STATE(node,childnum) = PT_AVAIL; // // read off disk and make available in memory // // get the file offset and block size for the block DISKOFF node_offset, total_node_disk_size; bfe->ft->blocktable.translate_blocknum_to_offset_size(node->blocknum, &node_offset, &total_node_disk_size); uint32_t curr_offset = BP_START(ndd, childnum); uint32_t curr_size = BP_SIZE (ndd, childnum); struct rbuf rb; rbuf_init(&rb, nullptr, 0); uint32_t pad_at_beginning = (node_offset+curr_offset)%512; uint32_t padded_size = roundup_to_multiple(512, pad_at_beginning + curr_size); toku::scoped_malloc_aligned raw_block_buf(padded_size, 512); uint8_t *raw_block = reinterpret_cast(raw_block_buf.get()); rbuf_init(&rb, pad_at_beginning+raw_block, curr_size); tokutime_t t0 = toku_time_now(); // read the block assert(0==((unsigned long long)raw_block)%512); // for O_DIRECT assert(0==(padded_size)%512); assert(0==(node_offset+curr_offset-pad_at_beginning)%512); ssize_t rlen = toku_os_pread(fd, raw_block, padded_size, node_offset+curr_offset-pad_at_beginning); assert((DISKOFF)rlen >= pad_at_beginning + curr_size); // we read in at least enough to get what we wanted assert((DISKOFF)rlen <= padded_size); // we didn't read in too much. tokutime_t t1 = toku_time_now(); // read sub block struct sub_block curr_sb; sub_block_init(&curr_sb); r = read_compressed_sub_block(&rb, &curr_sb); if (r != 0) { return r; } invariant(curr_sb.compressed_ptr != NULL); // decompress toku::scoped_malloc uncompressed_buf(curr_sb.uncompressed_size); curr_sb.uncompressed_ptr = uncompressed_buf.get(); toku_decompress((Bytef *) curr_sb.uncompressed_ptr, curr_sb.uncompressed_size, (Bytef *) curr_sb.compressed_ptr, curr_sb.compressed_size); // deserialize tokutime_t t2 = toku_time_now(); r = deserialize_ftnode_partition(&curr_sb, node, childnum, bfe->ft->cmp); tokutime_t t3 = toku_time_now(); // capture stats tokutime_t io_time = t1 - t0; tokutime_t decompress_time = t2 - t1; tokutime_t deserialize_time = t3 - t2; bfe->deserialize_time += deserialize_time; bfe->decompress_time += decompress_time; toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); bfe->bytes_read = rlen; bfe->io_time = io_time; return r; } // Take a ftnode partition that is in the compressed state, and make it avail int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, ftnode_fetch_extra *bfe) { int r = 0; assert(BP_STATE(node, childnum) == PT_COMPRESSED); SUB_BLOCK curr_sb = BSB(node, childnum); toku::scoped_malloc uncompressed_buf(curr_sb->uncompressed_size); assert(curr_sb->uncompressed_ptr == NULL); curr_sb->uncompressed_ptr = uncompressed_buf.get(); setup_available_ftnode_partition(node, childnum); BP_STATE(node,childnum) = PT_AVAIL; // decompress the sub_block tokutime_t t0 = toku_time_now(); toku_decompress((Bytef *)curr_sb->uncompressed_ptr, curr_sb->uncompressed_size, (Bytef *)curr_sb->compressed_ptr, curr_sb->compressed_size); tokutime_t t1 = toku_time_now(); r = deserialize_ftnode_partition(curr_sb, node, childnum, bfe->ft->cmp); if (r != 0) { const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); fprintf(stderr, "%s:%d:toku_deserialize_bp_from_compressed - " "file[%s], blocknum[%lld], " "deserialize_ftnode_partition failed with %d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)node->blocknum.b, r); dump_bad_block(static_cast(curr_sb->compressed_ptr), curr_sb->compressed_size); dump_bad_block(static_cast(curr_sb->uncompressed_ptr), curr_sb->uncompressed_size); } tokutime_t t2 = toku_time_now(); tokutime_t decompress_time = t1 - t0; tokutime_t deserialize_time = t2 - t1; bfe->deserialize_time += deserialize_time; bfe->decompress_time += decompress_time; toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); toku_free(curr_sb->compressed_ptr); toku_free(curr_sb); return r; } static int deserialize_ftnode_from_fd(int fd, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *ftnode, FTNODE_DISK_DATA *ndd, ftnode_fetch_extra *bfe, STAT64INFO info) { struct rbuf rb = RBUF_INITIALIZER; tokutime_t t0 = toku_time_now(); read_block_from_fd_into_rbuf(fd, blocknum, bfe->ft, &rb); tokutime_t t1 = toku_time_now(); // Decompress and deserialize the ftnode. Time statistics // are taken inside this function. int r = deserialize_ftnode_from_rbuf( ftnode, ndd, blocknum, fullhash, bfe, info, &rb, fd); if (r != 0) { const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); fprintf( stderr, "%s:%d:deserialize_ftnode_from_fd - " "file[%s], blocknum[%lld], deserialize_ftnode_from_rbuf failed with " "%d\n", __FILE__, __LINE__, fname ? fname : "unknown", (longlong)blocknum.b, r); dump_bad_block(rb.buf, rb.size); } bfe->bytes_read = rb.size; bfe->io_time = t1 - t0; toku_free(rb.buf); return r; } // Effect: Read a node in. If possible, read just the header. // Perform version upgrade if necessary. int toku_deserialize_ftnode_from(int fd, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *ftnode, FTNODE_DISK_DATA *ndd, ftnode_fetch_extra *bfe) { int r = 0; struct rbuf rb = RBUF_INITIALIZER; // each function below takes the appropriate io/decompression/deserialize // statistics if (!bfe->read_all_partitions) { read_ftnode_header_from_fd_into_rbuf_if_small_enough( fd, blocknum, bfe->ft, &rb, bfe); r = deserialize_ftnode_header_from_rbuf_if_small_enough( ftnode, ndd, blocknum, fullhash, bfe, &rb, fd); } else { // force us to do it the old way r = -1; } if (r != 0) { // Something went wrong, go back to doing it the old way. r = deserialize_ftnode_from_fd( fd, blocknum, fullhash, ftnode, ndd, bfe, nullptr); } toku_free(rb.buf); return r; } void toku_verify_or_set_counts(FTNODE UU(node)) { } int toku_db_badformat(void) { return DB_BADFORMAT; } static size_t serialize_rollback_log_size(ROLLBACK_LOG_NODE log) { size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id +16 //TXNID_PAIR +8 //sequence +8 //blocknum +8 //previous (blocknum) +8 //resident_bytecount +8 //memarena size +log->rollentry_resident_bytecount; return size; } static void serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calculated_size, int UU(n_sub_blocks), struct sub_block UU(sub_block[])) { struct wbuf wb; wbuf_init(&wb, buf, calculated_size); { //Serialize rollback log to local wbuf wbuf_nocrc_literal_bytes(&wb, "tokuroll", 8); lazy_assert(log->layout_version == FT_LAYOUT_VERSION); wbuf_nocrc_int(&wb, log->layout_version); wbuf_nocrc_int(&wb, log->layout_version_original); wbuf_nocrc_uint(&wb, BUILD_ID); wbuf_nocrc_TXNID_PAIR(&wb, log->txnid); wbuf_nocrc_ulonglong(&wb, log->sequence); wbuf_nocrc_BLOCKNUM(&wb, log->blocknum); wbuf_nocrc_BLOCKNUM(&wb, log->previous); wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount); //Write down memarena size needed to restore wbuf_nocrc_ulonglong(&wb, log->rollentry_arena.total_size_in_use()); { //Store rollback logs struct roll_entry *item; size_t done_before = wb.ndone; for (item = log->newest_logentry; item; item = item->prev) { toku_logger_rollback_wbuf_nocrc_write(&wb, item); } lazy_assert(done_before + log->rollentry_resident_bytecount == wb.ndone); } } lazy_assert(wb.ndone == wb.size); lazy_assert(calculated_size==wb.ndone); } static void serialize_uncompressed_block_to_memory(char * uncompressed_buf, int n_sub_blocks, struct sub_block sub_block[/*n_sub_blocks*/], enum toku_compression_method method, /*out*/ size_t *n_bytes_to_write, /*out*/ char **bytes_to_write) // Guarantees that the malloc'd BYTES_TO_WRITE is 512-byte aligned (so that O_DIRECT will work) { // allocate space for the compressed uncompressed_buf size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method); size_t sub_block_header_len = sub_block_header_size(n_sub_blocks); size_t header_len = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum char *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, header_len + compressed_len), compressed_buf); // copy the header memcpy(compressed_buf, uncompressed_buf, node_header_overhead); if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n", uncompressed_buf[node_header_overhead], uncompressed_buf[node_header_overhead+1], uncompressed_buf[node_header_overhead+2], uncompressed_buf[node_header_overhead+3]); // compress all of the sub blocks char *uncompressed_ptr = uncompressed_buf + node_header_overhead; char *compressed_ptr = compressed_buf + header_len; compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, ft_pool, method); //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %" PRIu64 "\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len); // serialize the sub block header uint32_t *ptr = (uint32_t *)(compressed_buf + node_header_overhead); *ptr++ = toku_htod32(n_sub_blocks); for (int i=0; ilen = calculated_size; serialized->n_sub_blocks = 0; // choose sub block parameters int sub_block_size = 0; size_t data_size = calculated_size - node_header_overhead; choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &serialized->n_sub_blocks); lazy_assert(0 < serialized->n_sub_blocks && serialized->n_sub_blocks <= max_sub_blocks); lazy_assert(sub_block_size > 0); // set the initial sub block size for all of the sub blocks for (int i = 0; i < serialized->n_sub_blocks; i++) sub_block_init(&serialized->sub_block[i]); set_all_sub_block_sizes(data_size, sub_block_size, serialized->n_sub_blocks, serialized->sub_block); // allocate space for the serialized node XMALLOC_N(calculated_size, serialized->data); // serialize the node into buf serialize_rollback_log_node_to_buf(log, serialized->data, calculated_size, serialized->n_sub_blocks, serialized->sub_block); serialized->blocknum = log->blocknum; } int toku_serialize_rollback_log_to(int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized, FT ft, bool for_checkpoint) { size_t n_to_write; char *compressed_buf; struct serialized_rollback_log_node serialized_local; if (is_serialized) { invariant_null(log); } else { invariant_null(serialized_log); serialized_log = &serialized_local; toku_serialize_rollback_log_to_memory_uncompressed(log, serialized_log); } BLOCKNUM blocknum = serialized_log->blocknum; invariant(blocknum.b >= 0); // Compress and malloc buffer to write serialize_uncompressed_block_to_memory(serialized_log->data, serialized_log->n_sub_blocks, serialized_log->sub_block, ft->h->compression_method, &n_to_write, &compressed_buf); // Dirties the ft DISKOFF offset; ft->blocktable.realloc_on_disk( blocknum, n_to_write, &offset, ft, fd, for_checkpoint); toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset); toku_free(compressed_buf); if (!is_serialized) { toku_static_serialized_rollback_log_destroy(&serialized_local); log->dirty = 0; // See #1957. Must set the node to be clean after // serializing it so that it doesn't get written again // on the next checkpoint or eviction. } return 0; } static int deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) { ROLLBACK_LOG_NODE MALLOC(result); int r; if (result==NULL) { r=get_error_errno(); if (0) { died0: toku_free(result); } return r; } const void *magic; rbuf_literal_bytes(rb, &magic, 8); lazy_assert(!memcmp(magic, "tokuroll", 8)); result->layout_version = rbuf_int(rb); lazy_assert((FT_LAYOUT_VERSION_25 <= result->layout_version && result->layout_version <= FT_LAYOUT_VERSION_27) || (result->layout_version == FT_LAYOUT_VERSION)); result->layout_version_original = rbuf_int(rb); result->layout_version_read_from_disk = result->layout_version; result->build_id = rbuf_int(rb); result->dirty = false; //TODO: Maybe add descriptor (or just descriptor version) here eventually? //TODO: This is hard.. everything is shared in a single dictionary. rbuf_TXNID_PAIR(rb, &result->txnid); result->sequence = rbuf_ulonglong(rb); result->blocknum = rbuf_blocknum(rb); if (result->blocknum.b != blocknum.b) { r = toku_db_badformat(); goto died0; } result->previous = rbuf_blocknum(rb); result->rollentry_resident_bytecount = rbuf_ulonglong(rb); size_t arena_initial_size = rbuf_ulonglong(rb); result->rollentry_arena.create(arena_initial_size); if (0) { died1: result->rollentry_arena.destroy(); goto died0; } //Load rollback entries lazy_assert(rb->size > 4); //Start with empty list result->oldest_logentry = result->newest_logentry = NULL; while (rb->ndone < rb->size) { struct roll_entry *item; uint32_t rollback_fsize = rbuf_int(rb); //Already read 4. Rest is 4 smaller const void *item_vec; rbuf_literal_bytes(rb, &item_vec, rollback_fsize-4); unsigned char* item_buf = (unsigned char*)item_vec; r = toku_parse_rollback(item_buf, rollback_fsize-4, &item, &result->rollentry_arena); if (r!=0) { r = toku_db_badformat(); goto died1; } //Add to head of list if (result->oldest_logentry) { result->oldest_logentry->prev = item; result->oldest_logentry = item; item->prev = NULL; } else { result->oldest_logentry = result->newest_logentry = item; item->prev = NULL; } } toku_free(rb->buf); rb->buf = NULL; *log_p = result; return 0; } static int deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log, struct rbuf *rb) { int r = 0; ROLLBACK_LOG_NODE rollback_log_node = NULL; invariant((FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || version == FT_LAYOUT_VERSION); r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb); if (r==0) { *log = rollback_log_node; } return r; } int decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) { int r = 0; // get the number of compressed sub blocks int n_sub_blocks; n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead])); // verify the number of sub blocks invariant(0 <= n_sub_blocks); invariant(n_sub_blocks <= max_sub_blocks); { // verify the header checksum uint32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks); invariant(header_length <= raw_block_size); uint32_t xsum = toku_x1764_memory(raw_block, header_length); uint32_t stored_xsum = toku_dtoh32(*(uint32_t *)(raw_block + header_length)); if (xsum != stored_xsum) { r = TOKUDB_BAD_CHECKSUM; } } // deserialize the sub block header struct sub_block sub_block[n_sub_blocks]; uint32_t *sub_block_header = (uint32_t *) &raw_block[node_header_overhead+4]; for (int i = 0; i < n_sub_blocks; i++) { sub_block_init(&sub_block[i]); sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]); sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]); sub_block[i].xsum = toku_dtoh32(sub_block_header[2]); sub_block_header += 3; } // This predicate needs to be here and instead of where it is set // for the compiler. if (r == TOKUDB_BAD_CHECKSUM) { goto exit; } // verify sub block sizes for (int i = 0; i < n_sub_blocks; i++) { uint32_t compressed_size = sub_block[i].compressed_size; if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); goto exit; } uint32_t uncompressed_size = sub_block[i].uncompressed_size; if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size); if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); goto exit; } } // sum up the uncompressed size of the sub blocks size_t uncompressed_size; uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block); // allocate the uncompressed buffer size_t size; size = node_header_overhead + uncompressed_size; unsigned char *buf; XMALLOC_N(size, buf); rbuf_init(rb, buf, size); // copy the uncompressed node header to the uncompressed buffer memcpy(rb->buf, raw_block, node_header_overhead); // point at the start of the compressed data (past the node header, the sub block header, and the header checksum) unsigned char *compressed_data; compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t); // point at the start of the uncompressed data unsigned char *uncompressed_data; uncompressed_data = rb->buf + node_header_overhead; // decompress all the compressed sub blocks into the uncompressed buffer r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, ft_pool); if (r != 0) { fprintf(stderr, "%s:%d block %" PRId64 " failed %d at %p size %zu\n", __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size); dump_bad_block(raw_block, raw_block_size); goto exit; } rb->ndone=0; exit: return r; } static int decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) { // This function exists solely to accommodate future changes in compression. int r = 0; if ((version == FT_LAYOUT_VERSION_13 || version == FT_LAYOUT_VERSION_14) || (FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || version == FT_LAYOUT_VERSION) { r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum); } else { abort(); } return r; } static int read_and_decompress_block_from_fd_into_rbuf( int fd, BLOCKNUM blocknum, DISKOFF offset, DISKOFF size, FT ft, struct rbuf *rb, /* out */ int *layout_version_p) { int r = 0; if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); DISKOFF size_aligned = roundup_to_multiple(512, size); uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block); { // read the (partially compressed) block ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset); lazy_assert((DISKOFF)rlen >= size); lazy_assert((DISKOFF)rlen <= size_aligned); } // get the layout_version int layout_version; { uint8_t *magic = raw_block + uncompressed_magic_offset; if (memcmp(magic, "tokuleaf", 8)!=0 && memcmp(magic, "tokunode", 8)!=0 && memcmp(magic, "tokuroll", 8)!=0) { r = toku_db_badformat(); goto cleanup; } uint8_t *version = raw_block + uncompressed_version_offset; layout_version = toku_dtoh32(*(uint32_t*)version); if (layout_version < FT_LAYOUT_MIN_SUPPORTED_VERSION || layout_version > FT_LAYOUT_VERSION) { r = toku_db_badformat(); goto cleanup; } } r = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum); if (r != 0) { // We either failed the checksome, or there is a bad format in // the buffer. if (r == TOKUDB_BAD_CHECKSUM) { fprintf(stderr, "Checksum failure while reading raw block in file %s.\n", toku_cachefile_fname_in_env(ft->cf)); abort(); } else { r = toku_db_badformat(); goto cleanup; } } *layout_version_p = layout_version; cleanup: if (r!=0) { if (rb->buf) toku_free(rb->buf); rb->buf = NULL; } if (raw_block) { toku_free(raw_block); } return r; } // Read rollback log node from file into struct. // Perform version upgrade if necessary. int toku_deserialize_rollback_log_from(int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT ft) { int layout_version = 0; int r; struct rbuf rb; rbuf_init(&rb, nullptr, 0); // get the file offset and block size for the block DISKOFF offset, size; ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size); // if the size is 0, then the blocknum is unused if (size == 0) { // blocknum is unused, just create an empty one and get out ROLLBACK_LOG_NODE XMALLOC(log); rollback_empty_log_init(log); log->blocknum.b = blocknum.b; r = 0; *logp = log; goto cleanup; } r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, ft, &rb, &layout_version); if (r!=0) goto cleanup; { uint8_t *magic = rb.buf + uncompressed_magic_offset; if (memcmp(magic, "tokuroll", 8)!=0) { r = toku_db_badformat(); goto cleanup; } } r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb); cleanup: if (rb.buf) { toku_free(rb.buf); } return r; } int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT ft) { int r = 0; // 15 was the last version with subtree estimates invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_15); FTNODE unused_node = NULL; FTNODE_DISK_DATA unused_ndd = NULL; ftnode_fetch_extra bfe; bfe.create_for_min_read(ft); r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &unused_node, &unused_ndd, &bfe, &ft->h->on_disk_stats); ft->in_memory_stats = ft->h->on_disk_stats; if (unused_node) { toku_ftnode_free(&unused_node); } if (unused_ndd) { toku_free(unused_ndd); } return r; } int toku_upgrade_msn_from_root_to_header(int fd, FT ft) { int r; // 21 was the first version with max_msn_in_ft in the header invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20); FTNODE node; FTNODE_DISK_DATA ndd; ftnode_fetch_extra bfe; bfe.create_for_min_read(ft); r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr); if (r != 0) { goto exit; } ft->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk; toku_ftnode_free(&node); toku_free(ndd); exit: return r; } #undef UPGRADE_STATUS_VALUE