diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-26 20:42:42 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-26 20:42:42 +0100 |
commit | 6728492cb87c0193802522048b4365a8b24f4953 (patch) | |
tree | d1191e43cfb3a47bc320c9113db22120de5bee67 | |
parent | 1a996146d8d99d1ebe2396348a1bed8a0ec2ffc5 (diff) | |
parent | 0835a8e307cc73a2bd9e789cb6d55d1c58b734ed (diff) | |
download | rabbitmq-server-6728492cb87c0193802522048b4365a8b24f4953.tar.gz |
Merge default into bug24130. Also add appropriate specs, and fix some bugs
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 37 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 27 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 217 |
5 files changed, 173 insertions, 121 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c1fa048d..e388ccf2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -220,7 +220,7 @@ terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), case BQS of - undefined -> State; + undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), BQS1 = lists:foldl( fun (#cr{txn = none}, BQSN) -> diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index b70761ea..550423d2 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -38,6 +38,17 @@ -define(ONE_SECOND, 1000). +-ifdef(use_specs). + +-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined', + rabbit_mirror_queue_master:death_fun(), + rabbit_mirror_queue_master:length_fun()) -> + rabbit_types:ok_pid_or_error()). +-spec(get_gm/1 :: (pid()) -> pid()). +-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok'). + +-endif. + %%---------------------------------------------------------------------------- %% %% Mirror Queues diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 7d2b7e44..9578026e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -43,6 +43,30 @@ known_senders }). +-ifdef(use_specs). + +-export_type([death_fun/0, length_fun/0]). + +-type(death_fun() :: fun ((pid()) -> 'ok')). +-type(length_fun() :: fun (() -> 'ok')). +-type(master_state() :: #state { gm :: pid(), + coordinator :: pid(), + backing_queue :: atom(), + backing_queue_state :: any(), + set_delivered :: non_neg_integer(), + seen_status :: dict(), + confirmed :: [rabbit_guid:guid()], + ack_msg_id :: dict(), + known_senders :: set() + }). + +-spec(promote_backing_queue_state/6 :: + (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). +-spec(sender_death_fun/0 :: () -> death_fun()). +-spec(length_fun/0 :: () -> length_fun()). + +-endif. + %% For general documentation of HA design, see %% rabbit_mirror_queue_coordinator @@ -73,7 +97,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), - ok = gm:broadcast(GM, {length, BQ:length(BQS)}), + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -343,13 +367,18 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, State end. +%% --------------------------------------------------------------------------- +%% Other exported functions +%% --------------------------------------------------------------------------- + promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> - ok = gm:broadcast(GM, {length, BQ:length(BQS)}), + Len = BQ:len(BQS), + ok = gm:broadcast(GM, {length, Len}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS), + set_delivered = Len, seen_status = SeenStatus, confirmed = [], ack_msg_id = dict:new(), @@ -375,7 +404,7 @@ length_fun() -> fun (?MODULE, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {length, BQ:length(BQS)}), + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), State end) end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f5492cdc..3ee71a6d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2087,8 +2087,11 @@ variable_queue_init(Q, Recover) -> Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> + variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> lists:foldl( - fun (_N, VQN) -> + fun (N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), @@ -2096,7 +2099,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, self(), VQN) + PropFun(N, #message_properties{}), self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2136,6 +2139,7 @@ test_variable_queue() -> fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_dropwhile/1, + fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1]], passed. @@ -2172,14 +2176,9 @@ test_dropwhile(VQ0) -> Count = 10, %% add messages with sequential expiry - VQ1 = lists:foldl( - fun (N, VQN) -> - rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, self(), VQN) - end, VQ0, lists:seq(1, Count)), + VQ1 = variable_queue_publish( + false, Count, + fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( @@ -2199,6 +2198,14 @@ test_dropwhile(VQ0) -> VQ4. +test_dropwhile_varying_ram_duration(VQ0) -> + VQ1 = variable_queue_publish(false, 1, VQ0), + VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), + VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2), + VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ5 = variable_queue_publish(false, 1, VQ4), + rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5). + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a167cca0..c6d99deb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,8 +18,9 @@ -export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + dropwhile/2, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/3, discard/3, @@ -560,114 +561,29 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. dropwhile(Pred, State) -> - {_OkOrEmpty, State1} = dropwhile1(Pred, State), - a(State1). - -dropwhile1(Pred, State) -> - internal_queue_out( - fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> - case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, - State1), - dropwhile1(Pred, State2); - false -> {ok, in_r(MsgStatus, State1)} - end - end, State). - -in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - true = queue:is_empty(Q4), %% ASSERTION - State #vqstate { - q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; -in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + case queue_out(State) of + {empty, State1} -> + a(State1); + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, State2); + false -> a(in_r(MsgStatus, State1)) + end + end. fetch(AckRequired, State) -> - internal_queue_out( - fun(MsgStatus, State1) -> - %% it's possible that the message wasn't read from disk - %% at this point, so read it in. - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - internal_fetch(AckRequired, MsgStatus1, State2) - end, State). - -internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of - {empty, _Q4} -> - case fetch_from_q3(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) - end; - {{value, MsgStatus}, Q4a} -> - Fun(MsgStatus, State #vqstate { q4 = Q4a }) + case queue_out(State) of + {empty, State1} -> + {empty, a(State1)}; + {{value, MsgStatus}, State1} -> + %% it is possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2), + {Res, a(State3)} end. -read_msg(MsgStatus = #msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, - msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, State) -> - {MsgStatus, State}. - -internal_fetch(AckRequired, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount }) -> - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - Rem = fun () -> - ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) - end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - - {{Msg, IsDelivered, AckTag, Len1}, - a(State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1 })}. - ack(AckTags, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, fun (_, State0) -> State0 end, @@ -1141,6 +1057,95 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. +in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, + State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> + case queue:is_empty(Q4) of + true -> State #vqstate { + q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; + false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = + read_msg(MsgStatus, State), + State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } + end; +in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> + State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + +queue_out(State = #vqstate { q4 = Q4 }) -> + case queue:out(Q4) of + {empty, _Q4} -> + case fetch_from_q3(State) of + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} + end; + {{value, MsgStatus}, Q4a} -> + {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + end. + +read_msg(MsgStatus = #msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. + +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) + end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + + {{Msg, IsDelivered, AckTag, Len1}, + State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 }}. + msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, AsyncCallback, SyncCallback) -> case SyncCallback(?MODULE, |