diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-06 23:12:45 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-06 23:12:45 +0100 |
commit | 55e453260ca60fa2d439e9f9af3de9f0cd4b861a (patch) | |
tree | 63ece80e124e2aaa761e27186acd1692bfac9b32 | |
parent | 326341bc00df7e74268eb827d42b3fa2424fbaae (diff) | |
download | rabbitmq-server-55e453260ca60fa2d439e9f9af3de9f0cd4b861a.tar.gz |
big refactoring: extract msg_store from disk_queue
The msg_store knows nothing about queues, or message structure.
-rw-r--r-- | src/rabbit_disk_queue.erl | 1344 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 1128 |
2 files changed, 1315 insertions, 1157 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 538b08d8..ad5d8fb1 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -54,23 +54,13 @@ -include("rabbit.hrl"). --define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). --define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(MAX_READ_FILE_HANDLES, 256). +-define(FILE_SIZE_LIMIT, (256*1024*1024)). + -define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). --define(CACHE_ETS_NAME, rabbit_disk_queue_cache). --define(FILE_EXTENSION, ".rdq"). --define(FILE_EXTENSION_TMP, ".rdt"). --define(FILE_EXTENSION_DETS, ".dets"). -define(BATCH_SIZE, 10000). --define(CACHE_MAX_SIZE, 10485760). --define(MAX_READ_FILE_HANDLES, 256). --define(FILE_SIZE_LIMIT, (256*1024*1024)). -define(DISK_ONLY_MODE_FILE, "disk_only_stats.dat"). --define(BINARY_MODE, [raw, binary]). --define(READ_MODE, [read, read_ahead]). --define(WRITE_MODE, [write, delayed_write]). - -define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}). -define(SHUTDOWN_MESSAGE, #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY, @@ -86,175 +76,15 @@ -define(SERVER, ?MODULE). -record(dqstate, - {msg_location_dets, %% where are messages? - msg_location_ets, %% as above, but for ets version - operation_mode, %% ram_disk | disk_only - file_summary, %% what's in the files? + {operation_mode, %% ram_disk | disk_only + store, %% message store sequences, %% next read and write for each q - current_file_num, %% current file name as number - current_file_name, %% current file name - current_file_handle, %% current file handle - current_offset, %% current offset within current file - current_dirty, %% has the current file been written to - %% since the last fsync? - file_size_limit, %% how big can our files get? - read_file_handle_cache, %% file handle cache for reading on_sync_txns, %% list of commiters to run on sync (reversed) commit_timer_ref, %% TRef for our interval timer - last_sync_offset, %% current_offset at the last time we sync'd - message_cache, %% ets message cache memory_report_timer_ref, %% TRef for the memory report timer - wordsize, %% bytes in a word on this platform - mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode - ets_bytes_per_record %% bytes per record in msg_location_ets + mnesia_bytes_per_record %% bytes per record in mnesia in ram_disk mode }). --record(msg_location, - {msg_id, ref_count, file, offset, total_size, is_persistent}). - --record(file_summary, - {file, valid_total_size, contiguous_top, left, right}). - -%% The components: -%% -%% MsgLocation: this is a (d)ets table which contains: -%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent} -%% FileSummary: this is an ets table which contains: -%% {File, ValidTotalSize, ContiguousTop, Left, Right} -%% Sequences: this is an ets table which contains: -%% {Q, ReadSeqId, WriteSeqId} -%% rabbit_disk_queue: this is an mnesia table which contains: -%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, -%% is_delivered = IsDelivered, -%% msg_id = MsgId -%% } -%% - -%% The basic idea is that messages are appended to the current file up -%% until that file becomes too big (> file_size_limit). At that point, -%% the file is closed and a new file is created on the _right_ of the -%% old file which is used for new messages. Files are named -%% numerically ascending, thus the file with the lowest name is the -%% eldest file. -%% -%% We need to keep track of which messages are in which files (this is -%% the MsgLocation table); how much useful data is in each file and -%% which files are on the left and right of each other. This is the -%% purpose of the FileSummary table. -%% -%% As messages are removed from files, holes appear in these -%% files. The field ValidTotalSize contains the total amount of useful -%% data left in the file, whilst ContiguousTop contains the amount of -%% valid data right at the start of each file. These are needed for -%% garbage collection. -%% -%% On publish, we write the message to disk, record the changes to -%% FileSummary and MsgLocation, and, should this be either a plain -%% publish, or followed by a tx_commit, we record the message in the -%% mnesia table. Sequences exists to enforce ordering of messages as -%% they are published within a queue. -%% -%% On delivery, we read the next message to be read from disk -%% (according to the ReadSeqId for the given queue) and record in the -%% mnesia table that the message has been delivered. -%% -%% On ack we remove the relevant entry from MsgLocation, update -%% FileSummary and delete from the mnesia table. -%% -%% In order to avoid extra mnesia searching, we return the SeqId -%% during delivery which must be returned in ack - it is not possible -%% to ack from MsgId alone. - -%% As messages are ack'd, holes develop in the files. When we discover -%% that either a file is now empty or that it can be combined with the -%% useful data in either its left or right file, we compact the two -%% files together. This keeps disk utilisation high and aids -%% performance. -%% -%% Given the compaction between two files, the left file is considered -%% the ultimate destination for the good data in the right file. If -%% necessary, the good data in the left file which is fragmented -%% throughout the file is written out to a temporary file, then read -%% back in to form a contiguous chunk of good data at the start of the -%% left file. Thus the left file is garbage collected and -%% compacted. Then the good data from the right file is copied onto -%% the end of the left file. MsgLocation and FileSummary tables are -%% updated. -%% -%% On startup, we scan the files we discover, dealing with the -%% possibilites of a crash have occured during a compaction (this -%% consists of tidyup - the compaction is deliberately designed such -%% that data is duplicated on disk rather than risking it being lost), -%% and rebuild the dets and ets tables (MsgLocation, FileSummary, -%% Sequences) from what we find. We ensure that the messages we have -%% discovered on disk match exactly with the messages recorded in the -%% mnesia table. - -%% MsgLocation is deliberately a dets table, and the mnesia table is -%% set to be a disk_only_table in order to ensure that we are not RAM -%% constrained. However, for performance reasons, it is possible to -%% call to_ram_disk_mode/0 which will alter the mnesia table to -%% disc_copies and convert MsgLocation to an ets table. This results -%% in a massive performance improvement, at the expense of greater RAM -%% usage. The idea is that when memory gets tight, we switch to -%% disk_only mode but otherwise try to run in ram_disk mode. - -%% So, with this design, messages move to the left. Eventually, they -%% should end up in a contiguous block on the left and are then never -%% rewritten. But this isn't quite the case. If in a file there is one -%% message that is being ignored, for some reason, and messages in the -%% file to the right and in the current block are being read all the -%% time then it will repeatedly be the case that the good data from -%% both files can be combined and will be written out to a new -%% file. Whenever this happens, our shunned message will be rewritten. -%% -%% So, provided that we combine messages in the right order, -%% (i.e. left file, bottom to top, right file, bottom to top), -%% eventually our shunned message will end up at the bottom of the -%% left file. The compaction/combining algorithm is smart enough to -%% read in good data from the left file that is scattered throughout -%% (i.e. C and D in the below diagram), then truncate the file to just -%% above B (i.e. truncate to the limit of the good contiguous region -%% at the start of the file), then write C and D on top and then write -%% E, F and G from the right file on top. Thus contiguous blocks of -%% good data at the bottom of files are not rewritten (yes, this is -%% the data the size of which is tracked by the ContiguousTop -%% variable. Judicious use of a mirror is required). -%% -%% +-------+ +-------+ +-------+ -%% | X | | G | | G | -%% +-------+ +-------+ +-------+ -%% | D | | X | | F | -%% +-------+ +-------+ +-------+ -%% | X | | X | | E | -%% +-------+ +-------+ +-------+ -%% | C | | F | ===> | D | -%% +-------+ +-------+ +-------+ -%% | X | | X | | C | -%% +-------+ +-------+ +-------+ -%% | B | | X | | B | -%% +-------+ +-------+ +-------+ -%% | A | | E | | A | -%% +-------+ +-------+ +-------+ -%% left right left -%% -%% From this reasoning, we do have a bound on the number of times the -%% message is rewritten. From when it is inserted, there can be no -%% files inserted between it and the head of the queue, and the worst -%% case is that everytime it is rewritten, it moves one position lower -%% in the file (for it to stay at the same position requires that -%% there are no holes beneath it, which means truncate would be used -%% and so it would not be rewritten at all). Thus this seems to -%% suggest the limit is the number of messages ahead of it in the -%% queue, though it's likely that that's pessimistic, given the -%% requirements for compaction/combination of files. -%% -%% The other property is that we have is the bound on the lowest -%% utilisation, which should be 50% - worst case is that all files are -%% fractionally over half full and can't be combined (equivalent is -%% alternating full files and files with only one tiny message in -%% them). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -414,69 +244,35 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> ok = detect_shutdown_state_and_adjust_delivered_flags(), - file:delete(msg_location_dets_file()), - - {ok, MsgLocationDets} = - dets:open_file(?MSG_LOC_NAME, - [{file, msg_location_dets_file()}, - {min_no_slots, 1024*1024}, - %% man says this should be <= 32M. But it works... - {max_no_slots, 30*1024*1024}, - {type, set}, - {keypos, 2} - ]), - - %% it would be better to have this as private, but dets:from_ets/2 - %% seems to blow up if it is set private - see bug21489 - MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, 2}]), - - InitName = "0" ++ ?FILE_EXTENSION, - HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit, - ?BINARY_MODE ++ [read]), + ok = add_index(), + Store = rabbit_msg_store:init(Mode, base_directory(), + FileSizeLimit, ReadFileHandlesLimit, + fun ref_count/1, EtsBPR), + Store1 = prune_mnesia(Store), + ok = del_index(), + + Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), + ok = extract_sequence_numbers(Sequences), + State = - #dqstate { msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts, - operation_mode = Mode, - file_summary = ets:new( - ?FILE_SUMMARY_ETS_NAME, - [set, private, {keypos, 2}]), - sequences = ets:new(?SEQUENCE_ETS_NAME, - [set, private]), - current_file_num = 0, - current_file_name = InitName, - current_file_handle = undefined, - current_offset = 0, - current_dirty = false, - file_size_limit = FileSizeLimit, - read_file_handle_cache = HandleCache, + #dqstate { operation_mode = Mode, + store = Store1, + sequences = Sequences, on_sync_txns = [], commit_timer_ref = undefined, - last_sync_offset = 0, - message_cache = ets:new(?CACHE_ETS_NAME, - [set, private]), memory_report_timer_ref = undefined, - wordsize = erlang:system_info(wordsize), - mnesia_bytes_per_record = MnesiaBPR, - ets_bytes_per_record = EtsBPR + mnesia_bytes_per_record = MnesiaBPR }, - {ok, State1 = #dqstate { current_file_name = CurrentName, - current_offset = Offset } } = - load_from_disk(State), - %% read is only needed so that we can seek - {ok, FileHdl} = open_file(CurrentName, ?WRITE_MODE ++ [read]), - {ok, Offset} = file:position(FileHdl, Offset), - State2 = State1 #dqstate { current_file_handle = FileHdl }, - {ok, start_memory_timer(State2), hibernate, + {ok, start_memory_timer(State), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({fetch, Q}, _From, State) -> - {ok, Result, State1} = + {Result, State1} = internal_fetch_body(Q, record_delivery, pop_queue, State), reply(Result, State1); handle_call({phantom_fetch, Q}, _From, State) -> - {ok, Result, State1} = - internal_fetch_attributes(Q, record_delivery, pop_queue, State), - reply(Result, State1); + Result = internal_fetch_attributes(Q, record_delivery, pop_queue, State), + reply(Result, State); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> State1 = internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State), @@ -485,7 +281,7 @@ handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), reply(Count, State1); handle_call(filesync, _From, State) -> - reply(ok, sync_current_file_handle(State)); + reply(ok, sync(State)); handle_call({delete_queue, Q}, From, State) -> gen_server2:reply(From, ok), {ok, State1} = internal_delete_queue(Q, State), @@ -515,8 +311,8 @@ handle_call(to_ram_disk_mode, _From, State) -> handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), reply(ok, State1); -handle_call(cache_info, _From, State = #dqstate { message_cache = Cache }) -> - reply(ets:info(Cache), State). +handle_call(cache_info, _From, State = #dqstate { store = Store }) -> + reply(rabbit_msg_store:cache_info(Store), State). handle_cast({publish, Q, Message, IsDelivered}, State) -> {ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State), @@ -542,25 +338,19 @@ handle_cast({set_mode, Mode}, State) -> liberated -> fun to_ram_disk_mode/1 end)(State)); handle_cast({prefetch, Q, From}, State) -> - {ok, Result, State1} = + {Result, State1} = internal_fetch_body(Q, record_delivery, peek_queue, State), - Cont = rabbit_misc:with_exit_handler( - fun () -> false end, - fun () -> - ok = rabbit_queue_prefetcher:publish(From, Result), - true - end), - State3 = - case Cont of - true -> - case internal_fetch_attributes( - Q, ignore_delivery, pop_queue, State1) of - {ok, empty, State2} -> State2; - {ok, _, State2} -> State2 - end; - false -> State1 - end, - noreply(State3). + case rabbit_misc:with_exit_handler( + fun () -> false end, + fun () -> + ok = rabbit_queue_prefetcher:publish(From, Result), + true + end) of + true -> + internal_fetch_attributes(Q, ignore_delivery, pop_queue, State1); + false -> ok + end, + noreply(State1). handle_info(report_memory, State) -> %% call noreply1/2, not noreply/1/2, as we don't want to restart the @@ -571,7 +361,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(timeout, State) -> %% must have commit_timer set, so timeout was 0, and we're not hibernating - noreply(sync_current_file_handle(State)). + noreply(sync(State)). handle_pre_hibernate(State) -> %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer @@ -585,33 +375,11 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { sequences = undefined }) -> State; -shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts, - file_summary = FileSummary, - sequences = Sequences, - current_file_handle = FileHdl, - read_file_handle_cache = HC - }) -> +shutdown(State = #dqstate { sequences = Sequences, store = Store }) -> State1 = stop_commit_timer(stop_memory_timer(State)), - case FileHdl of - undefined -> ok; - _ -> sync_current_file_handle(State1), - file:close(FileHdl) - end, - HC1 = rabbit_file_handle_cache:close_all(HC), - dets:close(MsgLocationDets), - file:delete(msg_location_dets_file()), - ets:delete(MsgLocationEts), - ets:delete(FileSummary), + Store1 = rabbit_msg_store:cleanup(Store), ets:delete(Sequences), - State1 #dqstate { msg_location_dets = undefined, - msg_location_ets = undefined, - file_summary = undefined, - sequences = undefined, - current_file_handle = undefined, - current_dirty = false, - read_file_handle_cache = HC1 - }. + State1 #dqstate { sequences = undefined, store = Store1 }. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -643,168 +411,59 @@ report_memory(Hibernating, State) -> rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes), Hibernating). -memory_use(#dqstate { operation_mode = ram_disk, - file_summary = FileSummary, - sequences = Sequences, - msg_location_ets = MsgLocationEts, - message_cache = Cache, - wordsize = WordSize - }) -> - WordSize * (mnesia:table_info(rabbit_disk_queue, memory) + - lists:sum([ets:info(Table, memory) - || Table <- [MsgLocationEts, FileSummary, Cache, - Sequences]])); +memory_use(#dqstate { operation_mode = ram_disk, + store = Store, + sequences = Sequences }) -> + WordSize = erlang:system_info(wordsize), + rabbit_msg_store:memory(Store) + + WordSize * ets:info(Sequences, memory) + + WordSize * mnesia:table_info(rabbit_disk_queue, memory); memory_use(#dqstate { operation_mode = disk_only, - file_summary = FileSummary, + store = Store, sequences = Sequences, - msg_location_dets = MsgLocationDets, - message_cache = Cache, - wordsize = WordSize, - mnesia_bytes_per_record = MnesiaBytesPerRecord, - ets_bytes_per_record = EtsBytesPerRecord }) -> - (WordSize * (lists:sum([ets:info(Table, memory) - || Table <- [FileSummary, Cache, Sequences]]))) + - rabbit_misc:ceil( - mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord) + + mnesia_bytes_per_record = MnesiaBytesPerRecord }) -> + WordSize = erlang:system_info(wordsize), + rabbit_msg_store:memory(Store) + + WordSize * ets:info(Sequences, memory) + rabbit_misc:ceil( - dets:info(MsgLocationDets, size) * EtsBytesPerRecord). + mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord). to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) -> State; -to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts, - wordsize = WordSize }) -> +to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, + store = Store }) -> rabbit_log:info("Converting disk queue to disk only mode~n", []), - MnesiaMemBytes = WordSize * mnesia:table_info(rabbit_disk_queue, memory), - EtsMemBytes = WordSize * ets:info(MsgLocationEts, memory), - MnesiaSize = lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]), - EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]), + MnesiaBPR = erlang:system_info(wordsize) * + mnesia:table_info(rabbit_disk_queue, memory) / + lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]), + EtsBPR = rabbit_msg_store:ets_bpr(Store), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies), - MnesiaBPR = MnesiaMemBytes / MnesiaSize, - EtsBPR = EtsMemBytes / EtsSize, + Store1 = rabbit_msg_store:to_disk_only_mode(Store), Path = form_filename(?DISK_ONLY_MODE_FILE), case rabbit_misc:write_term_file(Path, [{MnesiaBPR, EtsBPR}]) of ok -> ok; {error, Reason} -> throw({error, {cannot_create_disk_only_mode_file, Path, Reason}}) end, - ok = dets:from_ets(MsgLocationDets, MsgLocationEts), - true = ets:delete_all_objects(MsgLocationEts), garbage_collect(), State #dqstate { operation_mode = disk_only, - mnesia_bytes_per_record = MnesiaBPR, - ets_bytes_per_record = EtsBPR }. + store = Store1, + mnesia_bytes_per_record = MnesiaBPR }. to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) -> State; -to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> +to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, + store = Store }) -> rabbit_log:info("Converting disk queue to ram disk mode~n", []), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), + Store1 = rabbit_msg_store:to_ram_disk_mode(Store), ok = file:delete(form_filename(?DISK_ONLY_MODE_FILE)), - true = ets:from_dets(MsgLocationEts, MsgLocationDets), - ok = dets:delete_all_objects(MsgLocationDets), garbage_collect(), State #dqstate { operation_mode = ram_disk, - mnesia_bytes_per_record = undefined, - ets_bytes_per_record = undefined }. - -%%---------------------------------------------------------------------------- -%% message cache helper functions -%%---------------------------------------------------------------------------- - -%% The purpose of the cache is not especially performance, though it -%% can help there too. The main purpose is to ensure that individual -%% messages that are sent to multiple queues, and then to disk, are -%% read back as the same binary object rather than multiples of -%% identical binary objects. This prevents memory explosion. -%% -%% We limit the cache in size. If we didn't, then we could have two -%% queues coming off the same exchange, receiving the same millions of -%% messages, then one queue gets drained, which would pull the entire -%% queue into the cache, which would potentially explode memory. - -remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) -> - true = ets:delete(Cache, MsgId), - ok. - -fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> - case ets:lookup(Cache, MsgId) of - [] -> - not_found; - [{MsgId, Message, _RefCount}] -> - NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}), - {Message, NewRefCount} - end. - -decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> - true = try case ets:update_counter(Cache, MsgId, {3, -1}) of - N when N =< 0 -> true = ets:delete(Cache, MsgId); - _N -> true - end - catch error:badarg -> - %% MsgId is not in there because although it's been - %% delivered, it's never actually been read (think: - %% persistent message in mixed queue) - true - end, - ok. - -insert_into_cache(Message = #basic_message { guid = MsgId }, - #dqstate { message_cache = Cache }) -> - case cache_is_full(Cache) of - true -> ok; - false -> true = ets:insert_new(Cache, {MsgId, Message, 1}), - ok - end. - -cache_is_full(Cache) -> - ets:info(Cache, memory) > ?CACHE_MAX_SIZE. - -%%---------------------------------------------------------------------------- -%% dets/ets agnosticism -%%---------------------------------------------------------------------------- - -dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Key) -> - dets:lookup(MsgLocationDets, Key); -dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Key) -> - ets:lookup(MsgLocationEts, Key). - -dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Key) -> - ok = dets:delete(MsgLocationDets, Key); -dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Key) -> - true = ets:delete(MsgLocationEts, Key), - ok. - -dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Obj) -> - ok = dets:insert(MsgLocationDets, Obj); -dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Obj) -> - true = ets:insert(MsgLocationEts, Obj), - ok. - -dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Obj) -> - true = dets:insert_new(MsgLocationDets, Obj); -dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Obj) -> - true = ets:insert_new(MsgLocationEts, Obj). - -dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Obj) -> - dets:match_object(MsgLocationDets, Obj); -dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Obj) -> - ets:match_object(MsgLocationEts, Obj). + store = Store1, + mnesia_bytes_per_record = undefined }. %%---------------------------------------------------------------------------- %% general helper functions @@ -840,50 +499,10 @@ form_filename(Name) -> base_directory() -> filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/"). -msg_location_dets_file() -> - form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS). - -open_file(File, Mode) -> file:open(form_filename(File), ?BINARY_MODE ++ Mode). - -with_read_handle_at(File, Offset, Fun, State = - #dqstate { read_file_handle_cache = HC, - current_file_name = CurName, - current_dirty = IsDirty, - last_sync_offset = SyncOffset - }) -> - State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset -> - sync_current_file_handle(State); - true -> State - end, - FilePath = form_filename(File), - {Result, HC1} = - rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), - {Result, State1 #dqstate { read_file_handle_cache = HC1 }}. - -sync_current_file_handle(State = #dqstate { current_dirty = false, - on_sync_txns = [] }) -> - State; -sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, - current_dirty = IsDirty, - current_offset = CurOffset, - on_sync_txns = Txns, - last_sync_offset = SyncOffset - }) -> - SyncOffset1 = case IsDirty of - true -> ok = file:sync(CurHdl), - CurOffset; - false -> SyncOffset - end, - State1 = lists:foldl(fun internal_do_tx_commit/2, State, lists:reverse(Txns)), - State1 #dqstate { current_dirty = false, on_sync_txns = [], - last_sync_offset = SyncOffset1 }. - sequence_lookup(Sequences, Q) -> case ets:lookup(Sequences, Q) of - [] -> - {0, 0}; - [{Q, ReadSeqId, WriteSeqId}] -> - {ReadSeqId, WriteSeqId} + [] -> {0, 0}; + [{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId} end. start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> @@ -896,91 +515,54 @@ stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #dqstate { commit_timer_ref = undefined }. -msg_to_bin(Msg = #basic_message { content = Content }) -> - ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), - term_to_binary(Msg #basic_message { content = ClearedContent }). - -bin_to_msg(MsgBin) -> - binary_to_term(MsgBin). +sync(State = #dqstate { store = Store, on_sync_txns = Txns }) -> + State1 = State #dqstate { store = rabbit_msg_store:sync(Store) }, + case Txns of + [] -> State1; + _ -> lists:foldl(fun internal_do_tx_commit/2, + State1 #dqstate { on_sync_txns = [] }, + lists:reverse(Txns)) + end. %%---------------------------------------------------------------------------- %% internal functions %%---------------------------------------------------------------------------- -internal_fetch_body(Q, MarkDelivered, Advance, State) -> - case queue_head(Q, MarkDelivered, Advance, State) of - E = {ok, empty, _} -> E; - {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} -> - {Message, State2} = read_stored_message(StoreEntry, State1), - {ok, {Message, IsDelivered, AckTag, Remaining}, State2} +internal_fetch_body(Q, MarkDelivered, Advance, + State = #dqstate { store = Store }) -> + case next(Q, MarkDelivered, Advance, State) of + empty -> {empty, State}; + {MsgId, IsDelivered, AckTag, Remaining} -> + {Message, Store1} = rabbit_msg_store:read(MsgId, Store), + State1 = State #dqstate { store = Store1 }, + {{Message, IsDelivered, AckTag, Remaining}, State1} end. -internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> - case queue_head(Q, MarkDelivered, Advance, State) of - E = {ok, empty, _} -> E; - {ok, AckTag, IsDelivered, - #msg_location { msg_id = MsgId, is_persistent = IsPersistent }, - Remaining, State1} -> - {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1} +internal_fetch_attributes(Q, MarkDelivered, Advance, + State = #dqstate { store = Store }) -> + case next(Q, MarkDelivered, Advance, State) of + empty -> empty; + {MsgId, IsDelivered, AckTag, Remaining} -> + IsPersistent = rabbit_msg_store:is_persistent(MsgId, Store), + {MsgId, IsPersistent, IsDelivered, AckTag, Remaining} end. -queue_head(Q, MarkDelivered, Advance, - State = #dqstate { sequences = Sequences }) -> +next(Q, MarkDelivered, Advance, #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of - {SeqId, SeqId} -> {ok, empty, State}; + {SeqId, SeqId} -> empty; {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId -> Remaining = WriteSeqId - ReadSeqId - 1, - {AckTag, IsDelivered, StoreEntry} = - update_message_attributes(Q, ReadSeqId, MarkDelivered, State), + {MsgId, IsDelivered} = + update_message_attributes(Q, ReadSeqId, MarkDelivered), ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId), - {ok, AckTag, IsDelivered, StoreEntry, Remaining, State} + AckTag = {MsgId, ReadSeqId}, + {MsgId, IsDelivered, AckTag, Remaining} end. -maybe_advance(peek_queue, _, _, _, _) -> - ok; -maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> - true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), - ok. - -read_stored_message(#msg_location { msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset, - total_size = TotalSize }, State) -> - case fetch_and_increment_cache(MsgId, State) of - not_found -> - {{ok, {MsgId, MsgBody, _IsPersistent, _BodySize}}, State1} = - with_read_handle_at( - File, Offset, - fun(Hdl) -> - Res = case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {MsgId, _, _, _}} = Obj -> Obj; - {ok, Rest} -> - throw({error, - {misread, [{old_state, State}, - {file, File}, - {offset, Offset}, - {read, Rest}]}}) - end, - {Offset + TotalSize, Res} - end, State), - Message = #basic_message {} = bin_to_msg(MsgBody), - ok = if RefCount > 1 -> - insert_into_cache(Message, State1); - true -> ok - %% it's not in the cache and we only have - %% 1 queue with the message. So don't - %% bother putting it in the cache. - end, - {Message, State1}; - {Message, _RefCount} -> - {Message, State} - end. - -update_message_attributes(Q, SeqId, MarkDelivered, State) -> +update_message_attributes(Q, SeqId, MarkDelivered) -> [Obj = #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), - [StoreEntry = #msg_location { msg_id = MsgId }] = - dets_ets_lookup(State, MsgId), ok = case {IsDelivered, MarkDelivered} of {true, _} -> ok; {false, ignore_delivery} -> ok; @@ -988,130 +570,62 @@ update_message_attributes(Q, SeqId, MarkDelivered, State) -> mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) end, - {{MsgId, SeqId}, IsDelivered, StoreEntry}. + {MsgId, IsDelivered}. + +maybe_advance(peek_queue, _, _, _, _) -> + ok; +maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> + true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), + ok. internal_foldl(Q, Fun, Init, State) -> - State1 = #dqstate { sequences = Sequences } = - sync_current_file_handle(State), + State1 = #dqstate { sequences = Sequences } = sync(State), {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId). internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> {ok, Acc, State}; -internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) -> - {AckTag, IsDelivered, StoreEntry} = - update_message_attributes(Q, ReadSeqId, ignore_delivery, State), - {Message, State1} = read_stored_message(StoreEntry, State), - Acc1 = Fun(Message, AckTag, IsDelivered, Acc), - internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1). +internal_foldl(Q, WriteSeqId, Fun, State = #dqstate { store = Store }, + Acc, ReadSeqId) -> + [#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = + mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), + {Message, Store1} = rabbit_msg_store:read(MsgId, Store), + Acc1 = Fun(Message, {MsgId, ReadSeqId}, IsDelivered, Acc), + internal_foldl(Q, WriteSeqId, Fun, State #dqstate { store = Store1 }, + Acc1, ReadSeqId + 1). internal_ack(Q, MsgSeqIds, State) -> remove_messages(Q, MsgSeqIds, true, State). %% Q is only needed if MnesiaDelete /= false -remove_messages(Q, MsgSeqIds, MnesiaDelete, State) -> - Files = - lists:foldl( - fun ({MsgId, SeqId}, Files1) -> - Files2 = remove_message(MsgId, Files1, State), - ok = case MnesiaDelete of - true -> mnesia:dirty_delete(rabbit_disk_queue, - {Q, SeqId}); - _ -> ok - end, - Files2 - end, sets:new(), MsgSeqIds), - State1 = compact(Files, State), - {ok, State1}. - -remove_message(MsgId, Files, - State = #dqstate { file_summary = FileSummary, - current_file_name = CurName - }) -> - [StoreEntry = - #msg_location { msg_id = MsgId, ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize }] = - dets_ets_lookup(State, MsgId), - case RefCount of - 1 -> - ok = dets_ets_delete(State, MsgId), - ok = remove_cache_entry(MsgId, State), - [FSEntry = #file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop }] = - ets:lookup(FileSummary, File), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:insert(FileSummary, FSEntry #file_summary { - valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1 }), - if CurName =:= File -> Files; - true -> sets:add_element(File, Files) - end; - _ when 1 < RefCount -> - ok = decrement_cache(MsgId, State), - ok = dets_ets_insert(State, StoreEntry #msg_location { - ref_count = RefCount - 1 }), - Files - end. +remove_messages(Q, MsgSeqIds, MnesiaDelete, + State = #dqstate { store = Store } ) -> + MsgIds = lists:foldl( + fun ({MsgId, SeqId}, MsgIdAcc) -> + ok = case MnesiaDelete of + true -> mnesia:dirty_delete(rabbit_disk_queue, + {Q, SeqId}); + _ -> ok + end, + [MsgId | MsgIdAcc] + end, [], MsgSeqIds), + Store1 = rabbit_msg_store:remove(MsgIds, Store), + {ok, State #dqstate { store = Store1}}. internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, - guid = MsgId }, - State = #dqstate { current_file_handle = CurHdl, - current_file_name = CurName, - current_offset = CurOffset, - file_summary = FileSummary - }) -> - case dets_ets_lookup(State, MsgId) of - [] -> - %% New message, lots to do - {ok, TotalSize} = rabbit_msg_file:append( - CurHdl, MsgId, msg_to_bin(Message), - IsPersistent), - true = dets_ets_insert_new( - State, #msg_location { - msg_id = MsgId, ref_count = 1, file = CurName, - offset = CurOffset, total_size = TotalSize, - is_persistent = IsPersistent }), - [FSEntry = #file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - right = undefined }] = - ets:lookup(FileSummary, CurName), - ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = if CurOffset =:= ContiguousTop -> - %% can't be any holes in this file - ValidTotalSize1; - true -> ContiguousTop - end, - true = ets:insert(FileSummary, FSEntry #file_summary { - valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1 }), - NextOffset = CurOffset + TotalSize, - maybe_roll_to_new_file( - NextOffset, State #dqstate {current_offset = NextOffset, - current_dirty = true}); - [StoreEntry = - #msg_location { msg_id = MsgId, ref_count = RefCount }] -> - %% We already know about it, just update counter - ok = dets_ets_insert(State, StoreEntry #msg_location { - ref_count = RefCount + 1 }), - {ok, State} - end. + guid = MsgId, + content = Content }, + State = #dqstate { store = Store }) -> + ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), + Message1 = Message #basic_message { content = ClearedContent }, + Store1 = rabbit_msg_store:write(MsgId, Message1, IsPersistent, Store), + {ok, State #dqstate { store = Store1 }}. internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, - State = #dqstate { current_file_name = CurFile, - current_dirty = IsDirty, - on_sync_txns = Txns, - last_sync_offset = SyncOffset - }) -> - NeedsSync = IsDirty andalso - lists:any(fun ({MsgId, _IsDelivered}) -> - [#msg_location { msg_id = MsgId, file = File, - offset = Offset }] = - dets_ets_lookup(State, MsgId), - File =:= CurFile andalso Offset >= SyncOffset - end, PubMsgIds), + State = #dqstate { store = Store, on_sync_txns = Txns }) -> TxnDetails = {Q, PubMsgIds, AckSeqIds, From}, - case NeedsSync of + case rabbit_msg_store:needs_sync( + [MsgId || {MsgId, _IsDelivered} <- PubMsgIds], Store) of true -> Txns1 = [TxnDetails | Txns], State #dqstate { on_sync_txns = Txns1 }; false -> internal_do_tx_commit(TxnDetails, State) @@ -1165,13 +679,14 @@ internal_tx_rollback(MsgIds, State) -> internal_requeue(_Q, [], State) -> {ok, State}; -internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> +internal_requeue(Q, MsgSeqIds, State = #dqstate { store = Store, + sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've %% already been delivered). We also know that the rows for these %% messages are still in rabbit_disk_queue (i.e. they've not been %% ack'd). - + %% %% Now, it would be nice if we could adjust the sequence ids in %% rabbit_disk_queue (mnesia) to create a contiguous block and %% then drop the ReadSeqId for the queue by the corresponding @@ -1180,13 +695,14 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> %% which are not being requeued. As such, moving things about in %% rabbit_disk_queue _under_ the current ReadSeqId would result in %% such sequence ids referring to the wrong messages. - + %% %% Therefore, the only solution is to take these messages, and to %% reenqueue them at the top of the queue. Usefully, this only %% affects the Sequences and rabbit_disk_queue structures - there %% is no need to physically move the messages about on disk, so - %% MsgLocation and FileSummary stay put (which makes further sense - %% as they have no concept of sequence id anyway). + %% the message store remains unaffected, except we need to tell it + %% about the ids of the requeued messages so it can remove them + %% from its message cache if necessary. {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), {WriteSeqId1, Q, MsgIds} = @@ -1197,8 +713,8 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> MsgSeqIds) end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}), - lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), - {ok, State}. + Store1 = rabbit_msg_store:release(MsgIds, Store), + {ok, State #dqstate { store = Store1 }}. requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = @@ -1212,7 +728,8 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> {WriteSeqId + 1, Q, [MsgId | Acc]}. %% move the next N messages from the front of the queue to the back. -internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> +internal_requeue_next_n(Q, N, State = #dqstate { store = Store, + sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), if N >= (WriteSeqId - ReadSeqId) -> {ok, State}; true -> @@ -1224,8 +741,8 @@ internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> end ), true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}), - lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), - {ok, State} + Store1 = rabbit_msg_store:release(MsgIds, Store), + {ok, State #dqstate { store = Store1 }} end. requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) -> @@ -1257,7 +774,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> end. internal_delete_queue(Q, State) -> - State1 = sync_current_file_handle(State), + State1 = sync(State), {ok, _Count, State2 = #dqstate { sequences = Sequences }} = internal_purge(Q, State1), %% remove everything undelivered true = ets:delete(Sequences, Q), @@ -1283,269 +800,6 @@ internal_delete_non_durable_queues( end, {ok, State}, Sequences). %%---------------------------------------------------------------------------- -%% garbage collection / compaction / aggregation -%%---------------------------------------------------------------------------- - -maybe_roll_to_new_file(Offset, - State = #dqstate { file_size_limit = FileSizeLimit, - current_file_name = CurName, - current_file_handle = CurHdl, - current_file_num = CurNum, - file_summary = FileSummary - } - ) when Offset >= FileSizeLimit -> - State1 = sync_current_file_handle(State), - ok = file:close(CurHdl), - NextNum = CurNum + 1, - NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, - {ok, NextHdl} = open_file(NextName, ?WRITE_MODE), - true = ets:update_element(FileSummary, CurName, - {#file_summary.right, NextName}), - true = ets:insert_new( - FileSummary, #file_summary { - file = NextName, valid_total_size = 0, contiguous_top = 0, - left = CurName, right = undefined }), - State2 = State1 #dqstate { current_file_name = NextName, - current_file_handle = NextHdl, - current_file_num = NextNum, - current_offset = 0, - last_sync_offset = 0 - }, - {ok, compact(sets:from_list([CurName]), State2)}; -maybe_roll_to_new_file(_, State) -> - {ok, State}. - -compact(FilesSet, State) -> - %% smallest number, hence eldest, hence left-most, first - Files = lists:sort(fun file_name_sort/2, sets:to_list(FilesSet)), - %% foldl reverses, so now youngest/right-most first - RemainingFiles = lists:foldl(fun (File, Acc) -> - delete_empty_files(File, Acc, State) - end, [], Files), - lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). - -%% At this stage, we simply know that the file has had msgs removed -%% from it. However, we don't know if we need to merge it left (which -%% is what we would prefer), or merge it right. If we merge left, then -%% this file is the source, and the left file is the destination. If -%% we merge right then this file is the destination and the right file -%% is the source. -combine_file(File, State = #dqstate { file_summary = FileSummary, - current_file_name = CurName - }) -> - %% the file we're looking at may no longer exist as it may have - %% been deleted within the current GC run - case ets:lookup(FileSummary, File) of - [] -> State; - [FSEntry = #file_summary { left = Left, right = Right }] -> - GoRight = - fun() -> - case Right of - undefined -> State; - _ when not (CurName == Right) -> - [FSRight] = ets:lookup(FileSummary, Right), - {_, State1} = adjust_meta_and_combine( - FSEntry, FSRight, State), - State1; - _ -> State - end - end, - case Left of - undefined -> - GoRight(); - _ -> [FSLeft] = ets:lookup(FileSummary, Left), - case adjust_meta_and_combine(FSLeft, FSEntry, State) of - {true, State1} -> State1; - {false, State} -> GoRight() - end - end - end. - -adjust_meta_and_combine( - LeftObj = #file_summary { - file = LeftFile, valid_total_size = LeftValidData, right = RightFile }, - RightObj = #file_summary { - file = RightFile, valid_total_size = RightValidData, left = LeftFile, - right = RightRight }, - State = #dqstate { file_size_limit = FileSizeLimit, - file_summary = FileSummary }) -> - TotalValidData = LeftValidData + RightValidData, - if FileSizeLimit >= TotalValidData -> - State1 = combine_files(RightObj, LeftObj, State), - %% this could fail if RightRight is undefined - ets:update_element(FileSummary, RightRight, - {#file_summary.left, LeftFile}), - true = ets:insert(FileSummary, LeftObj #file_summary { - valid_total_size = TotalValidData, - contiguous_top = TotalValidData, - right = RightRight }), - true = ets:delete(FileSummary, RightFile), - {true, State1}; - true -> {false, State} - end. - -sort_msg_locations_by_offset(Dir, List) -> - Comp = case Dir of - asc -> fun erlang:'<'/2; - desc -> fun erlang:'>'/2 - end, - lists:sort(fun (#msg_location { offset = OffA }, - #msg_location { offset = OffB }) -> - Comp(OffA, OffB) - end, List). - -preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit), - ok = file:truncate(Hdl), - {ok, FinalPos} = file:position(Hdl, FinalPos), - ok. - -truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> - {ok, Lowpoint} = file:position(FileHdl, Lowpoint), - ok = file:truncate(FileHdl), - ok = preallocate(FileHdl, Highpoint, Lowpoint). - -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, - right = Source }, - State) -> - State1 = close_file(Source, close_file(Destination, State)), - {ok, SourceHdl} = open_file(Source, ?READ_MODE), - {ok, DestinationHdl} = open_file(Destination, ?READ_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, - %% if DestinationValid =:= DestinationContiguousTop then we don't - %% need a tmp file - %% if they're not equal, then we need to write out everything past - %% the DestinationContiguousTop to a tmp file then truncate, - %% copy back in, and then copy over from Source - %% otherwise we just truncate straight away and copy over from Source - if DestinationContiguousTop =:= DestinationValid -> - ok = truncate_and_extend_file(DestinationHdl, - DestinationValid, ExpectedSize); - true -> - Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Tmp, ?READ_MODE ++ ?WRITE_MODE), - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset /= DestinationContiguousTop -> - %% it cannot be that Offset == - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - %% Given expected access patterns, I suspect - %% that the list should be naturally sorted - %% as we require, however, we need to - %% enforce it anyway - end, sort_msg_locations_by_offset( - asc, dets_ets_match_object( - State1, #msg_location { - file = Destination, _ = '_' }))), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State1), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and MsgLocationDets has been updated to - %% reflect compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file:sync(DestinationHdl), - ok = file:close(TmpHdl), - ok = file:delete(form_filename(Tmp)) - end, - SourceWorkList = - sort_msg_locations_by_offset( - asc, dets_ets_match_object(State1, #msg_location { - file = Source, _ = '_' })), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, - SourceHdl, DestinationHdl, Destination, State1), - %% tidy up - ok = file:close(SourceHdl), - ok = file:close(DestinationHdl), - ok = file:delete(form_filename(Source)), - State1. - -copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, State) -> - {FinalOffset, BlockStart1, BlockEnd1} = - lists:foldl( - fun (StoreEntry = #msg_location { offset = Offset, - total_size = TotalSize }, - {CurOffset, BlockStart, BlockEnd}) -> - %% CurOffset is in the DestinationFile. - %% Offset, BlockStart and BlockEnd are in the SourceFile - %% update MsgLocationDets to reflect change of file and offset - ok = dets_ets_insert(State, StoreEntry #msg_location { - file = Destination, - offset = CurOffset }), - NextOffset = CurOffset + TotalSize, - if BlockStart =:= undefined -> - %% base case, called only for the first list elem - {NextOffset, Offset, Offset + TotalSize}; - Offset =:= BlockEnd -> - %% extend the current block because the next - %% msg follows straight on - {NextOffset, BlockStart, BlockEnd + TotalSize}; - true -> - %% found a gap, so actually do the work for - %% the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file:position(SourceHdl, BlockStart), - {ok, BSize} = - file:copy(SourceHdl, DestinationHdl, BSize), - {NextOffset, Offset, Offset + TotalSize} - end - end, {InitOffset, undefined, undefined}, WorkList), - %% do the last remaining block - BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = file:position(SourceHdl, BlockStart1), - {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1), - ok = file:sync(DestinationHdl), - ok. - -close_file(File, State = #dqstate { read_file_handle_cache = HC }) -> - HC1 = rabbit_file_handle_cache:close_file(form_filename(File), HC), - State #dqstate { read_file_handle_cache = HC1 }. - -delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> - [#file_summary { valid_total_size = ValidData, - left = Left, right = Right }] = - ets:lookup(FileSummary, File), - case ValidData of - %% we should NEVER find the current file in here hence right - %% should always be a file, not undefined - 0 -> - case {Left, Right} of - {undefined, _} when not is_atom(Right) -> - %% the eldest file is empty. - true = ets:update_element( - FileSummary, Right, - {#file_summary.left, undefined}); - {_, _} when not (is_atom(Right)) -> - true = ets:update_element(FileSummary, Right, - {#file_summary.left, Left}), - true = - ets:update_element(FileSummary, Left, - {#file_summary.right, Right}) - end, - true = ets:delete(FileSummary, File), - ok = file:delete(form_filename(File)), - Acc; - _ -> [File|Acc] - end. - -%%---------------------------------------------------------------------------- %% recovery %%---------------------------------------------------------------------------- @@ -1630,55 +884,35 @@ del_index() -> E1 -> E1 end. -load_from_disk(State) -> - %% sorted so that smallest number is first. which also means - %% eldest file (left-most) first - ok = add_index(), - {Files, TmpFiles} = get_disk_queue_files(), - ok = recover_crashed_compactions(Files, TmpFiles), - %% There should be no more tmp files now, so go ahead and load the - %% whole lot - Files1 = case Files of - [] -> [State #dqstate.current_file_name]; - _ -> Files - end, - State1 = load_messages(undefined, Files1, State), - %% Finally, check there is nothing in mnesia which we haven't - %% loaded - Key = mnesia:dirty_first(rabbit_disk_queue), - {ok, AlteredFiles} = prune_mnesia(State1, Key, sets:new(), [], 0), - State2 = compact(AlteredFiles, State1), - ok = extract_sequence_numbers(State2 #dqstate.sequences), - ok = del_index(), - {ok, State2}. - -prune_mnesia_flush_batch(DeleteAcc) -> +prune_mnesia_flush_batch(DeleteAcc, RemoveAcc, Store) -> lists:foldl(fun (Key, ok) -> mnesia:dirty_delete(rabbit_disk_queue, Key) - end, ok, DeleteAcc). - -prune_mnesia(_State, '$end_of_table', Files, _DeleteAcc, 0) -> - {ok, Files}; -prune_mnesia(_State, '$end_of_table', Files, DeleteAcc, _Len) -> - ok = prune_mnesia_flush_batch(DeleteAcc), - {ok, Files}; -prune_mnesia(State, Key, Files, DeleteAcc, Len) -> + end, ok, DeleteAcc), + rabbit_msg_store:remove(RemoveAcc, Store). + +prune_mnesia(Store) -> + prune_mnesia(Store, mnesia:dirty_first(rabbit_disk_queue), [], [], 0). + +prune_mnesia(Store, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) -> + Store; +prune_mnesia(Store, '$end_of_table', DeleteAcc, RemoveAcc, _Len) -> + prune_mnesia_flush_batch(DeleteAcc, RemoveAcc, Store); +prune_mnesia(Store, Key, DeleteAcc, RemoveAcc, Len) -> [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = mnesia:dirty_read(rabbit_disk_queue, Key), - {DeleteAcc1, Files1, Len1} = - case dets_ets_lookup(State, MsgId) of - [] -> + {DeleteAcc1, RemoveAcc1, Len1} = + case rabbit_msg_store:is_persistent(MsgId, Store) of + not_found -> %% msg hasn't been found on disk, delete it - {[{Q, SeqId} | DeleteAcc], Files, Len + 1}; - [#msg_location { msg_id = MsgId, is_persistent = true }] -> + {[{Q, SeqId} | DeleteAcc], RemoveAcc, Len + 1}; + true -> %% msg is persistent, keep it - {DeleteAcc, Files, Len}; - [#msg_location { msg_id = MsgId, is_persistent = false}] -> + {DeleteAcc, RemoveAcc, Len}; + false -> %% msg is not persistent, delete it - Files2 = remove_message(MsgId, Files, State), - {[{Q, SeqId} | DeleteAcc], Files2, Len + 1} + {[{Q, SeqId} | DeleteAcc], [MsgId | RemoveAcc], Len + 1} end, - {Key1, DeleteAcc2, Len2} = + {Store1, Key1, DeleteAcc2, RemoveAcc2, Len2} = if Len1 >= ?BATCH_SIZE -> %% We have no way of knowing how flushing the batch @@ -1686,14 +920,15 @@ prune_mnesia(State, Key, Files, DeleteAcc, Len) -> %% so have no choice but to start again. Although this %% will make recovery slower for large queues, we %% guarantee we can start up in constant memory - ok = prune_mnesia_flush_batch(DeleteAcc1), + Store2 = prune_mnesia_flush_batch(DeleteAcc1, RemoveAcc1, + Store), Key2 = mnesia:dirty_first(rabbit_disk_queue), - {Key2, [], 0}; + {Store2, Key2, [], [], 0}; true -> Key2 = mnesia:dirty_next(rabbit_disk_queue, Key), - {Key2, DeleteAcc1, Len1} + {Store, Key2, DeleteAcc1, RemoveAcc1, Len1} end, - prune_mnesia(State, Key1, Files1, DeleteAcc2, Len2). + prune_mnesia(Store1, Key1, DeleteAcc2, RemoveAcc2, Len2). extract_sequence_numbers(Sequences) -> true = @@ -1712,7 +947,7 @@ extract_sequence_numbers(Sequences) -> case ets:lookup(Sequences, Q) of [] -> ets:insert_new(Sequences, {Q, SeqId, NextWrite}); - [Orig = {Q, Read, Write}] -> + [Orig = {_, Read, Write}] -> Repl = {Q, lists:min([Read, SeqId]), lists:max([Write, NextWrite])}, case Orig == Repl of @@ -1767,213 +1002,8 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> end, shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc). -load_messages(Left, [], State) -> - Num = list_to_integer(filename:rootname(Left)), - Offset = - case dets_ets_match_object(State, #msg_location { - file = Left, _ = '_' }) of - [] -> 0; - L -> - [ #msg_location { file = Left, - offset = MaxOffset, - total_size = TotalSize} | _ ] = - sort_msg_locations_by_offset(desc, L), - MaxOffset + TotalSize - end, - State #dqstate { current_file_num = Num, current_file_name = Left, - current_offset = Offset }; -load_messages(Left, [File|Files], - State = #dqstate { file_summary = FileSummary }) -> - {ok, Messages} = scan_file_for_valid_messages(File), - {ValidMessages, ValidTotalSize} = lists:foldl( - fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_index_match_object - (rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, _ = '_' }, - msg_id)) of - 0 -> {VMAcc, VTSAcc}; - RefCount -> - true = dets_ets_insert_new( - State, #msg_location { - msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset, - total_size = TotalSize, - is_persistent = IsPersistent }), - {[Obj | VMAcc], VTSAcc + TotalSize} - end - end, {[], 0}, Messages), - %% foldl reverses lists, find_contiguous_block_prefix needs - %% msgs eldest first, so, ValidMessages is the right way round - {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), - Right = case Files of - [] -> undefined; - [F|_] -> F - end, - true = ets:insert_new(FileSummary, #file_summary { - file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - left = Left, right = Right }), - load_messages(File, Files, State). - -recover_crashed_compactions(Files, TmpFiles) -> - lists:foreach(fun (TmpFile) -> - ok = recover_crashed_compactions1(Files, TmpFile) end, - TmpFiles), - ok. - -verify_messages_in_mnesia(MsgIds) -> - lists:foreach( - fun (MsgId) -> - true = 0 < length(mnesia:dirty_index_match_object( - rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, _ = '_' }, - msg_id)) - end, MsgIds). - -scan_file_for_valid_messages_msg_ids(File) -> - {ok, Messages} = scan_file_for_valid_messages(File), - {ok, Messages, - [MsgId || {MsgId, _IsPersistent, _TotalSize, _FileOffset} <- Messages]}. - -recover_crashed_compactions1(Files, TmpFile) -> - NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, - true = lists:member(NonTmpRelatedFile, Files), - {ok, UncorruptedMessagesTmp, MsgIdsTmp} = - scan_file_for_valid_messages_msg_ids(TmpFile), - %% all of these messages should appear in the mnesia table, - %% otherwise they wouldn't have been copied out - verify_messages_in_mnesia(MsgIdsTmp), - {ok, UncorruptedMessages, MsgIds} = - scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile), - %% 1) It's possible that everything in the tmp file is also in the - %% main file such that the main file is (prefix ++ - %% tmpfile). This means that compaction failed immediately - %% prior to the final step of deleting the tmp file. Plan: just - %% delete the tmp file - %% 2) It's possible that everything in the tmp file is also in the - %% main file but with holes throughout (or just somthing like - %% main = (prefix ++ hole ++ tmpfile)). This means that - %% compaction wrote out the tmp file successfully and then - %% failed. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 3) It's possible that everything in the tmp file is also in the - %% main file but such that the main file does not end with tmp - %% file (and there are valid messages in the suffix; main = - %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This - %% means that compaction failed as we were writing out the tmp - %% file. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 4) It's possible that there are messages in the tmp file which - %% are not in the main file. This means that writing out the - %% tmp file succeeded, but then we failed as we were copying - %% them back over to the main file, after truncating the main - %% file. As the main file has already been truncated, it should - %% consist only of valid messages. Plan: Truncate the main file - %% back to before any of the files in the tmp file and copy - %% them over again - TmpPath = form_filename(TmpFile), - case is_sublist(MsgIdsTmp, MsgIds) of - true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file - %% note this also catches the case when the tmp file - %% is empty - ok = file:delete(TmpPath); - false -> - %% We're in case 4 above. We only care about the inital - %% msgs in main file that are not in the tmp file. If - %% there are no msgs in the tmp file then we would be in - %% the 'true' branch of this case, so we know the - %% lists:last call is safe. - EldestTmpMsgId = lists:last(MsgIdsTmp), - {MsgIds1, UncorruptedMessages1} - = case lists:splitwith( - fun (MsgId) -> MsgId /= EldestTmpMsgId end, MsgIds) of - {_MsgIds, []} -> %% no msgs from tmp in main - {MsgIds, UncorruptedMessages}; - {Dropped, [EldestTmpMsgId | Rest]} -> - %% Msgs in Dropped are in tmp, so forget them. - %% *cry*. Lists indexed from 1. - {Rest, lists:sublist(UncorruptedMessages, - 2 + length(Dropped), - length(Rest))} - end, - %% Check that everything in the main file prefix is a - %% valid message in mnesia - verify_messages_in_mnesia(MsgIds1), - %% The main file prefix should be contiguous - {Top, MsgIds1} = find_contiguous_block_prefix( - lists:reverse(UncorruptedMessages1)), - %% we should have that none of the messages in the prefix - %% are in the tmp file - true = is_disjoint(MsgIds1, MsgIdsTmp), - %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(NonTmpRelatedFile, ?WRITE_MODE ++ [read]), - %% Wipe out any rubbish at the end of the file. Remember - %% the head of the list will be the highest entry in the - %% file. - [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize, - %% Extend the main file as big as necessary in a single - %% move. If we run out of disk space, this truncate could - %% fail, but we still aren't risking losing data - ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(TmpFile, ?READ_MODE), - {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), - ok = file:sync(MainHdl), - ok = file:close(MainHdl), - ok = file:close(TmpHdl), - ok = file:delete(TmpPath), - - {ok, _MainMessages, MsgIdsMain} = - scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile), - %% check that everything in MsgIds1 is in MsgIdsMain - true = is_sublist(MsgIds1, MsgIdsMain), - %% check that everything in MsgIdsTmp is in MsgIdsMain - true = is_sublist(MsgIdsTmp, MsgIdsMain) - end, - ok. - -is_sublist(SmallerList, BiggerList) -> - lists:all(fun (Item) -> lists:member(Item, BiggerList) end, SmallerList). - -is_disjoint(SmallerList, BiggerList) -> - lists:all(fun (Item) -> not lists:member(Item, BiggerList) end, SmallerList). - -%% Takes the list in *ascending* order (i.e. eldest message -%% first). This is the opposite of what scan_file_for_valid_messages -%% produces. The list of msgs that is produced is youngest first. -find_contiguous_block_prefix([]) -> {0, []}; -find_contiguous_block_prefix(List) -> - find_contiguous_block_prefix(List, 0, []). - -find_contiguous_block_prefix([], ExpectedOffset, MsgIds) -> - {ExpectedOffset, MsgIds}; -find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset} - | Tail], ExpectedOffset, MsgIds) -> - ExpectedOffset1 = ExpectedOffset + TotalSize, - find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]); -find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> - {ExpectedOffset, MsgIds}. - -file_name_sort(A, B) -> - ANum = list_to_integer(filename:rootname(A)), - BNum = list_to_integer(filename:rootname(B)), - ANum < BNum. - -get_disk_queue_files() -> - DQFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION, base_directory()), - DQFilesSorted = lists:sort(fun file_name_sort/2, DQFiles), - DQTFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, base_directory()), - DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles), - {DQFilesSorted, DQTFilesSorted}. - -scan_file_for_valid_messages(File) -> - case open_file(File, ?READ_MODE) of - {ok, Hdl} -> - Valid = rabbit_msg_file:scan(Hdl), - %% if something really bad's happened, the close could fail, - %% but ignore - file:close(Hdl), - Valid; - {error, enoent} -> {ok, []}; - {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}}) - end. +ref_count(MsgId) -> + length(mnesia:dirty_index_match_object( + rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, _ = '_' }, + msg_id)). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl new file mode 100644 index 00000000..e4ccc1df --- /dev/null +++ b/src/rabbit_msg_store.erl @@ -0,0 +1,1128 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store). + +-export([init/6, write/4, read/2, is_persistent/2, remove/2, release/2, + needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1, + ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]). + +%%---------------------------------------------------------------------------- + +-record(msstate, + {operation_mode, %% ram_disk | disk_only + dir, %% store directory + msg_location_dets, %% where are messages? + msg_location_ets, %% as above, but for ets version + file_summary, %% what's in the files? + current_file_num, %% current file name as number + current_file_name, %% current file name + current_file_handle, %% current file handle + current_offset, %% current offset within current file + current_dirty, %% has the current file been written to + %% since the last fsync? + file_size_limit, %% how big can our files get? + read_file_handle_cache, %% file handle cache for reading + last_sync_offset, %% current_offset at the last time we sync'd + message_cache, %% ets message cache + ets_bytes_per_record %% bytes per record in msg_location_ets + }). + +-record(msg_location, + {msg_id, ref_count, file, offset, total_size, is_persistent}). + +-record(file_summary, + {file, valid_total_size, contiguous_top, left, right}). + +-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). +-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). +-define(FILE_EXTENSION_DETS, ".dets"). + +-define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +-define(CACHE_MAX_SIZE, 10485760). + +-define(BINARY_MODE, [raw, binary]). +-define(READ_MODE, [read, read_ahead]). +-define(WRITE_MODE, [write, delayed_write]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(mode() :: 'ram_disk' | 'disk_only'). +-type(dets_table() :: any()). +-type(ets_table() :: any()). +-type(msg_id() :: any()). +-type(msg() :: any()). +-type(file_path() :: any()). +-type(io_device() :: any()). + +-type(msstate() :: #msstate { + operation_mode :: mode(), + dir :: file_path(), + msg_location_dets :: dets_table(), + msg_location_ets :: ets_table(), + file_summary :: ets_table(), + current_file_num :: non_neg_integer(), + current_file_name :: file_path(), + current_file_handle :: io_device(), + current_offset :: non_neg_integer(), + current_dirty :: boolean(), + file_size_limit :: non_neg_integer(), + read_file_handle_cache :: any(), + last_sync_offset :: non_neg_integer(), + message_cache :: ets_table(), + ets_bytes_per_record :: non_neg_integer() + }). + +-spec(init/6 :: ('ram_disk' | 'disk_only', file_path(), + non_neg_integer(), non_neg_integer(), + fun ((msg_id()) -> non_neg_integer()), non_neg_integer()) -> + msstate()). +-spec(write/4 :: (msg_id(), msg(), boolean(), msstate()) -> msstate()). +-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found'). +-spec(is_persistent/2 :: (msg_id(), msstate()) -> boolean() | 'not_found'). +-spec(remove/2 :: ([msg_id()], msstate()) -> msstate()). +-spec(release/2 :: ([msg_id()], msstate()) -> msstate()). +-spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()). +-spec(sync/1 :: (msstate()) -> msstate()). +-spec(cleanup/1 :: (msstate()) -> msstate()). +-spec(cache_info/1 :: (msstate()) -> [{atom(), term()}]). +-spec(memory/1 :: (msstate()) -> non_neg_integer()). +-spec(ets_bpr/1 :: (msstate()) -> non_neg_integer()). +-spec(to_disk_only_mode/1 :: (msstate()) -> msstate()). +-spec(to_ram_disk_mode/1 :: (msstate()) -> msstate()). + +-endif. + +%%---------------------------------------------------------------------------- + +%% The components: +%% +%% MsgLocation: this is a (d)ets table which contains: +%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent} +%% FileSummary: this is an ets table which contains: +%% {File, ValidTotalSize, ContiguousTop, Left, Right} +%% +%% The basic idea is that messages are appended to the current file up +%% until that file becomes too big (> file_size_limit). At that point, +%% the file is closed and a new file is created on the _right_ of the +%% old file which is used for new messages. Files are named +%% numerically ascending, thus the file with the lowest name is the +%% eldest file. +%% +%% We need to keep track of which messages are in which files (this is +%% the MsgLocation table); how much useful data is in each file and +%% which files are on the left and right of each other. This is the +%% purpose of the FileSummary table. +%% +%% As messages are removed from files, holes appear in these +%% files. The field ValidTotalSize contains the total amount of useful +%% data left in the file, whilst ContiguousTop contains the amount of +%% valid data right at the start of each file. These are needed for +%% garbage collection. +%% +%% When we discover that either a file is now empty or that it can be +%% combined with the useful data in either its left or right file, we +%% compact the two files together. This keeps disk utilisation high +%% and aids performance. +%% +%% Given the compaction between two files, the left file is considered +%% the ultimate destination for the good data in the right file. If +%% necessary, the good data in the left file which is fragmented +%% throughout the file is written out to a temporary file, then read +%% back in to form a contiguous chunk of good data at the start of the +%% left file. Thus the left file is garbage collected and +%% compacted. Then the good data from the right file is copied onto +%% the end of the left file. MsgLocation and FileSummary tables are +%% updated. +%% +%% On startup, we scan the files we discover, dealing with the +%% possibilites of a crash have occured during a compaction (this +%% consists of tidyup - the compaction is deliberately designed such +%% that data is duplicated on disk rather than risking it being lost), +%% and rebuild the dets and ets tables (MsgLocation, FileSummary). +%% +%% MsgLocation is deliberately a dets table in order to ensure that we +%% are not RAM constrained. However, for performance reasons, it is +%% possible to call to_ram_disk_mode/0 which will convert MsgLocation +%% to an ets table. This results in a massive performance improvement, +%% at the expense of greater RAM usage. The idea is that when memory +%% gets tight, we switch to disk_only mode but otherwise try to run in +%% ram_disk mode. +%% +%% So, with this design, messages move to the left. Eventually, they +%% should end up in a contiguous block on the left and are then never +%% rewritten. But this isn't quite the case. If in a file there is one +%% message that is being ignored, for some reason, and messages in the +%% file to the right and in the current block are being read all the +%% time then it will repeatedly be the case that the good data from +%% both files can be combined and will be written out to a new +%% file. Whenever this happens, our shunned message will be rewritten. +%% +%% So, provided that we combine messages in the right order, +%% (i.e. left file, bottom to top, right file, bottom to top), +%% eventually our shunned message will end up at the bottom of the +%% left file. The compaction/combining algorithm is smart enough to +%% read in good data from the left file that is scattered throughout +%% (i.e. C and D in the below diagram), then truncate the file to just +%% above B (i.e. truncate to the limit of the good contiguous region +%% at the start of the file), then write C and D on top and then write +%% E, F and G from the right file on top. Thus contiguous blocks of +%% good data at the bottom of files are not rewritten (yes, this is +%% the data the size of which is tracked by the ContiguousTop +%% variable. Judicious use of a mirror is required). +%% +%% +-------+ +-------+ +-------+ +%% | X | | G | | G | +%% +-------+ +-------+ +-------+ +%% | D | | X | | F | +%% +-------+ +-------+ +-------+ +%% | X | | X | | E | +%% +-------+ +-------+ +-------+ +%% | C | | F | ===> | D | +%% +-------+ +-------+ +-------+ +%% | X | | X | | C | +%% +-------+ +-------+ +-------+ +%% | B | | X | | B | +%% +-------+ +-------+ +-------+ +%% | A | | E | | A | +%% +-------+ +-------+ +-------+ +%% left right left +%% +%% From this reasoning, we do have a bound on the number of times the +%% message is rewritten. From when it is inserted, there can be no +%% files inserted between it and the head of the queue, and the worst +%% case is that everytime it is rewritten, it moves one position lower +%% in the file (for it to stay at the same position requires that +%% there are no holes beneath it, which means truncate would be used +%% and so it would not be rewritten at all). Thus this seems to +%% suggest the limit is the number of messages ahead of it in the +%% queue, though it's likely that that's pessimistic, given the +%% requirements for compaction/combination of files. +%% +%% The other property is that we have is the bound on the lowest +%% utilisation, which should be 50% - worst case is that all files are +%% fractionally over half full and can't be combined (equivalent is +%% alternating full files and files with only one tiny message in +%% them). +%% +%% Messages are reference-counted. When a message with the same id is +%% written several times we only store it once, and only remove it +%% from the store when it has been removed the same number of times. +%% +%% The reference counts do not persist. Therefore the initialisation +%% function must be provided with a function that determines the +%% initial reference count of any (recovered) message. +%% +%% Read messages with a reference count greater than one are entered +%% into a message cache. The purpose of the cache is not especially +%% performance, though it can help there too, but prevention of memory +%% explosion. It ensures that as messages with a high reference count +%% are read from several processes they are read back as the same +%% binary object rather than multiples of identical binary +%% objects. + +%%---------------------------------------------------------------------------- +%% public API +%%---------------------------------------------------------------------------- + +init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, + EtsBytesPerRecord) -> + + file:delete(msg_location_dets_file(Dir)), + + {ok, MsgLocationDets} = + dets:open_file(?MSG_LOC_NAME, + [{file, msg_location_dets_file(Dir)}, + {min_no_slots, 1024*1024}, + %% man says this should be <= 32M. But it works... + {max_no_slots, 30*1024*1024}, + {type, set}, + {keypos, 2} + ]), + + %% it would be better to have this as private, but dets:from_ets/2 + %% seems to blow up if it is set private - see bug21489 + MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, 2}]), + + InitName = "0" ++ ?FILE_EXTENSION, + HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit, + ?BINARY_MODE ++ [read]), + State = + #msstate { operation_mode = Mode, + dir = Dir, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + file_summary = ets:new( + ?FILE_SUMMARY_ETS_NAME, + [set, private, {keypos, 2}]), + current_file_num = 0, + current_file_name = InitName, + current_file_handle = undefined, + current_offset = 0, + current_dirty = false, + file_size_limit = FileSizeLimit, + read_file_handle_cache = HandleCache, + last_sync_offset = 0, + message_cache = ets:new(?CACHE_ETS_NAME, + [set, private]), + ets_bytes_per_record = EtsBytesPerRecord + }, + + Files = + sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), + TmpFiles = + sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), + ok = recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles), + %% There should be no more tmp files now, so go ahead and load the + %% whole lot + State1 = #msstate { current_file_name = CurrentName, + current_offset = Offset } = + load_messages(RefCountFun, Files, State), + + %% read is only needed so that we can seek + {ok, FileHdl} = open_file(Dir, CurrentName, ?WRITE_MODE ++ [read]), + {ok, Offset} = file:position(FileHdl, Offset), + + State1 #msstate { current_file_handle = FileHdl }. + +write(MsgId, Msg, IsPersistent, + State = #msstate { current_file_handle = CurHdl, + current_file_name = CurName, + current_offset = CurOffset, + file_summary = FileSummary }) -> + case dets_ets_lookup(State, MsgId) of + [] -> + %% New message, lots to do + {ok, TotalSize} = rabbit_msg_file:append( + CurHdl, MsgId, term_to_binary(Msg), + IsPersistent), + true = dets_ets_insert_new( + State, #msg_location { + msg_id = MsgId, ref_count = 1, file = CurName, + offset = CurOffset, total_size = TotalSize, + is_persistent = IsPersistent }), + [FSEntry = #file_summary { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + right = undefined }] = + ets:lookup(FileSummary, CurName), + ValidTotalSize1 = ValidTotalSize + TotalSize, + ContiguousTop1 = if CurOffset =:= ContiguousTop -> + %% can't be any holes in this file + ValidTotalSize1; + true -> ContiguousTop + end, + true = ets:insert(FileSummary, FSEntry #file_summary { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), + NextOffset = CurOffset + TotalSize, + maybe_roll_to_new_file( + NextOffset, State #msstate {current_offset = NextOffset, + current_dirty = true}); + [StoreEntry = + #msg_location { msg_id = MsgId, ref_count = RefCount }] -> + %% We already know about it, just update counter + ok = dets_ets_insert(State, StoreEntry #msg_location { + ref_count = RefCount + 1 }), + State + end. + +read(MsgId, State) -> + Objs = dets_ets_lookup(State, MsgId), + case Objs of + [] -> + not_found; + [#msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize }] -> + case fetch_and_increment_cache(MsgId, State) of + not_found -> + {{ok, {MsgId, MsgBody, _IsPersistent, _BodySize}}, State1} = + with_read_handle_at( + File, Offset, + fun(Hdl) -> + Res = case rabbit_msg_file:read( + Hdl, TotalSize) of + {ok, {MsgId, _, _, _}} = Obj -> Obj; + {ok, Rest} -> + throw({error, + {misread, + [{old_state, State}, + {file, File}, + {offset, Offset}, + {read, Rest}]}}) + end, + {Offset + TotalSize, Res} + end, State), + Message = binary_to_term(MsgBody), + ok = if RefCount > 1 -> + insert_into_cache(MsgId, Message, State1); + true -> ok + %% it's not in the cache and we + %% only have one reference to the + %% message. So don't bother + %% putting it in the cache. + end, + {Message, State1}; + {Message, _RefCount} -> + {Message, State} + end + end. + +is_persistent(MsgId, State) -> + Objs = dets_ets_lookup(State, MsgId), + case Objs of + [] -> + not_found; + [#msg_location { msg_id = MsgId, is_persistent = IsPersistent }] -> + IsPersistent + end. + +remove(MsgIds, State = #msstate { current_file_name = CurName }) -> + compact(sets:to_list( + lists:foldl( + fun (MsgId, Files1) -> + case remove_message(MsgId, State) of + {compact, File} -> + if CurName =:= File -> Files1; + true -> sets:add_element(File, Files1) + end; + no_compact -> Files1 + end + end, sets:new(), MsgIds)), + State). + +release(MsgIds, State) -> + lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), + State. + +needs_sync(_MsgIds, #msstate { current_dirty = false }) -> + false; +needs_sync(MsgIds, State = #msstate { current_file_name = CurFile, + last_sync_offset = SyncOffset }) -> + lists:any(fun (MsgId) -> + [#msg_location { msg_id = MsgId, file = File, + offset = Offset }] = + dets_ets_lookup(State, MsgId), + File =:= CurFile andalso Offset >= SyncOffset + end, MsgIds). + +sync(State = #msstate { current_dirty = false }) -> + State; +sync(State = #msstate { current_file_handle = CurHdl, + current_offset = CurOffset }) -> + ok = file:sync(CurHdl), + State #msstate { current_dirty = false, last_sync_offset = CurOffset }. + +cleanup(State = #msstate { dir = Dir, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + file_summary = FileSummary, + current_file_handle = FileHdl, + read_file_handle_cache = HC }) -> + State1 = case FileHdl of + undefined -> State; + _ -> State2 = sync(State), + file:close(FileHdl), + State2 + end, + HC1 = rabbit_file_handle_cache:close_all(HC), + dets:close(MsgLocationDets), + file:delete(msg_location_dets_file(Dir)), + ets:delete(MsgLocationEts), + ets:delete(FileSummary), + State1 #msstate { msg_location_dets = undefined, + msg_location_ets = undefined, + file_summary = undefined, + current_file_handle = undefined, + current_dirty = false, + read_file_handle_cache = HC1 + }. + +cache_info(#msstate { message_cache = Cache }) -> + ets:info(Cache). + +memory(#msstate { operation_mode = ram_disk, + file_summary = FileSummary, + msg_location_ets = MsgLocationEts, + message_cache = Cache }) -> + erlang:system_info(wordsize) * + lists:sum([ets:info(Table, memory) || + Table <- [FileSummary, MsgLocationEts, Cache]]); +memory(#msstate { operation_mode = disk_only, + file_summary = FileSummary, + msg_location_dets = MsgLocationDets, + message_cache = Cache, + ets_bytes_per_record = EtsBytesPerRecord }) -> + erlang:system_info(wordsize) * + lists:sum([ets:info(Table, memory) || + Table <- [FileSummary, Cache]]) + + rabbit_misc:ceil(dets:info(MsgLocationDets, size) * EtsBytesPerRecord). + +ets_bpr(#msstate { operation_mode = disk_only, + ets_bytes_per_record = EtsBytesPerRecord }) -> + EtsBytesPerRecord; +ets_bpr(#msstate { operation_mode = ram_disk, + msg_location_ets = MsgLocationEts }) -> + erlang:system_info(wordsize) * ets:info(MsgLocationEts, memory) / + lists:max([1, ets:info(MsgLocationEts, size)]). + +to_disk_only_mode(State = #msstate { operation_mode = disk_only }) -> + State; +to_disk_only_mode(State = #msstate { operation_mode = ram_disk, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + ok = dets:from_ets(MsgLocationDets, MsgLocationEts), + true = ets:delete_all_objects(MsgLocationEts), + State #msstate { operation_mode = disk_only, + ets_bytes_per_record = ets_bpr(State) }. + +to_ram_disk_mode(State = #msstate { operation_mode = ram_disk }) -> + State; +to_ram_disk_mode(State = #msstate { operation_mode = disk_only, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + true = ets:from_dets(MsgLocationEts, MsgLocationDets), + ok = dets:delete_all_objects(MsgLocationDets), + State #msstate { operation_mode = ram_disk, + ets_bytes_per_record = undefined }. + +%%---------------------------------------------------------------------------- +%% general helper functions +%%---------------------------------------------------------------------------- + +form_filename(Dir, Name) -> + filename:join(Dir, Name). + +msg_location_dets_file(Dir) -> + form_filename(Dir, atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS). + +open_file(Dir, File, Mode) -> + file:open(form_filename(Dir, File), ?BINARY_MODE ++ Mode). + +sort_file_names(Files) -> + lists:sort(fun (A, B) -> + ANum = list_to_integer(filename:rootname(A)), + BNum = list_to_integer(filename:rootname(B)), + ANum < BNum + end, Files). + +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit), + ok = file:truncate(Hdl), + {ok, FinalPos} = file:position(Hdl, FinalPos), + ok. + +truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> + {ok, Lowpoint} = file:position(FileHdl, Lowpoint), + ok = file:truncate(FileHdl), + ok = preallocate(FileHdl, Highpoint, Lowpoint). + +with_read_handle_at(File, Offset, Fun, + State = #msstate { dir = Dir, + read_file_handle_cache = HC, + current_file_name = CurName, + current_dirty = IsDirty, + last_sync_offset = SyncOffset }) -> + State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset -> + sync(State); + true -> State + end, + FilePath = form_filename(Dir, File), + {Result, HC1} = + rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), + {Result, State1 #msstate { read_file_handle_cache = HC1 }}. + +remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> + [StoreEntry = + #msg_location { msg_id = MsgId, ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize }] = + dets_ets_lookup(State, MsgId), + case RefCount of + 1 -> + ok = dets_ets_delete(State, MsgId), + ok = remove_cache_entry(MsgId, State), + [FSEntry = #file_summary { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop }] = + ets:lookup(FileSummary, File), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + ValidTotalSize1 = ValidTotalSize - TotalSize, + true = ets:insert(FileSummary, FSEntry #file_summary { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), + {compact, File}; + _ when 1 < RefCount -> + ok = decrement_cache(MsgId, State), + ok = dets_ets_insert(State, StoreEntry #msg_location { + ref_count = RefCount - 1 }), + no_compact + end. + +%%---------------------------------------------------------------------------- +%% message cache helper functions +%%---------------------------------------------------------------------------- + +remove_cache_entry(MsgId, #msstate { message_cache = Cache }) -> + true = ets:delete(Cache, MsgId), + ok. + +fetch_and_increment_cache(MsgId, #msstate { message_cache = Cache }) -> + case ets:lookup(Cache, MsgId) of + [] -> + not_found; + [{MsgId, Message, _RefCount}] -> + NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}), + {Message, NewRefCount} + end. + +decrement_cache(MsgId, #msstate { message_cache = Cache }) -> + true = try case ets:update_counter(Cache, MsgId, {3, -1}) of + N when N =< 0 -> true = ets:delete(Cache, MsgId); + _N -> true + end + catch error:badarg -> + %% MsgId is not in there because although it's been + %% delivered, it's never actually been read (think: + %% persistent message in mixed queue) + true + end, + ok. + +insert_into_cache(MsgId, Message, #msstate { message_cache = Cache }) -> + case cache_is_full(Cache) of + true -> ok; + false -> true = ets:insert_new(Cache, {MsgId, Message, 1}), + ok + end. + +cache_is_full(Cache) -> + ets:info(Cache, memory) > ?CACHE_MAX_SIZE. + +%%---------------------------------------------------------------------------- +%% dets/ets agnosticism +%%---------------------------------------------------------------------------- + +dets_ets_lookup(#msstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Key) -> + dets:lookup(MsgLocationDets, Key); +dets_ets_lookup(#msstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Key) -> + ets:lookup(MsgLocationEts, Key). + +dets_ets_delete(#msstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Key) -> + ok = dets:delete(MsgLocationDets, Key); +dets_ets_delete(#msstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Key) -> + true = ets:delete(MsgLocationEts, Key), + ok. + +dets_ets_insert(#msstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> + ok = dets:insert(MsgLocationDets, Obj); +dets_ets_insert(#msstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> + true = ets:insert(MsgLocationEts, Obj), + ok. + +dets_ets_insert_new(#msstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> + true = dets:insert_new(MsgLocationDets, Obj); +dets_ets_insert_new(#msstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> + true = ets:insert_new(MsgLocationEts, Obj). + +dets_ets_match_object(#msstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> + dets:match_object(MsgLocationDets, Obj); +dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> + ets:match_object(MsgLocationEts, Obj). + +%%---------------------------------------------------------------------------- +%% recovery +%%---------------------------------------------------------------------------- + +recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles) -> + lists:foreach(fun (TmpFile) -> + ok = recover_crashed_compactions1( + RefCountFun, Dir, Files, TmpFile) + end, + TmpFiles), + ok. + +recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) -> + NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, + true = lists:member(NonTmpRelatedFile, Files), + {ok, UncorruptedMessagesTmp, MsgIdsTmp} = + scan_file_for_valid_messages_msg_ids(Dir, TmpFile), + %% all of these messages should be referenced + %% otherwise they wouldn't have been copied out + verify_messages_referenced(RefCountFun, MsgIdsTmp), + {ok, UncorruptedMessages, MsgIds} = + scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile), + %% 1) It's possible that everything in the tmp file is also in the + %% main file such that the main file is (prefix ++ + %% tmpfile). This means that compaction failed immediately + %% prior to the final step of deleting the tmp file. Plan: just + %% delete the tmp file + %% 2) It's possible that everything in the tmp file is also in the + %% main file but with holes throughout (or just somthing like + %% main = (prefix ++ hole ++ tmpfile)). This means that + %% compaction wrote out the tmp file successfully and then + %% failed. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 3) It's possible that everything in the tmp file is also in the + %% main file but such that the main file does not end with tmp + %% file (and there are valid messages in the suffix; main = + %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This + %% means that compaction failed as we were writing out the tmp + %% file. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 4) It's possible that there are messages in the tmp file which + %% are not in the main file. This means that writing out the + %% tmp file succeeded, but then we failed as we were copying + %% them back over to the main file, after truncating the main + %% file. As the main file has already been truncated, it should + %% consist only of valid messages. Plan: Truncate the main file + %% back to before any of the files in the tmp file and copy + %% them over again + TmpPath = form_filename(Dir, TmpFile), + case is_sublist(MsgIdsTmp, MsgIds) of + true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file + %% note this also catches the case when the tmp file + %% is empty + ok = file:delete(TmpPath); + false -> + %% We're in case 4 above. We only care about the inital + %% msgs in main file that are not in the tmp file. If + %% there are no msgs in the tmp file then we would be in + %% the 'true' branch of this case, so we know the + %% lists:last call is safe. + EldestTmpMsgId = lists:last(MsgIdsTmp), + {MsgIds1, UncorruptedMessages1} + = case lists:splitwith( + fun (MsgId) -> MsgId /= EldestTmpMsgId end, MsgIds) of + {_MsgIds, []} -> %% no msgs from tmp in main + {MsgIds, UncorruptedMessages}; + {Dropped, [EldestTmpMsgId | Rest]} -> + %% Msgs in Dropped are in tmp, so forget them. + %% *cry*. Lists indexed from 1. + {Rest, lists:sublist(UncorruptedMessages, + 2 + length(Dropped), + length(Rest))} + end, + %% Check that everything in the main file prefix is referenced + verify_messages_referenced(RefCountFun, MsgIds1), + %% The main file prefix should be contiguous + {Top, MsgIds1} = find_contiguous_block_prefix( + lists:reverse(UncorruptedMessages1)), + %% we should have that none of the messages in the prefix + %% are in the tmp file + true = is_disjoint(MsgIds1, MsgIdsTmp), + %% must open with read flag, otherwise will stomp over contents + {ok, MainHdl} = open_file(Dir, NonTmpRelatedFile, + ?WRITE_MODE ++ [read]), + %% Wipe out any rubbish at the end of the file. Remember + %% the head of the list will be the highest entry in the + %% file. + [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, + TmpSize = TmpTopOffset + TmpTopTotalSize, + %% Extend the main file as big as necessary in a single + %% move. If we run out of disk space, this truncate could + %% fail, but we still aren't risking losing data + ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), + {ok, TmpHdl} = open_file(Dir, TmpFile, ?READ_MODE), + {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), + ok = file:sync(MainHdl), + ok = file:close(MainHdl), + ok = file:close(TmpHdl), + ok = file:delete(TmpPath), + + {ok, _MainMessages, MsgIdsMain} = + scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile), + %% check that everything in MsgIds1 is in MsgIdsMain + true = is_sublist(MsgIds1, MsgIdsMain), + %% check that everything in MsgIdsTmp is in MsgIdsMain + true = is_sublist(MsgIdsTmp, MsgIdsMain) + end, + ok. + +is_sublist(SmallerL, BiggerL) -> + lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL). + +is_disjoint(SmallerL, BiggerL) -> + lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). + +verify_messages_referenced(RefCountFun, MsgIds) -> + lists:foreach(fun (MsgId) -> false = RefCountFun(MsgId) == 0 end, MsgIds). + +scan_file_for_valid_messages_msg_ids(Dir, File) -> + {ok, Messages} = scan_file_for_valid_messages(Dir, File), + {ok, Messages, + [MsgId || {MsgId, _IsPersistent, _TotalSize, _FileOffset} <- Messages]}. + +scan_file_for_valid_messages(Dir, File) -> + case open_file(Dir, File, ?READ_MODE) of + {ok, Hdl} -> + Valid = rabbit_msg_file:scan(Hdl), + %% if something really bad's happened, the close could fail, + %% but ignore + file:close(Hdl), + Valid; + {error, enoent} -> {ok, []}; + {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}}) + end. + +%% Takes the list in *ascending* order (i.e. eldest message +%% first). This is the opposite of what scan_file_for_valid_messages +%% produces. The list of msgs that is produced is youngest first. +find_contiguous_block_prefix([]) -> {0, []}; +find_contiguous_block_prefix(List) -> + find_contiguous_block_prefix(List, 0, []). + +find_contiguous_block_prefix([], ExpectedOffset, MsgIds) -> + {ExpectedOffset, MsgIds}; +find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset} + | Tail], ExpectedOffset, MsgIds) -> + ExpectedOffset1 = ExpectedOffset + TotalSize, + find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]); +find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> + {ExpectedOffset, MsgIds}. + +load_messages(RefCountFun, [], State) -> + CurrentFile = State #msstate.current_file_name, + load_messages(RefCountFun, undefined, [CurrentFile], State); +load_messages(RefCountFun, Files, State) -> + load_messages(RefCountFun, undefined, Files, State). + +load_messages(_RefCountFun, Left, [], State) -> + Num = list_to_integer(filename:rootname(Left)), + Offset = + case dets_ets_match_object(State, #msg_location { + file = Left, _ = '_' }) of + [] -> 0; + L -> + [ #msg_location { file = Left, + offset = MaxOffset, + total_size = TotalSize} | _ ] = + sort_msg_locations_by_offset(desc, L), + MaxOffset + TotalSize + end, + State #msstate { current_file_num = Num, current_file_name = Left, + current_offset = Offset }; +load_messages(RefCountFun, Left, [File|Files], + State = #msstate { dir = Dir, file_summary = FileSummary }) -> + {ok, Messages} = scan_file_for_valid_messages(Dir, File), + {ValidMessages, ValidTotalSize} = lists:foldl( + fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + case RefCountFun(MsgId) of + 0 -> {VMAcc, VTSAcc}; + RefCount -> + true = dets_ets_insert_new( + State, #msg_location { + msg_id = MsgId, ref_count = RefCount, + file = File, offset = Offset, + total_size = TotalSize, + is_persistent = IsPersistent }), + {[Obj | VMAcc], VTSAcc + TotalSize} + end + end, {[], 0}, Messages), + %% foldl reverses lists, find_contiguous_block_prefix needs + %% msgs eldest first, so, ValidMessages is the right way round + {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), + Right = case Files of + [] -> undefined; + [F|_] -> F + end, + true = ets:insert_new(FileSummary, #file_summary { + file = File, valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + left = Left, right = Right }), + load_messages(RefCountFun, File, Files, State). + +%%---------------------------------------------------------------------------- +%% garbage collection / compaction / aggregation +%%---------------------------------------------------------------------------- + +maybe_roll_to_new_file(Offset, + State = #msstate { dir = Dir, + file_size_limit = FileSizeLimit, + current_file_name = CurName, + current_file_handle = CurHdl, + current_file_num = CurNum, + file_summary = FileSummary + } + ) when Offset >= FileSizeLimit -> + State1 = sync(State), + ok = file:close(CurHdl), + NextNum = CurNum + 1, + NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, + {ok, NextHdl} = open_file(Dir, NextName, ?WRITE_MODE), + true = ets:update_element(FileSummary, CurName, + {#file_summary.right, NextName}), + true = ets:insert_new( + FileSummary, #file_summary { + file = NextName, valid_total_size = 0, contiguous_top = 0, + left = CurName, right = undefined }), + State2 = State1 #msstate { current_file_name = NextName, + current_file_handle = NextHdl, + current_file_num = NextNum, + current_offset = 0, + last_sync_offset = 0 + }, + compact([CurName], State2); +maybe_roll_to_new_file(_, State) -> + State. + +compact(Files, State) -> + %% smallest number, hence eldest, hence left-most, first + SortedFiles = sort_file_names(Files), + %% foldl reverses, so now youngest/right-most first + RemainingFiles = lists:foldl(fun (File, Acc) -> + delete_empty_files(File, Acc, State) + end, [], SortedFiles), + lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). + +%% At this stage, we simply know that the file has had msgs removed +%% from it. However, we don't know if we need to merge it left (which +%% is what we would prefer), or merge it right. If we merge left, then +%% this file is the source, and the left file is the destination. If +%% we merge right then this file is the destination and the right file +%% is the source. +combine_file(File, State = #msstate { file_summary = FileSummary, + current_file_name = CurName }) -> + %% the file we're looking at may no longer exist as it may have + %% been deleted within the current GC run + case ets:lookup(FileSummary, File) of + [] -> State; + [FSEntry = #file_summary { left = Left, right = Right }] -> + GoRight = + fun() -> + case Right of + undefined -> State; + _ when not (CurName == Right) -> + [FSRight] = ets:lookup(FileSummary, Right), + {_, State1} = adjust_meta_and_combine( + FSEntry, FSRight, State), + State1; + _ -> State + end + end, + case Left of + undefined -> + GoRight(); + _ -> [FSLeft] = ets:lookup(FileSummary, Left), + case adjust_meta_and_combine(FSLeft, FSEntry, State) of + {true, State1} -> State1; + {false, State} -> GoRight() + end + end + end. + +adjust_meta_and_combine( + LeftObj = #file_summary { + file = LeftFile, valid_total_size = LeftValidData, right = RightFile }, + RightObj = #file_summary { + file = RightFile, valid_total_size = RightValidData, left = LeftFile, + right = RightRight }, + State = #msstate { file_size_limit = FileSizeLimit, + file_summary = FileSummary }) -> + TotalValidData = LeftValidData + RightValidData, + if FileSizeLimit >= TotalValidData -> + State1 = combine_files(RightObj, LeftObj, State), + %% this could fail if RightRight is undefined + ets:update_element(FileSummary, RightRight, + {#file_summary.left, LeftFile}), + true = ets:insert(FileSummary, LeftObj #file_summary { + valid_total_size = TotalValidData, + contiguous_top = TotalValidData, + right = RightRight }), + true = ets:delete(FileSummary, RightFile), + {true, State1}; + true -> {false, State} + end. + +sort_msg_locations_by_offset(Dir, List) -> + Comp = case Dir of + asc -> fun erlang:'<'/2; + desc -> fun erlang:'>'/2 + end, + lists:sort(fun (#msg_location { offset = OffA }, + #msg_location { offset = OffB }) -> + Comp(OffA, OffB) + end, List). + +combine_files(#file_summary { file = Source, + valid_total_size = SourceValid, + left = Destination }, + #file_summary { file = Destination, + valid_total_size = DestinationValid, + contiguous_top = DestinationContiguousTop, + right = Source }, + State = #msstate { dir = Dir }) -> + State1 = close_file(Source, close_file(Destination, State)), + {ok, SourceHdl} = open_file(Dir, Source, ?READ_MODE), + {ok, DestinationHdl} = open_file(Dir, Destination, + ?READ_MODE ++ ?WRITE_MODE), + ExpectedSize = SourceValid + DestinationValid, + %% if DestinationValid =:= DestinationContiguousTop then we don't + %% need a tmp file + %% if they're not equal, then we need to write out everything past + %% the DestinationContiguousTop to a tmp file then truncate, + %% copy back in, and then copy over from Source + %% otherwise we just truncate straight away and copy over from Source + if DestinationContiguousTop =:= DestinationValid -> + ok = truncate_and_extend_file(DestinationHdl, + DestinationValid, ExpectedSize); + true -> + Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_MODE ++ ?WRITE_MODE), + Worklist = + lists:dropwhile( + fun (#msg_location { offset = Offset }) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == + %% DestinationContiguousTop because if it + %% was then DestinationContiguousTop would + %% have been extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I suspect + %% that the list should be naturally sorted + %% as we require, however, we need to + %% enforce it anyway + end, sort_msg_locations_by_offset( + asc, dets_ets_match_object( + State1, #msg_location { + file = Destination, _ = '_' }))), + ok = copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination, State1), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage from + %% Destination, and MsgLocationDets has been updated to + %% reflect compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file:sync(DestinationHdl), + ok = file:close(TmpHdl), + ok = file:delete(form_filename(Dir, Tmp)) + end, + SourceWorkList = + sort_msg_locations_by_offset( + asc, dets_ets_match_object(State1, #msg_location { + file = Source, _ = '_' })), + ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination, State1), + %% tidy up + ok = file:close(SourceHdl), + ok = file:close(DestinationHdl), + ok = file:delete(form_filename(Dir, Source)), + State1. + +copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, + Destination, State) -> + {FinalOffset, BlockStart1, BlockEnd1} = + lists:foldl( + fun (StoreEntry = #msg_location { offset = Offset, + total_size = TotalSize }, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the DestinationFile. + %% Offset, BlockStart and BlockEnd are in the SourceFile + %% update MsgLocationDets to reflect change of file and offset + ok = dets_ets_insert(State, StoreEntry #msg_location { + file = Destination, + offset = CurOffset }), + NextOffset = CurOffset + TotalSize, + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {NextOffset, Offset, Offset + TotalSize}; + Offset =:= BlockEnd -> + %% extend the current block because the next + %% msg follows straight on + {NextOffset, BlockStart, BlockEnd + TotalSize}; + true -> + %% found a gap, so actually do the work for + %% the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file:position(SourceHdl, BlockStart), + {ok, BSize} = + file:copy(SourceHdl, DestinationHdl, BSize), + {NextOffset, Offset, Offset + TotalSize} + end + end, {InitOffset, undefined, undefined}, WorkList), + %% do the last remaining block + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = file:position(SourceHdl, BlockStart1), + {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1), + ok = file:sync(DestinationHdl), + ok. + +close_file(File, State = #msstate { dir = Dir, read_file_handle_cache = HC }) -> + HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, File), HC), + State #msstate { read_file_handle_cache = HC1 }. + +delete_empty_files(File, Acc, + #msstate { dir = Dir, file_summary = FileSummary }) -> + [#file_summary { valid_total_size = ValidData, + left = Left, right = Right }] = + ets:lookup(FileSummary, File), + case ValidData of + %% we should NEVER find the current file in here hence right + %% should always be a file, not undefined + 0 -> + case {Left, Right} of + {undefined, _} when not is_atom(Right) -> + %% the eldest file is empty. + true = ets:update_element( + FileSummary, Right, + {#file_summary.left, undefined}); + {_, _} when not (is_atom(Right)) -> + true = ets:update_element(FileSummary, Right, + {#file_summary.left, Left}), + true = + ets:update_element(FileSummary, Left, + {#file_summary.right, Right}) + end, + true = ets:delete(FileSummary, File), + ok = file:delete(form_filename(Dir, File)), + Acc; + _ -> [File|Acc] + end. |