summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-06-26 20:42:42 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-06-26 20:42:42 +0100
commit6728492cb87c0193802522048b4365a8b24f4953 (patch)
treed1191e43cfb3a47bc320c9113db22120de5bee67
parent1a996146d8d99d1ebe2396348a1bed8a0ec2ffc5 (diff)
parent0835a8e307cc73a2bd9e789cb6d55d1c58b734ed (diff)
downloadrabbitmq-server-6728492cb87c0193802522048b4365a8b24f4953.tar.gz
Merge default into bug24130. Also add appropriate specs, and fix some bugs
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl37
-rw-r--r--src/rabbit_tests.erl27
-rw-r--r--src/rabbit_variable_queue.erl217
5 files changed, 173 insertions, 121 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c1fa048d..e388ccf2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -220,7 +220,7 @@ terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
case BQS of
- undefined -> State;
+ undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
BQS1 = lists:foldl(
fun (#cr{txn = none}, BQSN) ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index b70761ea..550423d2 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -38,6 +38,17 @@
-define(ONE_SECOND, 1000).
+-ifdef(use_specs).
+
+-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+ rabbit_mirror_queue_master:death_fun(),
+ rabbit_mirror_queue_master:length_fun()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(get_gm/1 :: (pid()) -> pid()).
+-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
+
+-endif.
+
%%----------------------------------------------------------------------------
%%
%% Mirror Queues
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 7d2b7e44..9578026e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -43,6 +43,30 @@
known_senders
}).
+-ifdef(use_specs).
+
+-export_type([death_fun/0, length_fun/0]).
+
+-type(death_fun() :: fun ((pid()) -> 'ok')).
+-type(length_fun() :: fun (() -> 'ok')).
+-type(master_state() :: #state { gm :: pid(),
+ coordinator :: pid(),
+ backing_queue :: atom(),
+ backing_queue_state :: any(),
+ set_delivered :: non_neg_integer(),
+ seen_status :: dict(),
+ confirmed :: [rabbit_guid:guid()],
+ ack_msg_id :: dict(),
+ known_senders :: set()
+ }).
+
+-spec(promote_backing_queue_state/6 ::
+ (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
+-spec(sender_death_fun/0 :: () -> death_fun()).
+-spec(length_fun/0 :: () -> length_fun()).
+
+-endif.
+
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
@@ -73,7 +97,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
- ok = gm:broadcast(GM, {length, BQ:length(BQS)}),
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -343,13 +367,18 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
State
end.
+%% ---------------------------------------------------------------------------
+%% Other exported functions
+%% ---------------------------------------------------------------------------
+
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
- ok = gm:broadcast(GM, {length, BQ:length(BQS)}),
+ Len = BQ:len(BQS),
+ ok = gm:broadcast(GM, {length, Len}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
+ set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -375,7 +404,7 @@ length_fun() ->
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {length, BQ:length(BQS)}),
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
State
end)
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f5492cdc..3ee71a6d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2087,8 +2087,11 @@ variable_queue_init(Q, Recover) ->
Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
+ variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
+
+variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
lists:foldl(
- fun (_N, VQN) ->
+ fun (N, VQN) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2096,7 +2099,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, self(), VQN)
+ PropFun(N, #message_properties{}), self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -2136,6 +2139,7 @@ test_variable_queue() ->
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_dropwhile/1,
+ fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1]],
passed.
@@ -2172,14 +2176,9 @@ test_dropwhile(VQ0) ->
Count = 10,
%% add messages with sequential expiry
- VQ1 = lists:foldl(
- fun (N, VQN) ->
- rabbit_variable_queue:publish(
- rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, self(), VQN)
- end, VQ0, lists:seq(1, Count)),
+ VQ1 = variable_queue_publish(
+ false, Count,
+ fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
VQ2 = rabbit_variable_queue:dropwhile(
@@ -2199,6 +2198,14 @@ test_dropwhile(VQ0) ->
VQ4.
+test_dropwhile_varying_ram_duration(VQ0) ->
+ VQ1 = variable_queue_publish(false, 1, VQ0),
+ VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
+ VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2),
+ VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
+ VQ5 = variable_queue_publish(false, 1, VQ4),
+ rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5).
+
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a167cca0..c6d99deb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,8 +18,9 @@
-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1, dropwhile/2,
+ dropwhile/2, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/3, discard/3,
@@ -560,114 +561,29 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
{gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
dropwhile(Pred, State) ->
- {_OkOrEmpty, State1} = dropwhile1(Pred, State),
- a(State1).
-
-dropwhile1(Pred, State) ->
- internal_queue_out(
- fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
- case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus,
- State1),
- dropwhile1(Pred, State2);
- false -> {ok, in_r(MsgStatus, State1)}
- end
- end, State).
-
-in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
- State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
- true = queue:is_empty(Q4), %% ASSERTION
- State #vqstate {
- q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
-in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
- State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+ case queue_out(State) of
+ {empty, State1} ->
+ a(State1);
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> a(in_r(MsgStatus, State1))
+ end
+ end.
fetch(AckRequired, State) ->
- internal_queue_out(
- fun(MsgStatus, State1) ->
- %% it's possible that the message wasn't read from disk
- %% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- internal_fetch(AckRequired, MsgStatus1, State2)
- end, State).
-
-internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
- case queue:out(Q4) of
- {empty, _Q4} ->
- case fetch_from_q3(State) of
- {empty, State1} = Result -> a(State1), Result;
- {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1)
- end;
- {{value, MsgStatus}, Q4a} ->
- Fun(MsgStatus, State #vqstate { q4 = Q4a })
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ %% it is possible that the message wasn't read from disk
+ %% at this point, so read it in.
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
+ {Res, a(State3)}
end.
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
- State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + 1,
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, State) ->
- {MsgStatus, State}.
-
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
- %% 1. Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- %% 2. Remove from msg_store and queue index, if necessary
- Rem = fun () ->
- ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
- end,
- Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
- end,
-
- %% 3. If an ack is required, add something sensible to PA
- {AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
- RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
-
- {{Msg, IsDelivered, AckTag, Len1},
- a(State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1 })}.
-
ack(AckTags, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,
fun (_, State0) -> State0 end,
@@ -1141,6 +1057,95 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
+in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
+ State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
+ case queue:is_empty(Q4) of
+ true -> State #vqstate {
+ q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+ false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, State),
+ State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) }
+ end;
+in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
+ State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+
+queue_out(State = #vqstate { q4 = Q4 }) ->
+ case queue:out(Q4) of
+ {empty, _Q4} ->
+ case fetch_from_q3(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
+ end;
+ {{value, MsgStatus}, Q4a} ->
+ {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
+ end.
+
+read_msg(MsgStatus = #msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent },
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {MsgStatus #msg_status { msg = Msg },
+ State #vqstate { ram_msg_count = RamMsgCount + 1,
+ msg_store_clients = MSCState1 }};
+read_msg(MsgStatus, State) ->
+ {MsgStatus, State}.
+
+internal_fetch(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
+ %% 1. Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ %% 2. Remove from msg_store and queue index, if necessary
+ Rem = fun () ->
+ ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
+ end,
+ Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
+ IndexState2 =
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
+ {false, true, false, _} -> Rem(), IndexState1;
+ {false, true, true, _} -> Rem(), Ack();
+ { true, true, true, false} -> Ack();
+ _ -> IndexState1
+ end,
+
+ %% 3. If an ack is required, add something sensible to PA
+ {AckTag, State1} = case AckRequired of
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
+
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ Len1 = Len - 1,
+ RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
+
+ {{Msg, IsDelivered, AckTag, Len1},
+ State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1 }}.
+
msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
AsyncCallback, SyncCallback) ->
case SyncCallback(?MODULE,