author     Matthias Radestock <matthias@rabbitmq.com>   2010-10-02 03:12:47 +0100
committer  Matthias Radestock <matthias@rabbitmq.com>   2010-10-02 03:12:47 +0100
commit     f4c642ab2985c94531e0794658e56336965edf09 (patch)
tree       1e296d8976d3efd69de572fce492512f323903fd
parent     0aa36d41b30e68582de75436d029ccf9b8a62509 (diff)
download   rabbitmq-server-f4c642ab2985c94531e0794658e56336965edf09.tar.gz
cosmetics and some minor refactoring
-rw-r--r--  src/rabbit_msg_store.erl      | 30
-rw-r--r--  src/rabbit_queue_index.erl    | 21
-rw-r--r--  src/rabbit_router.erl         | 19
-rw-r--r--  src/rabbit_variable_queue.erl | 65
4 files changed, 64 insertions(+), 71 deletions(-)
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 7249e13e..cb4768bd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -637,12 +637,11 @@ handle_call({register_sync_callback, ClientRef, Fun}, _From,
reply(ok, State #msstate { client_ondisk_callback =
dict:store(ClientRef, Fun, CODC) });
-handle_call({client_terminate, #client_msstate { client_ref = CRef }},
- _From,
+handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
reply(ok, State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
- cref_to_guids = dict:erase(CRef, CTG) }).
+ cref_to_guids = dict:erase(CRef, CTG)}).
handle_cast({write, CRef, Guid},
State = #msstate { current_file_handle = CurHdl,
@@ -652,7 +651,7 @@ handle_cast({write, CRef, Guid},
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
- cref_to_guids = CTG}) ->
+ cref_to_guids = CTG }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
@@ -676,16 +675,15 @@ handle_cast({write, CRef, Guid},
[{#file_summary.valid_total_size, ValidTotalSize1},
{#file_summary.file_size, FileSize + TotalSize}]),
NextOffset = CurOffset + TotalSize,
- noreply(
- maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize,
- cref_to_guids =
- case dict:find(CRef, CODC) of
- {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG);
- error -> CTG
- end}));
+ CTG1 = case dict:find(CRef, CODC) of
+ {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG);
+ error -> CTG
+ end,
+ noreply(maybe_roll_to_new_file(
+ NextOffset, State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize,
+ cref_to_guids = CTG1 }));
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
@@ -852,8 +850,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end,
dict:map(fun(CRef, Guids) -> Fun = dict:fetch(CRef, CODC),
- Fun(Guids) end,
- CTG),
+ Fun(Guids)
+ end, CTG),
State2 #msstate { cref_to_guids = dict:new() }.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 88b6e832..ea3a9fbf 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -257,10 +257,12 @@ delete_and_terminate(State) ->
State1.
publish(Guid, SeqId, IsPersistent,
- State = #qistate { unsynced_guids = UnsyncedGuids }) when is_binary(Guid) ->
+ State = #qistate { unsynced_guids = UnsyncedGuids })
+ when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
- {JournalHdl, State1} =
- get_journal_handle(State #qistate { unsynced_guids = [Guid | UnsyncedGuids] }),
+ {JournalHdl, State1} = get_journal_handle(
+ State #qistate {
+ unsynced_guids = [Guid | UnsyncedGuids] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -675,6 +677,10 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+ OnSyncFun(UG),
+ State #qistate { unsynced_guids = [] }.
+
%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------
@@ -942,12 +948,3 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
{{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
{undefined, -1}.
-
-%%----------------------------------------------------------------------------
-%% misc
-%%----------------------------------------------------------------------------
-
-notify_sync(State = #qistate { unsynced_guids = UG,
- on_sync = OnSyncFun }) ->
- OnSyncFun(UG),
- State #qistate { unsynced_guids = [] }.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 707698b0..a1a341a9 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -70,16 +70,15 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
-deliver(QPids, Delivery) ->
- {Success, _} =
- delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
- end),
- {Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- case check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, Handled}) of
+deliver(QPids, Delivery = #delivery{mandatory = Mandatory,
+ immediate = Immediate}) ->
+ {Success, _} = delegate:invoke(
+ QPids, fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
+ end),
+ case check_delivery(Mandatory, Immediate,
+ lists:foldl(fun fold_deliveries/2,
+ {false, []}, Success)) of
{routed, Qs} -> {routed, Qs};
O -> O
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9b8dd23c..a72ec2f7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,10 +34,10 @@
-export([init/5, init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
- requeue/2, len/1, is_empty/1,
+ requeue/2, len/1, is_empty/1, seqids_to_guids/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, seqids_to_guids/2]).
+ status/1]).
-export([start/1, stop/0]).
@@ -449,7 +449,7 @@ init(QueueName, IsDurable, Recover,
timestamp = Now },
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
- unconfirmed = gb_sets:new()},
+ unconfirmed = gb_sets:new() },
a(maybe_deltas_to_betas(State)).
terminate(State) ->
@@ -681,6 +681,15 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
+seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) ->
+ lists:foldl(
+ fun(SeqId, Guids) ->
+ [case dict:fetch(SeqId, PA) of
+ #msg_status { msg = Msg } -> Msg#basic_message.guid;
+ {_, Guid} -> Guid
+ end | Guids]
+ end, [], SeqIds).
+
set_ram_duration_target(DurationTarget,
State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,
@@ -776,15 +785,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
{avg_egress_rate , AvgEgressRate},
{avg_ingress_rate , AvgIngressRate} ].
-seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) ->
- lists:foldl(
- fun(SeqId, Guids) ->
- [case dict:fetch(SeqId, PA) of
- #msg_status { msg = Msg } -> Msg#basic_message.guid;
- {_, Guid} -> Guid
- end | Guids]
- end, [], SeqIds).
-
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -1182,8 +1182,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
%% the AckTags were removed from State1, so use State in seqids_to_guids
- State2 = msgs_confirmed(gb_sets:from_list(seqids_to_guids(AckTags, State)),
- State1),
+ State2 = remove_confirms(
+ gb_sets:from_list(seqids_to_guids(AckTags, State)), State1),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
State2 #vqstate { index_state = IndexState1,
@@ -1207,13 +1207,16 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
-msgs_confirmed(GuidSet, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
unconfirmed = gb_sets:difference(UC, GuidSet) }.
+msgs_confirmed(GuidSet, State) ->
+ {remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}.
+
msgs_written_to_disk(QPid, Guids) ->
spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
QPid,
@@ -1221,14 +1224,12 @@ msgs_written_to_disk(QPid, Guids) ->
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
GuidSet = gb_sets:from_list(Guids),
- ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD),
- State1 =
- State #vqstate {
- msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) },
- { msgs_confirmed(ToConfirmMsgs, State1),
- {confirm, gb_sets:to_list(ToConfirmMsgs)} }
+ msgs_confirmed(
+ gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), UC) })
end)
end).
@@ -1239,14 +1240,12 @@ msg_indices_written_to_disk(QPid, Guids) ->
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
GuidSet = gb_sets:from_list(Guids),
- ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
- State1 =
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) },
- { msgs_confirmed(ToConfirmMsgs, State1),
- {confirm, gb_sets:to_list(ToConfirmMsgs)} }
+ msgs_confirmed(
+ gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), UC) })
end)
end).
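
After this refactoring the confirm plumbing in rabbit_variable_queue.erl splits cleanly: remove_confirms/2 does the set arithmetic on msgs_on_disk, msg_indices_on_disk and unconfirmed, msgs_confirmed/2 wraps it and emits the {confirm, Guids} instruction, and the two disk callbacks reduce to the same call shape. A minimal sketch of that shared shape follows; the confirm_on_disk/4 helper is hypothetical and not part of this commit.

%% Hypothetical helper, for illustration only -- not introduced by this commit.
%% msgs_written_to_disk/2 and msg_indices_written_to_disk/2 both follow this
%% shape: confirm the guids already recorded in the "other" on-disk set, and
%% fold the newly written guids into this callback's own set, restricted to
%% those still unconfirmed.
confirm_on_disk(Guids, OtherOnDiskSet, UpdateOwnSet, State) ->
    GuidSet = gb_sets:from_list(Guids),
    msgs_confirmed(gb_sets:intersection(GuidSet, OtherOnDiskSet),
                   UpdateOwnSet(GuidSet, State)).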