From b155306db41afb224a90bd20f142700c42a97efc Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 17:22:18 +0000 Subject: introduce separate type for msg ids and add some auxiliary types for fun params to a) make their purpose clearer, and b) work around emacs indentation bugs --- include/rabbit_msg_store_index.hrl | 8 ++++---- src/rabbit_amqqueue.erl | 4 ++-- src/rabbit_msg_file.erl | 12 +++++++----- src/rabbit_msg_store.erl | 22 +++++++++++----------- src/rabbit_queue_index.erl | 21 ++++++++++----------- src/rabbit_types.erl | 5 +++-- 6 files changed, 37 insertions(+), 35 deletions(-) diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl index 289f8f60..2ae5b000 100644 --- a/include/rabbit_msg_store_index.hrl +++ b/include/rabbit_msg_store_index.hrl @@ -29,13 +29,13 @@ -spec(new/1 :: (dir()) -> index_state()). -spec(recover/1 :: (dir()) -> rabbit_types:ok_or_error2(index_state(), any())). -spec(lookup/2 :: - (rabbit_guid:guid(), index_state()) -> ('not_found' | keyvalue())). + (rabbit_types:msg_id(), index_state()) -> ('not_found' | keyvalue())). -spec(insert/2 :: (keyvalue(), index_state()) -> 'ok'). -spec(update/2 :: (keyvalue(), index_state()) -> 'ok'). --spec(update_fields/3 :: (rabbit_guid:guid(), ({fieldpos(), fieldvalue()} | - [{fieldpos(), fieldvalue()}]), +-spec(update_fields/3 :: (rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} | + [{fieldpos(), fieldvalue()}]), index_state()) -> 'ok'). --spec(delete/2 :: (rabbit_guid:guid(), index_state()) -> 'ok'). +-spec(delete/2 :: (rabbit_types:msg_id(), index_state()) -> 'ok'). -spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok'). -spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok'). -spec(terminate/1 :: (index_state()) -> any()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 46b78c39..bbeff1f7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -141,9 +141,9 @@ fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). -spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: - (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 55e6ac47..71b4aa6f 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -39,15 +39,17 @@ -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). -type(file_size() :: non_neg_integer()). +-type(message_accumulator(A) :: + fun (({rabbit_types:msg_id(), msg_size(), position(), binary()}, A) -> + A)). --spec(append/3 :: (io_device(), rabbit_guid:guid(), msg()) -> +-spec(append/3 :: (io_device(), rabbit_types:msg_id(), msg()) -> rabbit_types:ok_or_error2(msg_size(), any())). -spec(read/2 :: (io_device(), msg_size()) -> - rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()}, + rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()}, any())). --spec(scan/4 :: (io_device(), file_size(), - fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A), - A) -> {'ok', A, position()}). +-spec(scan/4 :: (io_device(), file_size(), message_accumulator(A), A) -> + {'ok', A, position()}). -endif. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9e65e442..02811da7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -132,30 +132,30 @@ file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), cur_file_cache_ets :: ets:tid()}). --type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), - A}). +-type(msg_ref_delta_gen(A) :: + fun ((A) -> 'finished' | + {rabbit_types:msg_id(), non_neg_integer(), A})). -type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). -type(deletion_thunk() :: fun (() -> boolean())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', - startup_fun_state()) -> rabbit_types:ok_pid_or_error()). + {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(), maybe_close_fds_fun()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). --spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok'). --spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> +-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). +-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). --spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). --spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). --spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). --spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) -> - 'ok'). +-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). +-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). +-spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). +-spec(sync/3 :: + ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). -spec(sync/1 :: (server()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76b1136f..7b5aa120 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -187,21 +187,21 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_guids :: [rabbit_guid:guid()] - }). --type(startup_fun_state() :: - {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), - A}). + unsynced_guids :: [rabbit_types:msg_id()] + }). +-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). +-type(walker(A) :: fun ((A) -> 'finished' | + {rabbit_types:msg_id(), non_neg_integer(), A})). -type(shutdown_terms() :: [any()]). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) -> - {'undefined' | non_neg_integer(), qistate()}). + contains_predicate(), on_sync_fun()) -> + {'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_guid:guid(), seq_id(), +-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). @@ -209,14 +209,13 @@ -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), + {[{rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: ([rabbit_amqqueue:name()]) -> - {[[any()]], startup_fun_state()}). +-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). -spec(add_queue_ttl/0 :: () -> 'ok'). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index ab2300c0..899291f2 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -21,7 +21,7 @@ -ifdef(use_specs). -export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0, - message/0, basic_message/0, + message/0, msg_id/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, message_properties/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, @@ -62,11 +62,12 @@ properties_bin :: binary(), payload_fragments_rev :: [binary()]}). -type(content() :: undecoded_content() | decoded_content()). +-type(msg_id() :: rabbit_guid:guid()). -type(basic_message() :: #basic_message{exchange_name :: rabbit_exchange:name(), routing_keys :: [rabbit_router:routing_key()], content :: content(), - guid :: rabbit_guid:guid(), + guid :: msg_id(), is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: -- cgit v1.2.1 From c0304ad94f0862f6cae9d33dac434144b17ea309 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 17:46:50 +0000 Subject: guid -> msg_id in msg_store and friends --- include/rabbit_msg_store.hrl | 3 +- src/rabbit_msg_file.erl | 31 ++-- src/rabbit_msg_store.erl | 341 +++++++++++++++++++------------------ src/rabbit_msg_store_ets_index.erl | 2 +- 4 files changed, 190 insertions(+), 187 deletions(-) diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index 9d704f65..e9150a97 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -22,5 +22,4 @@ -endif. --record(msg_location, - {guid, ref_count, file, offset, total_size}). +-record(msg_location, {msg_id, ref_count, file, offset, total_size}). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 71b4aa6f..22ad3d05 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -27,8 +27,8 @@ -define(WRITE_OK_SIZE_BITS, 8). -define(WRITE_OK_MARKER, 255). -define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)). --define(GUID_SIZE_BYTES, 16). --define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). +-define(MSG_ID_SIZE_BYTES, 16). +-define(MSG_ID_SIZE_BITS, (8 * ?MSG_ID_SIZE_BYTES)). -define(SCAN_BLOCK_SIZE, 4194304). %% 4MB %%---------------------------------------------------------------------------- @@ -55,14 +55,14 @@ %%---------------------------------------------------------------------------- -append(FileHdl, Guid, MsgBody) - when is_binary(Guid) andalso size(Guid) =:= ?GUID_SIZE_BYTES -> +append(FileHdl, MsgId, MsgBody) + when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES -> MsgBodyBin = term_to_binary(MsgBody), MsgBodyBinSize = size(MsgBodyBin), - Size = MsgBodyBinSize + ?GUID_SIZE_BYTES, + Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES, case file_handle_cache:append(FileHdl, <>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; @@ -71,13 +71,13 @@ append(FileHdl, Guid, MsgBody) read(FileHdl, TotalSize) -> Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, - BodyBinSize = Size - ?GUID_SIZE_BYTES, + BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, case file_handle_cache:read(FileHdl, TotalSize) of {ok, <>} -> - {ok, {Guid, binary_to_term(MsgBodyBin)}}; + {ok, {MsgId, binary_to_term(MsgBodyBin)}}; KO -> KO end. @@ -102,21 +102,22 @@ scanner(<<>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; %% Nothing to do other than stop. -scanner(<>, Offset, Fun, Acc) -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, case WriteMarker of ?WRITE_OK_MARKER -> %% Here we take option 5 from %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in - %% which we read the Guid as a number, and then convert it + %% which we read the MsgId as a number, and then convert it %% back to a binary in order to work around bugs in %% Erlang's GC. - <> = - <>, - <> = <>, + <> = + <>, + <> = + <>, scanner(Rest, Offset + TotalSize, Fun, - Fun({Guid, TotalSize, Offset, Msg}, Acc)); + Fun({MsgId, TotalSize, Offset, Msg}, Acc)); _ -> scanner(Rest, Offset + TotalSize, Fun, Acc) end; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 02811da7..771835a1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -74,7 +74,7 @@ %% to callbacks successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? - cref_to_guids %% client ref to synced messages mapping + cref_to_msg_ids %% client ref to synced messages mapping }). -record(client_msstate, @@ -135,7 +135,7 @@ -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). --type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). +-type(maybe_msg_id_fun() :: 'undefined' | fun ((gb_set()) -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). -type(deletion_thunk() :: fun (() -> boolean())). @@ -143,7 +143,7 @@ (atom(), file:filename(), [binary()] | 'undefined', {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(), +-spec(client_init/4 :: (server(), client_ref(), maybe_msg_id_fun(), maybe_close_fds_fun()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). @@ -177,8 +177,8 @@ %% The components: %% -%% Index: this is a mapping from Guid to #msg_location{}: -%% {Guid, RefCount, File, Offset, TotalSize} +%% Index: this is a mapping from MsgId to #msg_location{}: +%% {MsgId, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which maps File to #file_summary{}: %% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers} @@ -279,7 +279,7 @@ %% alternating full files and files with only one tiny message in %% them). %% -%% Messages are reference-counted. When a message with the same guid +%% Messages are reference-counted. When a message with the same msg id %% is written several times we only store it once, and only remove it %% from the store when it has been removed the same number of times. %% @@ -422,29 +422,29 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write(Guid, Msg, +write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> - ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - ok = server_cast(CState, {write, CRef, Guid}). + ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), + ok = server_cast(CState, {write, CRef, MsgId}). -read(Guid, +read(MsgId, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache - case fetch_and_increment_cache(DedupCacheEts, Guid) of + case fetch_and_increment_cache(DedupCacheEts, MsgId) of not_found -> %% 2. Check the cur file cache - case ets:lookup(CurFileCacheEts, Guid) of + case ets:lookup(CurFileCacheEts, MsgId) of [] -> Defer = fun() -> - {server_call(CState, {read, Guid}), CState} + {server_call(CState, {read, MsgId}), CState} end, - case index_lookup_positive_ref_count(Guid, CState) of + case index_lookup_positive_ref_count(MsgId, CState) of not_found -> Defer(); MsgLocation -> client_read1(MsgLocation, Defer, CState) end; - [{Guid, Msg, _CacheRefCount}] -> + [{MsgId, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the %% refcount, so can't insert into dedup cache {{ok, Msg}, CState} @@ -453,13 +453,13 @@ read(Guid, {{ok, Msg}, CState} end. -contains(Guid, CState) -> server_call(CState, {contains, Guid}). +contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; -remove(Guids, CState = #client_msstate { client_ref = CRef }) -> - server_cast(CState, {remove, CRef, Guids}). +remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> + server_cast(CState, {remove, CRef, MsgIds}). release([], _CState) -> ok; -release(Guids, CState) -> server_cast(CState, {release, Guids}). -sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). +release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}). +sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). sync(Server) -> gen_server2:cast(Server, sync). @@ -477,11 +477,11 @@ server_call(#client_msstate { server = Server }, Msg) -> server_cast(#client_msstate { server = Server }, Msg) -> gen_server2:cast(Server, Msg). -client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer, +client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Guid, CState); + read(MsgId, CState); [#file_summary { locked = Locked, right = Right }] -> client_read2(Locked, Right, MsgLocation, Defer, CState) end. @@ -503,7 +503,7 @@ client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% the safest and simplest thing to do. Defer(); client_read2(false, _Right, - MsgLocation = #msg_location { guid = Guid, file = File }, + MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> %% It's entirely possible that everything we're doing from here on @@ -512,9 +512,9 @@ client_read2(false, _Right, safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, fun (_) -> client_read3(MsgLocation, Defer, CState) end, - fun () -> read(Guid, CState) end). + fun () -> read(MsgId, CState) end). -client_read3(#msg_location { guid = Guid, file = File }, Defer, +client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -539,7 +539,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, %% too). case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. - read(Guid, CState); + read(MsgId, CState); [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, @@ -550,7 +550,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, %% unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Guid, CState) + catch error:badarg -> read(MsgId, CState) end; [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving @@ -563,7 +563,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, %% us doing the lookup and the +1 on the readers. (Same as %% badarg scenario above, but we don't have a missing file %% - we just have the /wrong/ file). - case index_lookup(Guid, CState) of + case index_lookup(MsgId, CState) of #msg_location { file = File } = MsgLocation -> %% Still the same file. {ok, CState1} = close_all_indicated(CState), @@ -589,9 +589,9 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, end end. -clear_client(CRef, State = #msstate { cref_to_guids = CTG, +clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, dying_clients = DyingClients }) -> - State #msstate { cref_to_guids = dict:erase(CRef, CTG), + State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), dying_clients = sets:del_element(CRef, DyingClients) }. @@ -666,7 +666,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - cref_to_guids = dict:new() + cref_to_msg_ids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -698,7 +698,7 @@ prioritise_call(Msg, _From, _State) -> case Msg of successfully_recovered_state -> 7; {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; - {read, _Guid} -> 2; + {read, _MsgId} -> 2; _ -> 0 end. @@ -733,12 +733,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); -handle_call({read, Guid}, From, State) -> - State1 = read_message(Guid, From, State), +handle_call({read, MsgId}, From, State) -> + State1 = read_message(MsgId, From, State), noreply(State1); -handle_call({contains, Guid}, From, State) -> - State1 = contains_message(Guid, From, State), +handle_call({contains, MsgId}, From, State) -> + State1 = contains_message(MsgId, From, State), noreply(State1). handle_cast({client_dying, CRef}, @@ -751,53 +751,53 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, Guid}, +handle_cast({write, CRef, MsgId}, State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> - true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), - [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), + true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), + [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId), noreply( - case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of {write, State1} -> - write_message(CRef, Guid, Msg, State1); + write_message(CRef, MsgId, Msg, State1); {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> State1; {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), State1; {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, Guid, State1); + record_pending_confirm(CRef, MsgId, State1); {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(gb_sets:singleton(Guid), written), - CTG + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM end, CRef, State1) end); -handle_cast({remove, CRef, Guids}, State) -> +handle_cast({remove, CRef, MsgIds}, State) -> State1 = lists:foldl( - fun (Guid, State2) -> remove_message(Guid, CRef, State2) end, - State, Guids), - noreply(maybe_compact( - client_confirm(CRef, gb_sets:from_list(Guids), removed, State1))); + fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end, + State, MsgIds), + noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), + removed, State1))); -handle_cast({release, Guids}, State = +handle_cast({release, MsgIds}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> lists:foreach( - fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids), + fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds), noreply(State); -handle_cast({sync, Guids, K}, +handle_cast({sync, MsgIds, K}, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, on_sync = Syncs }) -> {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - case lists:any(fun (Guid) -> + case lists:any(fun (MsgId) -> #msg_location { file = File, offset = Offset } = - index_lookup(Guid, State), + index_lookup(MsgId, State), File =:= CurFile andalso Offset >= SyncOffset - end, Guids) of + end, MsgIds) of false -> K(), noreply(State); true -> noreply(State #msstate { on_sync = [K | Syncs] }) @@ -879,16 +879,16 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { sync_timer_ref = undefined, - on_sync = Syncs, - cref_to_guids = CTG }) -> - case {Syncs, dict:size(CTG)} of +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = Syncs, + cref_to_msg_ids = CTM }) -> + case {Syncs, dict:size(CTM)} of {[], 0} -> {State, hibernate}; _ -> {start_sync_timer(State), 0} end; -next_state(State = #msstate { on_sync = Syncs, - cref_to_guids = CTG }) -> - case {Syncs, dict:size(CTG)} of +next_state(State = #msstate { on_sync = Syncs, + cref_to_msg_ids = CTM }) -> + case {Syncs, dict:size(CTM)} of {[], 0} -> {stop_sync_timer(State), hibernate}; _ -> {State, 0} end. @@ -905,66 +905,66 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> internal_sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs, - cref_to_guids = CTG }) -> + cref_to_msg_ids = CTM }) -> State1 = stop_sync_timer(State), - CGs = dict:fold(fun (CRef, Guids, NS) -> - case gb_sets:is_empty(Guids) of + CGs = dict:fold(fun (CRef, MsgIds, NS) -> + case gb_sets:is_empty(MsgIds) of true -> NS; - false -> [{CRef, Guids} | NS] + false -> [{CRef, MsgIds} | NS] end - end, [], CTG), + end, [], CTM), case {Syncs, CGs} of {[], []} -> ok; _ -> file_handle_cache:sync(CurHdl) end, [K() || K <- lists:reverse(Syncs)], - [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], - State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. + [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs], + State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }. -write_action({true, not_found}, _Guid, State) -> +write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; -write_action({true, #msg_location { file = File }}, _Guid, State) -> +write_action({true, #msg_location { file = File }}, _MsgId, State) -> {ignore, File, State}; -write_action({false, not_found}, _Guid, State) -> +write_action({false, not_found}, _MsgId, State) -> {write, State}; write_action({Mask, #msg_location { ref_count = 0, file = File, total_size = TotalSize }}, - Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) -> case {Mask, ets:lookup(FileSummaryEts, File)} of {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), + ok = index_delete(MsgId, State), {write, State}; {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death + %% The msg for MsgId is older than the client death %% message, but as it is being GC'd currently we'll have %% to write a new copy, which will then be younger, so %% ignore this write. {ignore, File, State}; {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), + ok = index_update_ref_count(MsgId, 1, State), State1 = adjust_valid_total_size(File, TotalSize, State), {confirm, File, State1} end; write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, - Guid, State) -> - ok = index_update_ref_count(Guid, RefCount + 1, State), + MsgId, State) -> + ok = index_update_ref_count(MsgId, RefCount + 1, State), %% We already know about it, just update counter. Only update %% field otherwise bad interaction with concurrent GC {confirm, File, State}. -write_message(CRef, Guid, Msg, State) -> - write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). +write_message(CRef, MsgId, Msg, State) -> + write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)). -write_message(Guid, Msg, +write_message(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, sum_file_size = SumFileSize, file_summary_ets = FileSummaryEts }) -> {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert( - #msg_location { guid = Guid, ref_count = 1, file = CurFile, + #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { right = undefined, locked = false }] = ets:lookup(FileSummaryEts, CurFile), @@ -976,21 +976,21 @@ write_message(Guid, Msg, sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize }). -read_message(Guid, From, +read_message(MsgId, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> - case index_lookup_positive_ref_count(Guid, State) of + case index_lookup_positive_ref_count(MsgId, State) of not_found -> gen_server2:reply(From, not_found), State; MsgLocation -> - case fetch_and_increment_cache(DedupCacheEts, Guid) of + case fetch_and_increment_cache(DedupCacheEts, MsgId) of not_found -> read_message1(From, MsgLocation, State); Msg -> gen_server2:reply(From, {ok, Msg}), State end end. -read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, +read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset } = MsgLoc, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, @@ -1000,7 +1000,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, case File =:= CurFile of true -> {Msg, State1} = %% can return [] if msg in file existed on startup - case ets:lookup(CurFileCacheEts, Guid) of + case ets:lookup(CurFileCacheEts, MsgId) of [] -> {ok, RawOffSet} = file_handle_cache:current_raw_offset(CurHdl), @@ -1009,9 +1009,9 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, false -> ok end, read_from_disk(MsgLoc, State, DedupCacheEts); - [{Guid, Msg1, _CacheRefCount}] -> + [{MsgId, Msg1, _CacheRefCount}] -> ok = maybe_insert_into_cache( - DedupCacheEts, RefCount, Guid, Msg1), + DedupCacheEts, RefCount, MsgId, Msg1), {Msg1, State} end, gen_server2:reply(From, {ok, Msg}), @@ -1019,7 +1019,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, false -> [#file_summary { locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of - true -> add_to_pending_gc_completion({read, Guid, From}, + true -> add_to_pending_gc_completion({read, MsgId, From}, File, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), @@ -1028,47 +1028,47 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, end end. -read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, +read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize }, State, DedupCacheEts) -> {Hdl, State1} = get_read_handle(File, State), {ok, Offset} = file_handle_cache:position(Hdl, Offset), - {ok, {Guid, Msg}} = + {ok, {MsgId, Msg}} = case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {Guid, _}} = Obj -> + {ok, {MsgId, _}} = Obj -> Obj; Rest -> {error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, - {guid, Guid}, + {msg_id, MsgId}, {read, Rest}, {proc_dict, get()} ]}} end, - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), {Msg, State1}. -contains_message(Guid, From, +contains_message(MsgId, From, State = #msstate { pending_gc_completion = Pending }) -> - case index_lookup_positive_ref_count(Guid, State) of + case index_lookup_positive_ref_count(MsgId, State) of not_found -> gen_server2:reply(From, false), State; #msg_location { file = File } -> case orddict:is_key(File, Pending) of true -> add_to_pending_gc_completion( - {contains, Guid, From}, File, State); + {contains, MsgId, From}, File, State); false -> gen_server2:reply(From, true), State end end. -remove_message(Guid, CRef, +remove_message(MsgId, CRef, State = #msstate { file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> - case should_mask_action(CRef, Guid, State) of + case should_mask_action(CRef, MsgId, State) of {true, _Location} -> State; {false_if_increment, #msg_location { ref_count = 0 }} -> @@ -1081,24 +1081,25 @@ remove_message(Guid, CRef, total_size = TotalSize }} when RefCount > 0 -> %% only update field, otherwise bad interaction with %% concurrent GC - Dec = - fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, + Dec = fun () -> + index_update_ref_count(MsgId, RefCount - 1, State) + end, case RefCount of %% don't remove from CUR_FILE_CACHE_ETS_NAME here %% because there may be further writes in the mailbox %% for the same msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId), case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> add_to_pending_gc_completion( - {remove, Guid, CRef}, File, State); + {remove, MsgId, CRef}, File, State); [#file_summary {}] -> ok = Dec(), delete_file_if_empty( File, adjust_valid_total_size(File, -TotalSize, State)) end; - _ -> ok = decrement_cache(DedupCacheEts, Guid), + _ -> ok = decrement_cache(DedupCacheEts, MsgId), ok = Dec(), State end @@ -1119,12 +1120,12 @@ run_pending(Files, State) -> lists:reverse(orddict:fetch(File, Pending))) end, State, Files). -run_pending_action({read, Guid, From}, State) -> - read_message(Guid, From, State); -run_pending_action({contains, Guid, From}, State) -> - contains_message(Guid, From, State); -run_pending_action({remove, Guid, CRef}, State) -> - remove_message(Guid, CRef, State). +run_pending_action({read, MsgId, From}, State) -> + read_message(MsgId, From, State); +run_pending_action({contains, MsgId, From}, State) -> + contains_message(MsgId, From, State); +run_pending_action({remove, MsgId, CRef}, State) -> + remove_message(MsgId, CRef, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> try @@ -1146,44 +1147,46 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, - cref_to_guids = CTG }) -> +update_pending_confirms(Fun, CRef, + State = #msstate { clients = Clients, + cref_to_msg_ids = CTM }) -> case dict:fetch(CRef, Clients) of {undefined, _CloseFDsFun} -> State; - {MsgOnDiskFun, _CloseFDsFun} -> CTG1 = Fun(MsgOnDiskFun, CTG), - State #msstate { cref_to_guids = CTG1 } + {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), + State #msstate { + cref_to_msg_ids = CTM1 } end. -record_pending_confirm(CRef, Guid, State) -> +record_pending_confirm(CRef, MsgId, State) -> update_pending_confirms( - fun (_MsgOnDiskFun, CTG) -> - dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end, - gb_sets:singleton(Guid), CTG) + fun (_MsgOnDiskFun, CTM) -> + dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end, + gb_sets:singleton(MsgId), CTM) end, CRef, State). -client_confirm(CRef, Guids, ActionTaken, State) -> +client_confirm(CRef, MsgIds, ActionTaken, State) -> update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(Guids, ActionTaken), - case dict:find(CRef, CTG) of - {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), - case gb_sets:is_empty(Guids1) of - true -> dict:erase(CRef, CTG); - false -> dict:store(CRef, Guids1, CTG) + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(MsgIds, ActionTaken), + case dict:find(CRef, CTM) of + {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds), + case gb_sets:is_empty(MsgIds1) of + true -> dict:erase(CRef, CTM); + false -> dict:store(CRef, MsgIds1, CTM) end; - error -> CTG + error -> CTM end end, CRef, State). -%% Detect whether the Guid is older or younger than the client's death +%% Detect whether the MsgId is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not %% rewrite the msg - rewriting it would make it younger than the death %% msg and thus should be ignored. Note that this (correctly) returns %% false when testing to remove the death msg itself. -should_mask_action(CRef, Guid, +should_mask_action(CRef, MsgId, State = #msstate { dying_clients = DyingClients }) -> - case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of + case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of {false, Location} -> {false, Location}; {true, not_found} -> @@ -1320,43 +1323,43 @@ list_sorted_file_names(Dir, Ext) -> %% message cache helper functions %%---------------------------------------------------------------------------- -maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg) +maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg) when RefCount > 1 -> - update_msg_cache(DedupCacheEts, Guid, Msg); -maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) -> + update_msg_cache(DedupCacheEts, MsgId, Msg); +maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) -> ok. -update_msg_cache(CacheEts, Guid, Msg) -> - case ets:insert_new(CacheEts, {Guid, Msg, 1}) of +update_msg_cache(CacheEts, MsgId, Msg) -> + case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of true -> ok; false -> safe_ets_update_counter_ok( - CacheEts, Guid, {3, +1}, - fun () -> update_msg_cache(CacheEts, Guid, Msg) end) + CacheEts, MsgId, {3, +1}, + fun () -> update_msg_cache(CacheEts, MsgId, Msg) end) end. -remove_cache_entry(DedupCacheEts, Guid) -> - true = ets:delete(DedupCacheEts, Guid), +remove_cache_entry(DedupCacheEts, MsgId) -> + true = ets:delete(DedupCacheEts, MsgId), ok. -fetch_and_increment_cache(DedupCacheEts, Guid) -> - case ets:lookup(DedupCacheEts, Guid) of +fetch_and_increment_cache(DedupCacheEts, MsgId) -> + case ets:lookup(DedupCacheEts, MsgId) of [] -> not_found; - [{_Guid, Msg, _RefCount}] -> + [{_MsgId, Msg, _RefCount}] -> safe_ets_update_counter_ok( - DedupCacheEts, Guid, {3, +1}, + DedupCacheEts, MsgId, {3, +1}, %% someone has deleted us in the meantime, insert us - fun () -> ok = update_msg_cache(DedupCacheEts, Guid, Msg) end), + fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end), Msg end. -decrement_cache(DedupCacheEts, Guid) -> +decrement_cache(DedupCacheEts, MsgId) -> true = safe_ets_update_counter( - DedupCacheEts, Guid, {3, -1}, - fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, Guid); + DedupCacheEts, MsgId, {3, -1}, + fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId); (_N) -> true end, - %% Guid is not in there because although it's been + %% MsgId is not in there because although it's been %% delivered, it's never actually been read (think: %% persistent message held in RAM) fun () -> true end), @@ -1473,19 +1476,19 @@ count_msg_refs(Gen, Seed, State) -> case Gen(Seed) of finished -> ok; - {_Guid, 0, Next} -> + {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State); - {Guid, Delta, Next} -> - ok = case index_lookup(Guid, State) of + {MsgId, Delta, Next} -> + ok = case index_lookup(MsgId, State) of not_found -> - index_insert(#msg_location { guid = Guid, + index_insert(#msg_location { msg_id = MsgId, file = undefined, ref_count = Delta }, State); #msg_location { ref_count = RefCount } = StoreEntry -> NewRefCount = RefCount + Delta, case NewRefCount of - 0 -> index_delete(Guid, State); + 0 -> index_delete(MsgId, State); _ -> index_update(StoreEntry #msg_location { ref_count = NewRefCount }, State) @@ -1539,8 +1542,8 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. -scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) -> - [{Guid, TotalSize, Offset} | Acc]. +scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) -> + [{MsgId, TotalSize, Offset} | Acc]. %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages @@ -1619,8 +1622,8 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( - fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case index_lookup(Guid, State) of + fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + case index_lookup(MsgId, State) of #msg_location { file = undefined } = StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, @@ -1638,7 +1641,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, %% file size. [] -> {undefined, case ValidMessages of [] -> 0; - _ -> {_Guid, TotalSize, Offset} = + _ -> {_MsgId, TotalSize, Offset} = lists:last(ValidMessages), Offset + TotalSize end}; @@ -1903,8 +1906,8 @@ load_and_vacuum_message_file(File, #gc_state { dir = Dir, scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl( - fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> - case Index:lookup(Guid, IndexState) of + fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) -> + case Index:lookup(MsgId, IndexState) of #msg_location { file = File, total_size = TotalSize, offset = Offset, ref_count = 0 } = Entry -> ok = Index:delete_object(Entry, IndexState), @@ -1929,13 +1932,13 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, end, case lists:foldl( - fun (#msg_location { guid = Guid, offset = Offset, + fun (#msg_location { msg_id = MsgId, offset = Offset, total_size = TotalSize }, {CurOffset, Block = {BlockStart, BlockEnd}}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocation to reflect change of file and offset - ok = Index:update_fields(Guid, + ok = Index:update_fields(MsgId, [{#msg_location.file, Destination}, {#msg_location.offset, CurOffset}], IndexState), @@ -2002,9 +2005,9 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> {ok, _Acc, _IgnoreSize} = rabbit_msg_file:scan( RefOld, filelib:file_size(FileOld), - fun({Guid, _Size, _Offset, BinMsg}, ok) -> + fun({MsgId, _Size, _Offset, BinMsg}, ok) -> {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)), - {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew), + {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), ok end, ok), file_handle_cache:close(RefOld), diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index 077400d6..d6dc5568 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -31,7 +31,7 @@ new(Dir) -> file:delete(filename:join(Dir, ?FILENAME)), - Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.guid}]), + Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]), #state { table = Tid, dir = Dir }. recover(Dir) -> -- cgit v1.2.1 From 87d9ba2a4387a56f228f6e2ffc54a354b8e6a67d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 17:55:05 +0000 Subject: guid -> msg_id in qi --- src/rabbit_queue_index.erl | 87 +++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 7b5aa120..a4984114 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -86,7 +86,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}), +%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -138,10 +138,10 @@ -define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). -define(NO_EXPIRY, 0). --define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes --define(GUID_BITS, (?GUID_BYTES * 8)). +-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes +-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). %% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). +-define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -150,7 +150,7 @@ %% ---- misc ---- --define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent} +-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -159,7 +159,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unsynced_guids }). + max_journal_entries, on_sync, unsynced_msg_ids }). -record(segment, { num, path, journal_entries, unacked }). @@ -187,7 +187,7 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_guids :: [rabbit_types:msg_id()] + unsynced_msg_ids :: [rabbit_types:msg_id()] }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -258,22 +258,22 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProps, IsPersistent, - State = #qistate { unsynced_guids = UnsyncedGuids }) - when is_binary(Guid) -> - ?GUID_BYTES = size(Guid), +publish(MsgId, SeqId, MsgProps, IsPersistent, + State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) + when is_binary(MsgId) -> + ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( State #qistate { - unsynced_guids = [Guid | UnsyncedGuids] }), + unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProps)]), + create_pub_record_body(MsgId, MsgProps)]), maybe_flush_journal( - add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). + add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -283,8 +283,8 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. -sync(State = #qistate { unsynced_guids = Guids }) -> - sync_if([] =/= Guids, State). +sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> + sync_if([] =/= MsgIds, State). sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack in @@ -387,7 +387,7 @@ blank_state(QueueName) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_guids = [] }. + unsynced_msg_ids = [] }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -469,8 +469,9 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) -> - recover_message(ContainsCheckFun(Guid), CleanShutdown, + fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, + Segment1) -> + recover_message(ContainsCheckFun(MsgId), CleanShutdown, Del, RelSeq, Segment1) end, Segment #segment { unacked = UnackedCount + UnackedCountDelta }, @@ -514,17 +515,17 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> ok = gatherer:stop(Gatherer), ok = rabbit_misc:unlink_and_capture_exit(Gatherer), finished; - {value, {Guid, Count}} -> - {Guid, Count, {next, Gatherer}} + {value, {MsgId, Count}} -> + {MsgId, Count, {next, Gatherer}} end. queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> - gatherer:in(Gatherer, {Guid, 1}); + gatherer:in(Gatherer, {MsgId, 1}); (_RelSeq, _Value, Acc) -> Acc end, ok, segment_find_or_new(Seg, Dir, Segments)) || @@ -536,24 +537,24 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> - [Guid, expiry_to_binary(Expiry)]. +create_pub_record_body(MsgId, #message_properties{expiry = Expiry}) -> + [MsgId, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> <>; expiry_to_binary(Expiry) -> <>. read_pub_record_body(Hdl) -> - case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of + case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of {ok, Bin} -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 - <> = Bin, - <> = <>, + <> = Bin, + <> = <>, Exp = case Expiry of ?NO_EXPIRY -> undefined; X -> X end, - {Guid, #message_properties{expiry = Exp}}; + {MsgId, #message_properties{expiry = Exp}}; Error -> Error end. @@ -680,8 +681,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case read_pub_record_body(Hdl) of - {Guid, MsgProps} -> - Publish = {Guid, MsgProps, + {MsgId, MsgProps} -> + Publish = {MsgId, MsgProps, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false @@ -715,9 +716,9 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). -notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> +notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), - State #qistate { unsynced_guids = [] }. + State #qistate { unsynced_msg_ids = [] }. %%---------------------------------------------------------------------------- %% segment manipulation @@ -795,12 +796,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, MsgProps, IsPersistent} -> + {MsgId, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<>, - create_pub_record_body(Guid, MsgProps)]) + create_pub_record_body(MsgId, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -820,10 +821,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -853,8 +854,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <>} -> - {Guid, MsgProps} = read_pub_record_body(Hdl), - Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, + {MsgId, MsgProps} = read_pub_record_body(Hdl), + Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); @@ -1001,17 +1002,17 @@ add_queue_ttl_journal(<>) -> {<>, Rest}; add_queue_ttl_journal(<>) -> - {[<>, Guid, + MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) -> + {[<>, MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_journal(_) -> stop. add_queue_ttl_segment(<>) -> {[<>, Guid, expiry_to_binary(undefined)], Rest}; + RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_segment(<>) -> {<>, -- cgit v1.2.1 From ab3668ec2104d35a57efdf828db521ecbb5a0dac Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 18:30:06 +0000 Subject: guid -> msg_id --- src/rabbit_backing_queue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 6a21e10f..03c1fdd1 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,7 +62,7 @@ behaviour_info(callbacks) -> {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 guid per Ack, in the same order as Acks. + %% about. Must return 1 msg_id per Ack, in the same order as Acks. {ack, 2}, %% A publish, but in the context of a transaction. -- cgit v1.2.1 From 5769f3263378c0d6fb48bee884e6f24cc65304b1 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 18:30:25 +0000 Subject: guid -> msg_id in vq except for #basic_message --- src/rabbit_variable_queue.erl | 220 +++++++++++++++++++++--------------------- 1 file changed, 110 insertions(+), 110 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 58a28d32..1d32cec6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -156,7 +156,7 @@ %% segments. %% %% Pending acks are recorded in memory either as the tuple {SeqId, -%% Guid, MsgProps} (tuple-form) or as the message itself (message- +%% MsgId, MsgProps} (tuple-form) or as the message itself (message- %% form). Acks for persistent messages are always stored in the tuple- %% form. Acks for transient messages are also stored in tuple-form if %% the message has been sent to disk as part of the memory reduction @@ -261,7 +261,7 @@ -record(msg_status, { seq_id, - guid, + msg_id, msg, is_persistent, is_delivered, @@ -400,10 +400,10 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), init(QueueName, IsDurable, Recover, - fun (Guids, ActionTaken) -> - msgs_written_to_disk(Self, Guids, ActionTaken) + fun (MsgIds, ActionTaken) -> + msgs_written_to_disk(Self, MsgIds, ActionTaken) end, - fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). + fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end). init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), @@ -432,8 +432,8 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> rabbit_queue_index:recover( QueueName, Terms1, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (Guid) -> - rabbit_msg_store:contains(Guid, PersistentClient) + fun (MsgId) -> + rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), init(true, IndexState, DeltaCount, Terms1, @@ -509,17 +509,17 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, #basic_message { guid = Guid }, +publish_delivered(false, #basic_message { guid = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { len = 0 }) -> case NeedsConfirming of - true -> blind_confirm(self(), gb_sets:singleton(Guid)); + true -> blind_confirm(self(), gb_sets:singleton(MsgId)); false -> ok end, {undefined, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, - guid = Guid }, + guid = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { len = 0, @@ -535,7 +535,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), {SeqId, a(reduce_memory_use( State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, @@ -586,12 +586,12 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> end. read_msg(MsgStatus = #msg_status { msg = undefined, - guid = Guid, + msg_id = MsgId, is_persistent = IsPersistent }, State = #vqstate { ram_msg_count = RamMsgCount, msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, MsgId), {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, msg_store_clients = MSCState1 }}; @@ -600,7 +600,7 @@ read_msg(MsgStatus, State) -> internal_fetch(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, - guid = Guid, + msg_id = MsgId, msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -619,7 +619,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { %% 2. Remove from msg_store and queue index, if necessary Rem = fun () -> - ok = msg_store_remove(MSCState, IsPersistent, [Guid]) + ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = @@ -678,7 +678,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable, #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), ok = case IsDurable of - true -> msg_store_remove(MSCState, true, persistent_guids(Pubs)); + true -> msg_store_remove(MSCState, true, + persistent_msg_ids(Pubs)); false -> ok end, {lists:append(AckTags), a(State)}. @@ -689,13 +690,13 @@ tx_commit(Txn, Fun, MsgPropsFun, #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), AckTags1 = lists:append(AckTags), - PersistentGuids = persistent_guids(Pubs), - HasPersistentPubs = PersistentGuids =/= [], + PersistentMsgIds = persistent_msg_ids(Pubs), + HasPersistentPubs = PersistentMsgIds =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of true -> ok = msg_store_sync( - MSCState, true, PersistentGuids, - msg_store_callback(PersistentGuids, Pubs, AckTags1, + MSCState, true, PersistentMsgIds, + msg_store_callback(PersistentMsgIds, Pubs, AckTags1, Fun, MsgPropsFun)), State; false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, @@ -713,10 +714,10 @@ requeue(AckTags, MsgPropsFun, State) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), true, false, State1), State2; - ({IsPersistent, Guid, MsgProps}, State1) -> + ({IsPersistent, MsgId, MsgProps}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, MsgId), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps), true, true, State2), @@ -905,12 +906,12 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -%% when requeueing, we re-add a guid to the unconfirmed set +%% when requeueing, we re-add a msg_id to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = MsgId }, MsgProps) -> - #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, + #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. @@ -937,30 +938,30 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) -> MsgStore, Ref, MsgOnDiskFun, msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). -msg_store_write(MSCState, IsPersistent, Guid, Msg) -> +msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end). + fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end). -msg_store_read(MSCState, IsPersistent, Guid) -> +msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end). + fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end). -msg_store_remove(MSCState, IsPersistent, Guids) -> +msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end). + fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). -msg_store_release(MSCState, IsPersistent, Guids) -> +msg_store_release(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end). + fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end). -msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> +msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). + fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end). msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( @@ -994,21 +995,21 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). -persistent_guids(Pubs) -> - [Guid || {#basic_message { guid = Guid, - is_persistent = true }, _MsgProps} <- Pubs]. +persistent_msg_ids(Pubs) -> + [MsgId || {#basic_message { guid = MsgId, + is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered}, + fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; false -> {[m(#msg_status { msg = undefined, - guid = Guid, + msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -1114,7 +1115,7 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> +msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> {[], tx_commit_post_msg_store( @@ -1124,14 +1125,14 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( fun () -> remove_persistent_messages( - PersistentGuids) + PersistentMsgIds) end, F) end) end. -remove_persistent_messages(Guids) -> +remove_persistent_messages(MsgIds) -> PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined), - ok = rabbit_msg_store:remove(Guids, PersistentClient), + ok = rabbit_msg_store:remove(MsgIds, PersistentClient), rabbit_msg_store:client_delete_and_terminate(PersistentClient). tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, @@ -1149,7 +1150,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, _Guid, _MsgProps} -> + {IsPersistent, _MsgId, _MsgProps} -> IsPersistent end]; false -> [] @@ -1215,38 +1216,38 @@ purge_betas_and_deltas(LensByStore, end. remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> - {GuidsByStore, Delivers, Acks} = + {MsgIdsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), - ok = orddict:fold(fun (IsPersistent, Guids, ok) -> - msg_store_remove(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), - {sum_guids_by_store_to_len(LensByStore, GuidsByStore), + ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> + msg_store_remove(MSCState, IsPersistent, MsgIds) + end, ok, MsgIdsByStore), + {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore), rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( - #msg_status { guid = Guid, seq_id = SeqId, + #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {GuidsByStore, Delivers, Acks}) -> + {MsgIdsByStore, Delivers, Acks}) -> {case MsgOnDisk of - true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore); - false -> GuidsByStore + true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + false -> MsgIdsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. -sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> +sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> orddict:fold( - fun (IsPersistent, Guids, LensByStore1) -> - orddict:update_counter(IsPersistent, length(Guids), LensByStore1) - end, LensByStore, GuidsByStore). + fun (IsPersistent, MsgIds, LensByStore1) -> + orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1) + end, LensByStore, MsgIdsByStore). %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, @@ -1266,7 +1267,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, @@ -1278,14 +1279,14 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, guid = Guid, + msg = Msg, msg_id = MsgId, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, - ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1), + ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), MsgStatus #msg_status { msg_on_disk = true }; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. @@ -1295,7 +1296,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, + msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -1303,7 +1304,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:publish( - Guid, SeqId, MsgProps, IsPersistent, IndexState), + MsgId, SeqId, MsgProps, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> @@ -1322,7 +1323,7 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %%---------------------------------------------------------------------------- record_pending_ack(#msg_status { seq_id = SeqId, - guid = Guid, + msg_id = MsgId, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk, msg_props = MsgProps } = MsgStatus, @@ -1331,8 +1332,8 @@ record_pending_ack(#msg_status { seq_id = SeqId, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> {{IsPersistent, Guid, MsgProps}, RAI}; - false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + true -> {{IsPersistent, MsgId, MsgProps}, RAI}; + false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), State #vqstate { pending_ack = PA1, @@ -1343,28 +1344,28 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, GuidsByStore} = + {PersistentSeqIds, MsgIdsByStore} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of - true -> case orddict:find(false, GuidsByStore) of - error -> State1; - {ok, Guids} -> ok = msg_store_remove(MSCState, false, - Guids), + true -> case orddict:find(false, MsgIdsByStore) of + error -> State1; + {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, + MsgIds), State1 end; false -> IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), - [ok = msg_store_remove(MSCState, IsPersistent, Guids) - || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], + [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } end. ack(_MsgStoreFun, _Fun, [], State) -> State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, GuidsByStore}, + {{PersistentSeqIds, MsgIdsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1380,10 +1381,10 @@ ack(MsgStoreFun, Fun, AckTags, State) -> gb_trees:delete_any(SeqId, RAI)})} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), - [ok = MsgStoreFun(MSCState, IsPersistent, Guids) - || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], - PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( - orddict:new(), GuidsByStore)), + [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], + PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( + orddict:new(), MsgIdsByStore)), State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) }. @@ -1393,12 +1394,12 @@ accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, - {PersistentSeqIdsAcc, GuidsByStore}) -> - {PersistentSeqIdsAcc, GuidsByStore}; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, - {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore}; +accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, + {PersistentSeqIdsAcc, MsgIdsByStore}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1417,12 +1418,12 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) -> false -> State end. -remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, +remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), - msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), - unconfirmed = gb_sets:difference(UC, GuidSet) }. + State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet), + msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet), + unconfirmed = gb_sets:difference(UC, MsgIdSet) }. needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1439,37 +1440,37 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -msgs_confirmed(GuidSet, State) -> - {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. +msgs_confirmed(MsgIdSet, State) -> + {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}. -blind_confirm(QPid, GuidSet) -> +blind_confirm(QPid, MsgIdSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State) -> msgs_confirmed(GuidSet, State) end). + QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end). -msgs_written_to_disk(QPid, GuidSet, removed) -> - blind_confirm(QPid, GuidSet); -msgs_written_to_disk(QPid, GuidSet, written) -> +msgs_written_to_disk(QPid, MsgIdSet, removed) -> + blind_confirm(QPid, MsgIdSet); +msgs_written_to_disk(QPid, MsgIdSet, written) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + Written = gb_sets:intersection(UC, MsgIdSet), + msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD), State #vqstate { msgs_on_disk = - gb_sets:union( - MOD, gb_sets:intersection(UC, GuidSet)) }) + gb_sets:union(MOD, Written) }) end). -msg_indices_written_to_disk(QPid, GuidSet) -> +msg_indices_written_to_disk(QPid, MsgIdSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + Written = gb_sets:intersection(UC, MsgIdSet), + msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD), State #vqstate { msg_indices_on_disk = - gb_sets:union( - MIOD, gb_sets:intersection(UC, GuidSet)) }) + gb_sets:union(MIOD, Written) }) end). %%---------------------------------------------------------------------------- @@ -1547,17 +1548,16 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, true -> {Quota, State}; false -> - {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), + {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { - guid = Guid, %% ASSERTION + msg_id = MsgId, %% ASSERTION is_persistent = false, %% ASSERTION msg_props = MsgProps } = dict:fetch(SeqId, PA), {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA), limit_ram_acks(Quota - 1, - State1 #vqstate { - pending_ack = - dict:store(SeqId, {false, Guid, MsgProps}, PA), - ram_ack_index = RAI1 }) + State1 #vqstate { pending_ack = PA1, + ram_ack_index = RAI1 }) end. @@ -1818,9 +1818,9 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> multiple_routing_keys() -> transform_storage( fun ({basic_message, ExchangeName, Routing_Key, Content, - Guid, Persistent}) -> + MsgId, Persistent}) -> {ok, {basic_message, ExchangeName, [Routing_Key], Content, - Guid, Persistent}}; + MsgId, Persistent}}; (_) -> {error, corrupt_message} end), ok. -- cgit v1.2.1 From 8569560c351598e90c38b2a794b1d46b96347b76 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 18:36:49 +0000 Subject: #basic_message.guid -> id --- include/rabbit.hrl | 2 +- src/rabbit_amqqueue_process.erl | 2 +- src/rabbit_basic.erl | 12 ++++++------ src/rabbit_types.erl | 2 +- src/rabbit_variable_queue.erl | 10 +++++----- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 4d75b546..9f483c30 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,7 @@ -record(listener, {node, protocol, host, ip_address, port}). --record(basic_message, {exchange_name, routing_keys = [], content, guid, +-record(basic_message, {exchange_name, routing_keys = [], content, id, is_persistent}). -record(ssl_socket, {tcp, ssl}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 44053593..57426e13 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -433,7 +433,7 @@ record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, - guid = Guid}}, + id = Guid}}, State = #q{guid_to_channel = GTC, q = #amqqueue{durable = true}}) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 57aad808..43230f30 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -116,12 +116,12 @@ message(ExchangeName, RoutingKey, #content{properties = Props} = DecodedContent) -> try {ok, #basic_message{ - exchange_name = ExchangeName, - content = strip_header(DecodedContent, ?DELETED_HEADER), - guid = rabbit_guid:guid(), - is_persistent = is_message_persistent(DecodedContent), - routing_keys = [RoutingKey | - header_routes(Props#'P_basic'.headers)]}} + exchange_name = ExchangeName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + id = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} catch {error, _Reason} = Error -> Error end. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 899291f2..90dfd38d 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -67,7 +67,7 @@ #basic_message{exchange_name :: rabbit_exchange:name(), routing_keys :: [rabbit_router:routing_key()], content :: content(), - guid :: msg_id(), + id :: msg_id(), is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1d32cec6..0c4c06e8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -509,7 +509,7 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, #basic_message { guid = MsgId }, +publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { len = 0 }) -> @@ -519,7 +519,7 @@ publish_delivered(false, #basic_message { guid = MsgId }, end, {undefined, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, - guid = MsgId }, + id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { len = 0, @@ -909,7 +909,7 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; %% when requeueing, we re-add a msg_id to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = MsgId }, +msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, MsgProps) -> #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, is_persistent = IsPersistent, is_delivered = false, @@ -996,7 +996,7 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_msg_ids(Pubs) -> - [MsgId || {#basic_message { guid = MsgId, + [MsgId || {#basic_message { id = MsgId, is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> @@ -1247,7 +1247,7 @@ sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, +publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, -- cgit v1.2.1 From 1bd39c0325baec4014cb05654f2be02f8843fdc8 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 18:46:21 +0000 Subject: guid -> msg_id in amqqueue_process --- src/rabbit_amqqueue_process.erl | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 57426e13..650b6a68 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -46,7 +46,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - guid_to_channel, + msg_id_to_channel, ttl, ttl_timer_ref }). @@ -112,7 +112,7 @@ init(Q) -> expiry_timer_ref = undefined, ttl = undefined, stats_timer = rabbit_event:init_stats_timer(), - guid_to_channel = dict:new()}, hibernate, + msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -404,22 +404,22 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> - {CMs, GTC1} = +confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> + {CMs, MTC1} = lists:foldl( - fun(Guid, {CMs, GTC0}) -> - case dict:find(Guid, GTC0) of + fun(MsgId, {CMs, MTC0}) -> + case dict:find(MsgId, MTC0) of {ok, {ChPid, MsgSeqNo}} -> {gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(Guid, GTC0)}; + dict:erase(MsgId, MTC0)}; _ -> - {CMs, GTC0} + {CMs, MTC0} end - end, {gb_trees:empty(), GTC}, Guids), + end, {gb_trees:empty(), MTC}, MsgIds), gb_trees:map(fun(ChPid, MsgSeqNos) -> rabbit_channel:confirm(ChPid, MsgSeqNos) end, CMs), - State#q{guid_to_channel = GTC1}. + State#q{msg_id_to_channel = MTC1}. gb_trees_cons(Key, Value, Tree) -> case gb_trees:lookup(Key, Tree) of @@ -433,12 +433,12 @@ record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, - id = Guid}}, + id = MsgId}}, State = - #q{guid_to_channel = GTC, - q = #amqqueue{durable = true}}) -> + #q{msg_id_to_channel = MTC, + q = #amqqueue{durable = true}}) -> {confirm, - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; + State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}}; record_confirm_message(_Delivery, State) -> {no_confirm, State}. @@ -618,9 +618,9 @@ backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {Guids, BQS1} = Fun(BQS), + {MsgIds, BQS1} = Fun(BQS), run_message_queue( - confirm_messages(Guids, State#q{backing_queue_state = BQS1})). + confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -767,8 +767,8 @@ prioritise_cast(Msg, _State) -> maybe_expire -> 8; drop_expired -> 8; emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {maybe_run_queue_via_backing_queue, _Fun} -> 6; -- cgit v1.2.1 From 1076e2220865be678888d3ec1fd2799bdb55da60 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 18:58:26 +0000 Subject: guid -> msg_id in tests --- src/rabbit_tests.erl | 200 +++++++++++++++++++++++++-------------------------- 1 file changed, 100 insertions(+), 100 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0c6250df..2def7573 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1602,50 +1602,50 @@ restart_msg_store_empty() -> ok = rabbit_variable_queue:start_msg_store( undefined, {fun (ok) -> finished end, ok}). -guid_bin(X) -> +msg_id_bin(X) -> erlang:md5(term_to_binary(X)). msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). -msg_store_contains(Atom, Guids, MSCState) -> +msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( - fun (Guid, Atom1) when Atom1 =:= Atom -> - rabbit_msg_store:contains(Guid, MSCState) end, - Atom, Guids). + fun (MsgId, Atom1) when Atom1 =:= Atom -> + rabbit_msg_store:contains(MsgId, MSCState) end, + Atom, MsgIds). -msg_store_sync(Guids, MSCState) -> +msg_store_sync(MsgIds, MSCState) -> Ref = make_ref(), Self = self(), - ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end, + ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end, MSCState), receive {sync, Ref} -> ok after 10000 -> - io:format("Sync from msg_store missing for guids ~p~n", [Guids]), + io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]), throw(timeout) end. -msg_store_read(Guids, MSCState) -> - lists:foldl(fun (Guid, MSCStateM) -> - {{ok, Guid}, MSCStateN} = rabbit_msg_store:read( - Guid, MSCStateM), +msg_store_read(MsgIds, MSCState) -> + lists:foldl(fun (MsgId, MSCStateM) -> + {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read( + MsgId, MSCStateM), MSCStateN - end, MSCState, Guids). + end, MSCState, MsgIds). -msg_store_write(Guids, MSCState) -> - ok = lists:foldl( - fun (Guid, ok) -> rabbit_msg_store:write(Guid, Guid, MSCState) end, - ok, Guids). +msg_store_write(MsgIds, MSCState) -> + ok = lists:foldl(fun (MsgId, ok) -> + rabbit_msg_store:write(MsgId, MsgId, MSCState) + end, ok, MsgIds). -msg_store_remove(Guids, MSCState) -> - rabbit_msg_store:remove(Guids, MSCState). +msg_store_remove(MsgIds, MSCState) -> + rabbit_msg_store:remove(MsgIds, MSCState). -msg_store_remove(MsgStore, Ref, Guids) -> +msg_store_remove(MsgStore, Ref, MsgIds) -> with_msg_store_client(MsgStore, Ref, fun (MSCStateM) -> - ok = msg_store_remove(Guids, MSCStateM), + ok = msg_store_remove(MsgIds, MSCStateM), MSCStateM end). @@ -1655,140 +1655,140 @@ with_msg_store_client(MsgStore, Ref, Fun) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( - lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, + lists:foldl(fun (MsgId, MSCState) -> Fun(MsgId, MSCState) end, msg_store_client_init(MsgStore, Ref), L)). test_msg_store() -> restart_msg_store_empty(), Self = self(), - Guids = [guid_bin(M) || M <- lists:seq(1,100)], - {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), + MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], + {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), Ref = rabbit_guid:guid(), MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs we're about to publish - false = msg_store_contains(false, Guids, MSCState), + false = msg_store_contains(false, MsgIds, MSCState), %% publish the first half - ok = msg_store_write(Guids1stHalf, MSCState), + ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(Guids1stHalf, MSCState), + ok = msg_store_sync(MsgIds1stHalf, MSCState), %% publish the second half - ok = msg_store_write(Guids2ndHalf, MSCState), + ok = msg_store_write(MsgIds2ndHalf, MSCState), %% sync on the first half again - the msg_store will be dirty, but %% we won't need the fsync - ok = msg_store_sync(Guids1stHalf, MSCState), + ok = msg_store_sync(MsgIds1stHalf, MSCState), %% check they're all in there - true = msg_store_contains(true, Guids, MSCState), + true = msg_store_contains(true, MsgIds, MSCState), %% publish the latter half twice so we hit the caching and ref count code - ok = msg_store_write(Guids2ndHalf, MSCState), + ok = msg_store_write(MsgIds2ndHalf, MSCState), %% check they're still all in there - true = msg_store_contains(true, Guids, MSCState), + true = msg_store_contains(true, MsgIds, MSCState), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen ok = lists:foldl( - fun (Guid, ok) -> rabbit_msg_store:sync( - [Guid], fun () -> Self ! {sync, Guid} end, - MSCState) - end, ok, Guids2ndHalf), + fun (MsgId, ok) -> rabbit_msg_store:sync( + [MsgId], fun () -> Self ! {sync, MsgId} end, + MSCState) + end, ok, MsgIds2ndHalf), lists:foldl( - fun(Guid, ok) -> + fun(MsgId, ok) -> receive - {sync, Guid} -> ok + {sync, MsgId} -> ok after 10000 -> - io:format("Sync from msg_store missing (guid: ~p)~n", - [Guid]), + io:format("Sync from msg_store missing (msg_id: ~p)~n", + [MsgId]), throw(timeout) end - end, ok, Guids2ndHalf), + end, ok, MsgIds2ndHalf), %% it's very likely we're not dirty here, so the 1st half sync %% should hit a different code path - ok = msg_store_sync(Guids1stHalf, MSCState), + ok = msg_store_sync(MsgIds1stHalf, MSCState), %% read them all - MSCState1 = msg_store_read(Guids, MSCState), + MSCState1 = msg_store_read(MsgIds, MSCState), %% read them all again - this will hit the cache, not disk - MSCState2 = msg_store_read(Guids, MSCState1), + MSCState2 = msg_store_read(MsgIds, MSCState1), %% remove them all - ok = rabbit_msg_store:remove(Guids, MSCState2), + ok = rabbit_msg_store:remove(MsgIds, MSCState2), %% check first half doesn't exist - false = msg_store_contains(false, Guids1stHalf, MSCState2), + false = msg_store_contains(false, MsgIds1stHalf, MSCState2), %% check second half does exist - true = msg_store_contains(true, Guids2ndHalf, MSCState2), + true = msg_store_contains(true, MsgIds2ndHalf, MSCState2), %% read the second half again - MSCState3 = msg_store_read(Guids2ndHalf, MSCState2), + MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2), %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(Guids2ndHalf, MSCState3), + ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3), %% read the second half again, just for fun (aka code coverage) - MSCState4 = msg_store_read(Guids2ndHalf, MSCState3), + MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( [], {fun ([]) -> finished; - ([Guid|GuidsTail]) - when length(GuidsTail) rem 2 == 0 -> - {Guid, 1, GuidsTail}; - ([Guid|GuidsTail]) -> - {Guid, 0, GuidsTail} - end, Guids2ndHalf}), + ([MsgId|MsgIdsTail]) + when length(MsgIdsTail) rem 2 == 0 -> + {MsgId, 1, MsgIdsTail}; + ([MsgId|MsgIdsTail]) -> + {MsgId, 0, MsgIdsTail} + end, MsgIds2ndHalf}), MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), %% check we have the right msgs left lists:foldl( - fun (Guid, Bool) -> - not(Bool = rabbit_msg_store:contains(Guid, MSCState5)) - end, false, Guids2ndHalf), + fun (MsgId, Bool) -> + not(Bool = rabbit_msg_store:contains(MsgId, MSCState5)) + end, false, MsgIds2ndHalf), ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), MSCState6 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs - false = msg_store_contains(false, Guids, MSCState6), + false = msg_store_contains(false, MsgIds, MSCState6), %% publish the first half again - ok = msg_store_write(Guids1stHalf, MSCState6), + ok = msg_store_write(MsgIds1stHalf, MSCState6), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState6)), + msg_store_read(MsgIds1stHalf, MSCState6)), MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), - ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), + ok = rabbit_msg_store:remove(MsgIds1stHalf, MSCState7), ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty - restart_msg_store_empty(), %% now safe to reuse guids + restart_msg_store_empty(), %% now safe to reuse msg_ids %% push a lot of msgs in... at least 100 files worth {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit), PayloadSizeBits = 65536, BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)), - GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], + MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:PayloadSizeBits >>, ok = with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, fun (MSCStateM) -> - [ok = rabbit_msg_store:write(Guid, Payload, MSCStateM) || - Guid <- GuidsBig], + [ok = rabbit_msg_store:write(MsgId, Payload, MSCStateM) || + MsgId <- MsgIdsBig], MSCStateM end), %% now read them to ensure we hit the fast client-side reading ok = foreach_with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MSCStateM) -> + fun (MsgId, MSCStateM) -> {{ok, Payload}, MSCStateN} = rabbit_msg_store:read( - Guid, MSCStateM), + MsgId, MSCStateM), MSCStateN - end, GuidsBig), + end, MsgIdsBig), %% .., then 3s by 1... ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, - [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), + [msg_id_bin(X) || X <- lists:seq(BigCount, 1, -3)]), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, - [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), + [msg_id_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), %% .., then remove 3s by 3, from the young end first. This hits %% GC... ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, - [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), + [msg_id_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), %% ensure empty ok = with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, fun (MSCStateM) -> - false = msg_store_contains(false, GuidsBig, MSCStateM), + false = msg_store_contains(false, MsgIdsBig, MSCStateM), MSCStateM end), %% restart empty @@ -1808,8 +1808,8 @@ init_test_queue() -> PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( TestQueue, Terms, false, - fun (Guid) -> - rabbit_msg_store:contains(Guid, PersistentClient) + fun (MsgId) -> + rabbit_msg_store:contains(MsgId, PersistentClient) end, fun nop/1), ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), @@ -1840,25 +1840,25 @@ queue_index_publish(SeqIds, Persistent, Qi) -> false -> ?TRANSIENT_MSG_STORE end, MSCState = msg_store_client_init(MsgStore, Ref), - {A, B = [{_SeqId, LastGuidWritten} | _]} = + {A, B = [{_SeqId, LastMsgIdWritten} | _]} = lists:foldl( - fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> - Guid = rabbit_guid:guid(), + fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> + MsgId = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( - Guid, SeqId, #message_properties{}, Persistent, QiN), - ok = rabbit_msg_store:write(Guid, Guid, MSCState), - {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} + MsgId, SeqId, #message_properties{}, Persistent, QiN), + ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), + {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} end, {Qi, []}, SeqIds), %% do this just to force all of the publishes through to the msg_store: - true = rabbit_msg_store:contains(LastGuidWritten, MSCState), + true = rabbit_msg_store:contains(LastMsgIdWritten, MSCState), ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, - [{Guid, SeqId, _Props, Persistent, Delivered}|Read], - [{SeqId, Guid}|Published]) -> + [{MsgId, SeqId, _Props, Persistent, Delivered}|Read], + [{SeqId, MsgId}|Published]) -> verify_read_with_published(Delivered, Persistent, Read, Published); verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. @@ -1866,10 +1866,10 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> - Guid = rabbit_guid:guid(), + MsgId = rabbit_guid:guid(), Props = #message_properties{expiry=12345}, - Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), - {[{Guid, 1, Props, _, _}], Qi2} = + Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), + {[{MsgId, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), Qi2 end), @@ -1891,19 +1891,19 @@ test_queue_index() -> with_empty_test_queue( fun (Qi0) -> {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), - {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), + {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), ok = verify_read_with_published(false, false, ReadA, - lists:reverse(SeqIdsGuidsA)), + lists:reverse(SeqIdsMsgIdsA)), %% should get length back as 0, as all the msgs were transient {0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), - {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), + {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), ok = verify_read_with_published(false, true, ReadB, - lists:reverse(SeqIdsGuidsB)), + lists:reverse(SeqIdsMsgIdsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), {LenB, Qi12} = restart_test_queue(Qi10), @@ -1911,7 +1911,7 @@ test_queue_index() -> Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), ok = verify_read_with_published(true, true, ReadC, - lists:reverse(SeqIdsGuidsB)), + lists:reverse(SeqIdsMsgIdsB)), Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), Qi17 = rabbit_queue_index:flush(Qi16), %% Everything will have gone now because #pubs == #acks @@ -1927,12 +1927,12 @@ test_queue_index() -> %% a) partial pub+del+ack, then move to new segment with_empty_test_queue( fun (Qi0) -> - {Qi1, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, + {Qi1, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi0), Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), Qi3 = rabbit_queue_index:ack(SeqIdsC, Qi2), Qi4 = rabbit_queue_index:flush(Qi3), - {Qi5, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], + {Qi5, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize], false, Qi4), Qi5 end), @@ -1940,10 +1940,10 @@ test_queue_index() -> %% b) partial pub+del, then move to new segment, then ack all in old segment with_empty_test_queue( fun (Qi0) -> - {Qi1, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, + {Qi1, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, false, Qi0), Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), - {Qi3, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], + {Qi3, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], false, Qi2), Qi4 = rabbit_queue_index:ack(SeqIdsC, Qi3), rabbit_queue_index:flush(Qi4) @@ -1952,7 +1952,7 @@ test_queue_index() -> %% c) just fill up several segments of all pubs, then +dels, then +acks with_empty_test_queue( fun (Qi0) -> - {Qi1, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, + {Qi1, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, false, Qi0), Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1), Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2), @@ -1986,12 +1986,12 @@ test_queue_index() -> %% exercise journal_minus_segment, not segment_plus_journal. with_empty_test_queue( fun (Qi0) -> - {Qi1, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7], + {Qi1, _SeqIdsMsgIdsE} = queue_index_publish([0,1,2,4,5,7], true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), {5, Qi4} = restart_test_queue(Qi3), - {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4), + {Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), {5, Qi8} = restart_test_queue(Qi7), -- cgit v1.2.1 From 0e40c5131cf79c123b9eb85100bedebaa218df45 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Mar 2011 20:42:51 +0000 Subject: cosmetic --- src/rabbit_msg_store.erl | 2 +- src/rabbit_queue_index.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 48fce9ed..4f5d2411 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -150,7 +150,7 @@ -spec(client_ref/1 :: (client_msstate()) -> client_ref()). -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). -spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). -spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 59d87654..8227e4cd 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -214,7 +214,7 @@ boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> - {non_neg_integer(), non_neg_integer(), qistate()}). + {non_neg_integer(), non_neg_integer(), qistate()}). -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). -spec(add_queue_ttl/0 :: () -> 'ok'). -- cgit v1.2.1