diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-26 20:23:39 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-26 20:23:39 +0100 |
commit | 0835a8e307cc73a2bd9e789cb6d55d1c58b734ed (patch) | |
tree | f63af9b86429ed6a3ebd27a248fed18a53254039 | |
parent | 14f3091f8ff207620a12595ca003edbf8b85497e (diff) | |
download | rabbitmq-server-0835a8e307cc73a2bd9e789cb6d55d1c58b734ed.tar.gz |
Add missing specs; move some functions to more appropriate places
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 76 |
2 files changed, 63 insertions, 23 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2727c1d0..57f6ca8b 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -37,6 +37,16 @@ -define(ONE_SECOND, 1000). +-ifdef(use_specs). + +-spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined', + rabbit_mirror_queue_master:death_fun()) -> + rabbit_types:ok_pid_or_error()). +-spec(get_gm/1 :: (pid()) -> pid()). +-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok'). + +-endif. + %%---------------------------------------------------------------------------- %% %% Mirror Queues diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 463b8cfb..b090ebe8 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -43,6 +43,28 @@ known_senders }). +-ifdef(use_specs). + +-export_type([death_fun/0]). + +-type(death_fun() :: fun ((pid()) -> 'ok')). +-type(master_state() :: #state { gm :: pid(), + coordinator :: pid(), + backing_queue :: atom(), + backing_queue_state :: any(), + set_delivered :: non_neg_integer(), + seen_status :: dict(), + confirmed :: [rabbit_guid:guid()], + ack_msg_id :: dict(), + known_senders :: set() + }). + +-spec(promote_backing_queue_state/6 :: + (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). +-spec(sender_death_fun/0 :: () -> death_fun()). + +-endif. + %% For general documentation of HA design, see %% rabbit_mirror_queue_coordinator @@ -59,18 +81,6 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -sender_death_fun() -> - Self = self(), - fun (DeadPid) -> - rabbit_amqqueue:run_backing_queue_async( - Self, ?MODULE, - fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> - ok = gm:broadcast(GM, {sender_death, DeadPid}), - KS1 = sets:del_element(DeadPid, KS), - State #state { known_senders = KS1 } - end) - end. - init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, AsyncCallback, SyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( @@ -95,17 +105,6 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> - #state { gm = GM, - coordinator = CPid, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = BQ:len(BQS), - seen_status = SeenStatus, - confirmed = [], - ack_msg_id = dict:new(), - known_senders = sets:from_list(KS) }. - terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly @@ -365,6 +364,37 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, State end. +%% --------------------------------------------------------------------------- +%% Other exported functions +%% --------------------------------------------------------------------------- + +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = BQ:len(BQS), + seen_status = SeenStatus, + confirmed = [], + ack_msg_id = dict:new(), + known_senders = sets:from_list(KS) }. + +sender_death_fun() -> + Self = self(), + fun (DeadPid) -> + rabbit_amqqueue:run_backing_queue_async( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> + ok = gm:broadcast(GM, {sender_death, DeadPid}), + KS1 = sets:del_element(DeadPid, KS), + State #state { known_senders = KS1 } + end) + end. + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + maybe_store_acktag(undefined, _MsgId, AM) -> AM; maybe_store_acktag(AckTag, MsgId, AM) -> |