summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-06-26 20:23:39 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-06-26 20:23:39 +0100
commit0835a8e307cc73a2bd9e789cb6d55d1c58b734ed (patch)
treef63af9b86429ed6a3ebd27a248fed18a53254039
parent14f3091f8ff207620a12595ca003edbf8b85497e (diff)
downloadrabbitmq-server-0835a8e307cc73a2bd9e789cb6d55d1c58b734ed.tar.gz
Add missing specs; move some functions to more appropriate places
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl10
-rw-r--r--src/rabbit_mirror_queue_master.erl76
2 files changed, 63 insertions, 23 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2727c1d0..57f6ca8b 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -37,6 +37,16 @@
-define(ONE_SECOND, 1000).
+-ifdef(use_specs).
+
+-spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+ rabbit_mirror_queue_master:death_fun()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(get_gm/1 :: (pid()) -> pid()).
+-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
+
+-endif.
+
%%----------------------------------------------------------------------------
%%
%% Mirror Queues
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 463b8cfb..b090ebe8 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -43,6 +43,28 @@
known_senders
}).
+-ifdef(use_specs).
+
+-export_type([death_fun/0]).
+
+-type(death_fun() :: fun ((pid()) -> 'ok')).
+-type(master_state() :: #state { gm :: pid(),
+ coordinator :: pid(),
+ backing_queue :: atom(),
+ backing_queue_state :: any(),
+ set_delivered :: non_neg_integer(),
+ seen_status :: dict(),
+ confirmed :: [rabbit_guid:guid()],
+ ack_msg_id :: dict(),
+ known_senders :: set()
+ }).
+
+-spec(promote_backing_queue_state/6 ::
+ (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
+-spec(sender_death_fun/0 :: () -> death_fun()).
+
+-endif.
+
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
@@ -59,18 +81,6 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-sender_death_fun() ->
- Self = self(),
- fun (DeadPid) ->
- rabbit_amqqueue:run_backing_queue_async(
- Self, ?MODULE,
- fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
- ok = gm:broadcast(GM, {sender_death, DeadPid}),
- KS1 = sets:del_element(DeadPid, KS),
- State #state { known_senders = KS1 }
- end)
- end.
-
init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
AsyncCallback, SyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
@@ -95,17 +105,6 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
ack_msg_id = dict:new(),
known_senders = sets:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
- #state { gm = GM,
- coordinator = CPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
- seen_status = SeenStatus,
- confirmed = [],
- ack_msg_id = dict:new(),
- known_senders = sets:from_list(KS) }.
-
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination - this node has been explicitly
@@ -365,6 +364,37 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
State
end.
+%% ---------------------------------------------------------------------------
+%% Other exported functions
+%% ---------------------------------------------------------------------------
+
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = BQ:len(BQS),
+ seen_status = SeenStatus,
+ confirmed = [],
+ ack_msg_id = dict:new(),
+ known_senders = sets:from_list(KS) }.
+
+sender_death_fun() ->
+ Self = self(),
+ fun (DeadPid) ->
+ rabbit_amqqueue:run_backing_queue_async(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
+ ok = gm:broadcast(GM, {sender_death, DeadPid}),
+ KS1 = sets:del_element(DeadPid, KS),
+ State #state { known_senders = KS1 }
+ end)
+ end.
+
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
maybe_store_acktag(AckTag, MsgId, AM) ->