diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-02 03:12:47 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-02 03:12:47 +0100 |
commit | f4c642ab2985c94531e0794658e56336965edf09 (patch) | |
tree | 1e296d8976d3efd69de572fce492512f323903fd | |
parent | 0aa36d41b30e68582de75436d029ccf9b8a62509 (diff) | |
download | rabbitmq-server-f4c642ab2985c94531e0794658e56336965edf09.tar.gz |
cosmetics and some minor refactoring
-rw-r--r-- | src/rabbit_msg_store.erl | 30 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 21 | ||||
-rw-r--r-- | src/rabbit_router.erl | 19 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 65 |
4 files changed, 64 insertions, 71 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 7249e13e..cb4768bd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -637,12 +637,11 @@ handle_call({register_sync_callback, ClientRef, Fun}, _From, reply(ok, State #msstate { client_ondisk_callback = dict:store(ClientRef, Fun, CODC) }); -handle_call({client_terminate, #client_msstate { client_ref = CRef }}, - _From, +handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> reply(ok, State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), - cref_to_guids = dict:erase(CRef, CTG) }). + cref_to_guids = dict:erase(CRef, CTG)}). handle_cast({write, CRef, Guid}, State = #msstate { current_file_handle = CurHdl, @@ -652,7 +651,7 @@ handle_cast({write, CRef, Guid}, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, client_ondisk_callback = CODC, - cref_to_guids = CTG}) -> + cref_to_guids = CTG }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), @@ -676,16 +675,15 @@ handle_cast({write, CRef, Guid}, [{#file_summary.valid_total_size, ValidTotalSize1}, {#file_summary.file_size, FileSize + TotalSize}]), NextOffset = CurOffset + TotalSize, - noreply( - maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize, - cref_to_guids = - case dict:find(CRef, CODC) of - {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); - error -> CTG - end})); + CTG1 = case dict:find(CRef, CODC) of + {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); + error -> CTG + end, + noreply(maybe_roll_to_new_file( + NextOffset, State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize, + cref_to_guids = CTG1 })); #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC @@ -852,8 +850,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end, dict:map(fun(CRef, Guids) -> Fun = dict:fetch(CRef, CODC), - Fun(Guids) end, - CTG), + Fun(Guids) + end, CTG), State2 #msstate { cref_to_guids = dict:new() }. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 88b6e832..ea3a9fbf 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -257,10 +257,12 @@ delete_and_terminate(State) -> State1. publish(Guid, SeqId, IsPersistent, - State = #qistate { unsynced_guids = UnsyncedGuids }) when is_binary(Guid) -> + State = #qistate { unsynced_guids = UnsyncedGuids }) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), - {JournalHdl, State1} = - get_journal_handle(State #qistate { unsynced_guids = [Guid | UnsyncedGuids] }), + {JournalHdl, State1} = get_journal_handle( + State #qistate { + unsynced_guids = [Guid | UnsyncedGuids] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -675,6 +677,10 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> + OnSyncFun(UG), + State #qistate { unsynced_guids = [] }. + %%---------------------------------------------------------------------------- %% segment manipulation %%---------------------------------------------------------------------------- @@ -942,12 +948,3 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> {{no_pub, no_del, ack}, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> {undefined, -1}. - -%%---------------------------------------------------------------------------- -%% misc -%%---------------------------------------------------------------------------- - -notify_sync(State = #qistate { unsynced_guids = UG, - on_sync = OnSyncFun }) -> - OnSyncFun(UG), - State #qistate { unsynced_guids = [] }. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 707698b0..a1a341a9 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -70,16 +70,15 @@ deliver(QPids, Delivery = #delivery{mandatory = false, QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery) -> - {Success, _} = - delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) - end), - {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - case check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, Handled}) of +deliver(QPids, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> + {Success, _} = delegate:invoke( + QPids, fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) + end), + case check_delivery(Mandatory, Immediate, + lists:foldl(fun fold_deliveries/2, + {false, []}, Success)) of {routed, Qs} -> {routed, Qs}; O -> O end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9b8dd23c..a72ec2f7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,10 +34,10 @@ -export([init/5, init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, + requeue/2, len/1, is_empty/1, seqids_to_guids/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, seqids_to_guids/2]). + status/1]). -export([start/1, stop/0]). @@ -449,7 +449,7 @@ init(QueueName, IsDurable, Recover, timestamp = Now }, msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), - unconfirmed = gb_sets:new()}, + unconfirmed = gb_sets:new() }, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -681,6 +681,15 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). +seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) -> + lists:foldl( + fun(SeqId, Guids) -> + [case dict:fetch(SeqId, PA) of + #msg_status { msg = Msg } -> Msg#basic_message.guid; + {_, Guid} -> Guid + end | Guids] + end, [], SeqIds). + set_ram_duration_target(DurationTarget, State = #vqstate { rates = #rates { avg_egress = AvgEgressRate, @@ -776,15 +785,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {avg_egress_rate , AvgEgressRate}, {avg_ingress_rate , AvgIngressRate} ]. -seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) -> - lists:foldl( - fun(SeqId, Guids) -> - [case dict:fetch(SeqId, PA) of - #msg_status { msg = Msg } -> Msg#basic_message.guid; - {_, Guid} -> Guid - end | Guids] - end, [], SeqIds). - %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -1182,8 +1182,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), %% the AckTags were removed from State1, so use State in seqids_to_guids - State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State)), - State1), + State2 = remove_confirms( + gb_sets:from_list(seqids_to_guids(AckTags, State)), State1), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), State2 #vqstate { index_state = IndexState1, @@ -1207,13 +1207,16 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- -msgs_confirmed(GuidSet, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> +remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), unconfirmed = gb_sets:difference(UC, GuidSet) }. +msgs_confirmed(GuidSet, State) -> + {remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}. + msgs_written_to_disk(QPid, Guids) -> spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( QPid, @@ -1221,14 +1224,12 @@ msgs_written_to_disk(QPid, Guids) -> msg_indices_on_disk = MIOD, unconfirmed = UC }) -> GuidSet = gb_sets:from_list(Guids), - ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), - State1 = - State #vqstate { - msgs_on_disk = - gb_sets:intersection( - gb_sets:union(MOD, GuidSet), UC) }, - { msgs_confirmed(ToConfirmMsgs, State1), - {confirm, gb_sets:to_list(ToConfirmMsgs)} } + msgs_confirmed( + gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) end) end). @@ -1239,14 +1240,12 @@ msg_indices_written_to_disk(QPid, Guids) -> msg_indices_on_disk = MIOD, unconfirmed = UC }) -> GuidSet = gb_sets:from_list(Guids), - ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), - State1 = - State #vqstate { - msg_indices_on_disk = - gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), UC) }, - { msgs_confirmed(ToConfirmMsgs, State1), - {confirm, gb_sets:to_list(ToConfirmMsgs)} } + msgs_confirmed( + gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) end) end). |