diff options
Diffstat (limited to 'src/gm.erl')
-rw-r--r-- | src/gm.erl | 1493 |
1 files changed, 0 insertions, 1493 deletions
diff --git a/src/gm.erl b/src/gm.erl deleted file mode 100644 index 636e63e4..00000000 --- a/src/gm.erl +++ /dev/null @@ -1,1493 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. -%% - --module(gm). - -%% Guaranteed Multicast -%% ==================== -%% -%% This module provides the ability to create named groups of -%% processes to which members can be dynamically added and removed, -%% and for messages to be broadcast within the group that are -%% guaranteed to reach all members of the group during the lifetime of -%% the message. The lifetime of a message is defined as being, at a -%% minimum, the time from which the message is first sent to any -%% member of the group, up until the time at which it is known by the -%% member who published the message that the message has reached all -%% group members. -%% -%% The guarantee given is that provided a message, once sent, makes it -%% to members who do not all leave the group, the message will -%% continue to propagate to all group members. -%% -%% Another way of stating the guarantee is that if member P publishes -%% messages m and m', then for all members P', if P' is a member of -%% the group prior to the publication of m, and P' receives m', then -%% P' will receive m. -%% -%% Note that only local-ordering is enforced: i.e. if member P sends -%% message m and then message m', then for-all members P', if P' -%% receives m and m', then they will receive m' after m. Causality -%% ordering is _not_ enforced. I.e. if member P receives message m -%% and as a result publishes message m', there is no guarantee that -%% other members P' will receive m before m'. -%% -%% -%% API Use -%% ------- -%% -%% Mnesia must be started. Use the idempotent create_tables/0 function -%% to create the tables required. -%% -%% start_link/3 -%% Provide the group name, the callback module name, and any arguments -%% you wish to be passed into the callback module's functions. The -%% joined/2 function will be called when we have joined the group, -%% with the arguments passed to start_link and a list of the current -%% members of the group. See the callbacks specs and the comments -%% below for further details of the callback functions. -%% -%% leave/1 -%% Provide the Pid. Removes the Pid from the group. The callback -%% handle_terminate/2 function will be called. -%% -%% broadcast/2 -%% Provide the Pid and a Message. The message will be sent to all -%% members of the group as per the guarantees given above. This is a -%% cast and the function call will return immediately. There is no -%% guarantee that the message will reach any member of the group. -%% -%% confirmed_broadcast/2 -%% Provide the Pid and a Message. As per broadcast/2 except that this -%% is a call, not a cast, and only returns 'ok' once the Message has -%% reached every member of the group. Do not call -%% confirmed_broadcast/2 directly from the callback module otherwise -%% you will deadlock the entire group. -%% -%% info/1 -%% Provide the Pid. Returns a proplist with various facts, including -%% the group name and the current group members. -%% -%% validate_members/2 -%% Check whether a given member list agrees with the chosen member's -%% view. Any differences will be communicated via the members_changed -%% callback. If there are no differences then there will be no reply. -%% Note that members will not necessarily share the same view. -%% -%% forget_group/1 -%% Provide the group name. Removes its mnesia record. Makes no attempt -%% to ensure the group is empty. -%% -%% Implementation Overview -%% ----------------------- -%% -%% One possible means of implementation would be a fan-out from the -%% sender to every member of the group. This would require that the -%% group is fully connected, and, in the event that the original -%% sender of the message disappears from the group before the message -%% has made it to every member of the group, raises questions as to -%% who is responsible for sending on the message to new group members. -%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] - -%% if the sender dies part way through, who is responsible for -%% ensuring that the remaining Members receive the Msg? In the event -%% that within the group, messages sent are broadcast from a subset of -%% the members, the fan-out arrangement has the potential to -%% substantially impact the CPU and network workload of such members, -%% as such members would have to accommodate the cost of sending each -%% message to every group member. -%% -%% Instead, if the members of the group are arranged in a chain, then -%% it becomes easier to reason about who within the group has received -%% each message and who has not. It eases issues of responsibility: in -%% the event of a group member disappearing, the nearest upstream -%% member of the chain is responsible for ensuring that messages -%% continue to propagate down the chain. It also results in equal -%% distribution of sending and receiving workload, even if all -%% messages are being sent from just a single group member. This -%% configuration has the further advantage that it is not necessary -%% for every group member to know of every other group member, and -%% even that a group member does not have to be accessible from all -%% other group members. -%% -%% Performance is kept high by permitting pipelining and all -%% communication between joined group members is asynchronous. In the -%% chain A -> B -> C -> D, if A sends a message to the group, it will -%% not directly contact C or D. However, it must know that D receives -%% the message (in addition to B and C) before it can consider the -%% message fully sent. A simplistic implementation would require that -%% D replies to C, C replies to B and B then replies to A. This would -%% result in a propagation delay of twice the length of the chain. It -%% would also require, in the event of the failure of C, that D knows -%% to directly contact B and issue the necessary replies. Instead, the -%% chain forms a ring: D sends the message on to A: D does not -%% distinguish A as the sender, merely as the next member (downstream) -%% within the chain (which has now become a ring). When A receives -%% from D messages that A sent, it knows that all members have -%% received the message. However, the message is not dead yet: if C -%% died as B was sending to C, then B would need to detect the death -%% of C and forward the message on to D instead: thus every node has -%% to remember every message published until it is told that it can -%% forget about the message. This is essential not just for dealing -%% with failure of members, but also for the addition of new members. -%% -%% Thus once A receives the message back again, it then sends to B an -%% acknowledgement for the message, indicating that B can now forget -%% about the message. B does so, and forwards the ack to C. C forgets -%% the message, and forwards the ack to D, which forgets the message -%% and finally forwards the ack back to A. At this point, A takes no -%% further action: the message and its acknowledgement have made it to -%% every member of the group. The message is now dead, and any new -%% member joining the group at this point will not receive the -%% message. -%% -%% We therefore have two roles: -%% -%% 1. The sender, who upon receiving their own messages back, must -%% then send out acknowledgements, and upon receiving their own -%% acknowledgements back perform no further action. -%% -%% 2. The other group members who upon receiving messages and -%% acknowledgements must update their own internal state accordingly -%% (the sending member must also do this in order to be able to -%% accommodate failures), and forwards messages on to their downstream -%% neighbours. -%% -%% -%% Implementation: It gets trickier -%% -------------------------------- -%% -%% Chain A -> B -> C -> D -%% -%% A publishes a message which B receives. A now dies. B and D will -%% detect the death of A, and will link up, thus the chain is now B -> -%% C -> D. B forwards A's message on to C, who forwards it to D, who -%% forwards it to B. Thus B is now responsible for A's messages - both -%% publications and acknowledgements that were in flight at the point -%% at which A died. Even worse is that this is transitive: after B -%% forwards A's message to C, B dies as well. Now C is not only -%% responsible for B's in-flight messages, but is also responsible for -%% A's in-flight messages. -%% -%% Lemma 1: A member can only determine which dead members they have -%% inherited responsibility for if there is a total ordering on the -%% conflicting additions and subtractions of members from the group. -%% -%% Consider the simultaneous death of B and addition of B' that -%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or -%% C is responsible for in-flight messages from B. It is easy to -%% ensure that at least one of them thinks they have inherited B, but -%% if we do not ensure that exactly one of them inherits B, then we -%% could have B' converting publishes to acks, which then will crash C -%% as C does not believe it has issued acks for those messages. -%% -%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E -%% becoming A -> C' -> E. Who has inherited which of B, C and D? -%% -%% However, for non-conflicting membership changes, only a partial -%% ordering is required. For example, A -> B -> C becoming A -> A' -> -%% B. The addition of A', between A and B can have no conflicts with -%% the death of C: it is clear that A has inherited C's messages. -%% -%% For ease of implementation, we adopt the simple solution, of -%% imposing a total order on all membership changes. -%% -%% On the death of a member, it is ensured the dead member's -%% neighbours become aware of the death, and the upstream neighbour -%% now sends to its new downstream neighbour its state, including the -%% messages pending acknowledgement. The downstream neighbour can then -%% use this to calculate which publishes and acknowledgements it has -%% missed out on, due to the death of its old upstream. Thus the -%% downstream can catch up, and continues the propagation of messages -%% through the group. -%% -%% Lemma 2: When a member is joining, it must synchronously -%% communicate with its upstream member in order to receive its -%% starting state atomically with its addition to the group. -%% -%% New members must start with the same state as their nearest -%% upstream neighbour. This ensures that it is not surprised by -%% acknowledgements they are sent, and that should their downstream -%% neighbour die, they are able to send the correct state to their new -%% downstream neighbour to ensure it can catch up. Thus in the -%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' -> -%% C, A' must start with the state of A, so that it can send C the -%% correct state when B dies, allowing C to detect any missed -%% messages. -%% -%% If A' starts by adding itself to the group membership, A could then -%% die, without A' having received the necessary state from A. This -%% would leave A' responsible for in-flight messages from A, but -%% having the least knowledge of all, of those messages. Thus A' must -%% start by synchronously calling A, which then immediately sends A' -%% back its state. A then adds A' to the group. If A dies at this -%% point then A' will be able to see this (as A' will fail to appear -%% in the group membership), and thus A' will ignore the state it -%% receives from A, and will simply repeat the process, trying to now -%% join downstream from some other member. This ensures that should -%% the upstream die as soon as the new member has been joined, the new -%% member is guaranteed to receive the correct state, allowing it to -%% correctly process messages inherited due to the death of its -%% upstream neighbour. -%% -%% The canonical definition of the group membership is held by a -%% distributed database. Whilst this allows the total ordering of -%% changes to be achieved, it is nevertheless undesirable to have to -%% query this database for the current view, upon receiving each -%% message. Instead, we wish for members to be able to cache a view of -%% the group membership, which then requires a cache invalidation -%% mechanism. Each member maintains its own view of the group -%% membership. Thus when the group's membership changes, members may -%% need to become aware of such changes in order to be able to -%% accurately process messages they receive. Because of the -%% requirement of a total ordering of conflicting membership changes, -%% it is not possible to use the guaranteed broadcast mechanism to -%% communicate these changes: to achieve the necessary ordering, it -%% would be necessary for such messages to be published by exactly one -%% member, which can not be guaranteed given that such a member could -%% die. -%% -%% The total ordering we enforce on membership changes gives rise to a -%% view version number: every change to the membership creates a -%% different view, and the total ordering permits a simple -%% monotonically increasing view version number. -%% -%% Lemma 3: If a message is sent from a member that holds view version -%% N, it can be correctly processed by any member receiving the -%% message with a view version >= N. -%% -%% Initially, let us suppose that each view contains the ordering of -%% every member that was ever part of the group. Dead members are -%% marked as such. Thus we have a ring of members, some of which are -%% dead, and are thus inherited by the nearest alive downstream -%% member. -%% -%% In the chain A -> B -> C, all three members initially have view -%% version 1, which reflects reality. B publishes a message, which is -%% forward by C to A. B now dies, which A notices very quickly. Thus A -%% updates the view, creating version 2. It now forwards B's -%% publication, sending that message to its new downstream neighbour, -%% C. This happens before C is aware of the death of B. C must become -%% aware of the view change before it interprets the message its -%% received, otherwise it will fail to learn of the death of B, and -%% thus will not realise it has inherited B's messages (and will -%% likely crash). -%% -%% Thus very simply, we have that each subsequent view contains more -%% information than the preceding view. -%% -%% However, to avoid the views growing indefinitely, we need to be -%% able to delete members which have died _and_ for which no messages -%% are in-flight. This requires that upon inheriting a dead member, we -%% know the last publication sent by the dead member (this is easy: we -%% inherit a member because we are the nearest downstream member which -%% implies that we know at least as much than everyone else about the -%% publications of the dead member), and we know the earliest message -%% for which the acknowledgement is still in flight. -%% -%% In the chain A -> B -> C, when B dies, A will send to C its state -%% (as C is the new downstream from A), allowing C to calculate which -%% messages it has missed out on (described above). At this point, C -%% also inherits B's messages. If that state from A also includes the -%% last message published by B for which an acknowledgement has been -%% seen, then C knows exactly which further acknowledgements it must -%% receive (also including issuing acknowledgements for publications -%% still in-flight that it receives), after which it is known there -%% are no more messages in flight for B, thus all evidence that B was -%% ever part of the group can be safely removed from the canonical -%% group membership. -%% -%% Thus, for every message that a member sends, it includes with that -%% message its view version. When a member receives a message it will -%% update its view from the canonical copy, should its view be older -%% than the view version included in the message it has received. -%% -%% The state held by each member therefore includes the messages from -%% each publisher pending acknowledgement, the last publication seen -%% from that publisher, and the last acknowledgement from that -%% publisher. In the case of the member's own publications or -%% inherited members, this last acknowledgement seen state indicates -%% the last acknowledgement retired, rather than sent. -%% -%% -%% Proof sketch -%% ------------ -%% -%% We need to prove that with the provided operational semantics, we -%% can never reach a state that is not well formed from a well-formed -%% starting state. -%% -%% Operational semantics (small step): straight-forward message -%% sending, process monitoring, state updates. -%% -%% Well formed state: dead members inherited by exactly one non-dead -%% member; for every entry in anyone's pending-acks, either (the -%% publication of the message is in-flight downstream from the member -%% and upstream from the publisher) or (the acknowledgement of the -%% message is in-flight downstream from the publisher and upstream -%% from the member). -%% -%% Proof by induction on the applicable operational semantics. -%% -%% -%% Related work -%% ------------ -%% -%% The ring configuration and double traversal of messages around the -%% ring is similar (though developed independently) to the LCR -%% protocol by [Levy 2008]. However, LCR differs in several -%% ways. Firstly, by using vector clocks, it enforces a total order of -%% message delivery, which is unnecessary for our purposes. More -%% significantly, it is built on top of a "group communication system" -%% which performs the group management functions, taking -%% responsibility away from the protocol as to how to cope with safely -%% adding and removing members. When membership changes do occur, the -%% protocol stipulates that every member must perform communication -%% with every other member of the group, to ensure all outstanding -%% deliveries complete, before the entire group transitions to the new -%% view. This, in total, requires two sets of all-to-all synchronous -%% communications. -%% -%% This is not only rather inefficient, but also does not explain what -%% happens upon the failure of a member during this process. It does -%% though entirely avoid the need for inheritance of responsibility of -%% dead members that our protocol incorporates. -%% -%% In [Marandi et al 2010], a Paxos-based protocol is described. This -%% work explicitly focuses on the efficiency of communication. LCR -%% (and our protocol too) are more efficient, but at the cost of -%% higher latency. The Ring-Paxos protocol is itself built on top of -%% IP-multicast, which rules it out for many applications where -%% point-to-point communication is all that can be required. They also -%% have an excellent related work section which I really ought to -%% read... -%% -%% -%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008. -%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast -%% Protocol - - --behaviour(gen_server2). - --export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, - confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/3]). - -%% For INSTR_MOD callbacks --export([call/3, cast/2, monitor/1, demonitor/1]). - --ifndef(use_specs). --export([behaviour_info/1]). --endif. - --export([table_definitions/0]). - --define(GROUP_TABLE, gm_group). --define(MAX_BUFFER_SIZE, 100000000). %% 100MB --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). --define(BROADCAST_TIMER, 25). --define(VERSION_START, 0). --define(SETS, ordsets). --define(DICT, orddict). - --record(state, - { self, - left, - right, - group_name, - module, - view, - pub_count, - members_state, - callback_args, - confirms, - broadcast_buffer, - broadcast_buffer_sz, - broadcast_timer, - txn_executor - }). - --record(gm_group, { name, version, members }). - --record(view_member, { id, aliases, left, right }). - --record(member, { pending_ack, last_pub, last_ack }). - --define(TABLE, {?GROUP_TABLE, [{record_name, gm_group}, - {attributes, record_info(fields, gm_group)}]}). --define(TABLE_MATCH, {match, #gm_group { _ = '_' }}). - --define(TAG, '$gm'). - --ifdef(use_specs). - --export_type([group_name/0]). - --type(group_name() :: any()). --type(txn_fun() :: fun((fun(() -> any())) -> any())). - --spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}). --spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) -> - rabbit_types:ok_pid_or_error()). --spec(leave/1 :: (pid()) -> 'ok'). --spec(broadcast/2 :: (pid(), any()) -> 'ok'). --spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). --spec(info/1 :: (pid()) -> rabbit_types:infos()). --spec(validate_members/2 :: (pid(), [pid()]) -> 'ok'). --spec(forget_group/1 :: (group_name()) -> 'ok'). - -%% The joined, members_changed and handle_msg callbacks can all return -%% any of the following terms: -%% -%% 'ok' - the callback function returns normally -%% -%% {'stop', Reason} - the callback indicates the member should stop -%% with reason Reason and should leave the group. -%% -%% {'become', Module, Args} - the callback indicates that the callback -%% module should be changed to Module and that the callback functions -%% should now be passed the arguments Args. This allows the callback -%% module to be dynamically changed. - -%% Called when we've successfully joined the group. Supplied with Args -%% provided in start_link, plus current group members. --callback joined(Args :: term(), Members :: [pid()]) -> - ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. - -%% Supplied with Args provided in start_link, the list of new members -%% and the list of members previously known to us that have since -%% died. Note that if a member joins and dies very quickly, it's -%% possible that we will never see that member appear in either births -%% or deaths. However we are guaranteed that (1) we will see a member -%% joining either in the births here, or in the members passed to -%% joined/2 before receiving any messages from it; and (2) we will not -%% see members die that we have not seen born (or supplied in the -%% members to joined/2). --callback members_changed(Args :: term(), - Births :: [pid()], Deaths :: [pid()]) -> - ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. - -%% Supplied with Args provided in start_link, the sender, and the -%% message. This does get called for messages injected by this member, -%% however, in such cases, there is no special significance of this -%% invocation: it does not indicate that the message has made it to -%% any other members, let alone all other members. --callback handle_msg(Args :: term(), From :: pid(), Message :: term()) -> - ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. - -%% Called on gm member termination as per rules in gen_server, with -%% the Args provided in start_link plus the termination Reason. --callback handle_terminate(Args :: term(), Reason :: term()) -> - ok | term(). - --else. - -behaviour_info(callbacks) -> - [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {handle_terminate, 2}]; -behaviour_info(_Other) -> - undefined. - --endif. - -create_tables() -> - create_tables([?TABLE]). - -create_tables([]) -> - ok; -create_tables([{Table, Attributes} | Tables]) -> - case mnesia:create_table(Table, Attributes) of - {atomic, ok} -> create_tables(Tables); - {aborted, {already_exists, Table}} -> create_tables(Tables); - Err -> Err - end. - -table_definitions() -> - {Name, Attributes} = ?TABLE, - [{Name, [?TABLE_MATCH | Attributes]}]. - -start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). - -leave(Server) -> - gen_server2:cast(Server, leave). - -broadcast(Server, Msg) -> broadcast(Server, Msg, 0). - -broadcast(Server, Msg, SizeHint) -> - gen_server2:cast(Server, {broadcast, Msg, SizeHint}). - -confirmed_broadcast(Server, Msg) -> - gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). - -info(Server) -> - gen_server2:call(Server, info, infinity). - -validate_members(Server, Members) -> - gen_server2:cast(Server, {validate_members, Members}). - -forget_group(GroupName) -> - {atomic, ok} = mnesia:sync_transaction( - fun () -> - mnesia:delete({?GROUP_TABLE, GroupName}) - end), - ok. - -init([GroupName, Module, Args, TxnFun]) -> - put(process_name, {?MODULE, GroupName}), - {MegaSecs, Secs, MicroSecs} = now(), - random:seed(MegaSecs, Secs, MicroSecs), - Self = make_member(GroupName), - gen_server2:cast(self(), join), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = -1, - members_state = undefined, - callback_args = Args, - confirms = queue:new(), - broadcast_buffer = [], - broadcast_buffer_sz = 0, - broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - - -handle_call({confirmed_broadcast, _Msg}, _From, - State = #state { members_state = undefined }) -> - reply(not_joined, State); - -handle_call({confirmed_broadcast, Msg}, _From, - State = #state { self = Self, - right = {Self, undefined}, - module = Module, - callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), - ok, State}); - -handle_call({confirmed_broadcast, Msg}, From, State) -> - {Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} = - internal_broadcast(Msg, 0, State), - Confirms1 = queue:in({PubCount, From}, Confirms), - handle_callback_result({Result, flush_broadcast_buffer( - State1 #state { confirms = Confirms1 })}); - -handle_call(info, _From, - State = #state { members_state = undefined }) -> - reply(not_joined, State); - -handle_call(info, _From, State = #state { group_name = GroupName, - module = Module, - view = View }) -> - reply([{group_name, GroupName}, - {module, Module}, - {group_members, get_pids(alive_view_members(View))}], State); - -handle_call({add_on_right, _NewMember}, _From, - State = #state { members_state = undefined }) -> - reply(not_ready, State); - -handle_call({add_on_right, NewMember}, _From, - State = #state { self = Self, - group_name = GroupName, - members_state = MembersState, - txn_executor = TxnFun }) -> - Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun), - View1 = group_to_view(Group), - MembersState1 = remove_erased_members(MembersState, View1), - ok = send_right(NewMember, View1, - {catchup, Self, prepare_members_state(MembersState1)}), - {Result, State1} = change_view(View1, State #state { - members_state = MembersState1 }), - handle_callback_result({Result, {ok, Group}, State1}). - -%% add_on_right causes a catchup to be sent immediately from the left, -%% so we can never see this from the left neighbour. However, it's -%% possible for the right neighbour to send us a check_neighbours -%% immediately before that. We can't possibly handle it, but if we're -%% in this state we know a catchup is coming imminently anyway. So -%% just ignore it. -handle_cast({?TAG, _ReqVer, check_neighbours}, - State = #state { members_state = undefined }) -> - noreply(State); - -handle_cast({?TAG, ReqVer, Msg}, - State = #state { view = View, - members_state = MembersState, - group_name = GroupName }) -> - {Result, State1} = - case needs_view_update(ReqVer, View) of - true -> View1 = group_to_view(dirty_read_group(GroupName)), - MemberState1 = remove_erased_members(MembersState, View1), - change_view(View1, State #state { - members_state = MemberState1 }); - false -> {ok, State} - end, - handle_callback_result( - if_callback_success( - Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); - -handle_cast({broadcast, _Msg, _SizeHint}, - State = #state { members_state = undefined }) -> - noreply(State); - -handle_cast({broadcast, Msg, _SizeHint}, - State = #state { self = Self, - right = {Self, undefined}, - module = Module, - callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), - State}); - -handle_cast({broadcast, Msg, SizeHint}, State) -> - {Result, State1} = internal_broadcast(Msg, SizeHint, State), - handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)}); - -handle_cast(join, State = #state { self = Self, - group_name = GroupName, - members_state = undefined, - module = Module, - callback_args = Args, - txn_executor = TxnFun }) -> - View = join_group(Self, GroupName, TxnFun), - MembersState = - case alive_view_members(View) of - [Self] -> blank_member_state(); - _ -> undefined - end, - State1 = check_neighbours(State #state { view = View, - members_state = MembersState }), - handle_callback_result( - {Module:joined(Args, get_pids(all_known_members(View))), State1}); - -handle_cast({validate_members, OldMembers}, - State = #state { view = View, - module = Module, - callback_args = Args }) -> - NewMembers = get_pids(all_known_members(View)), - Births = NewMembers -- OldMembers, - Deaths = OldMembers -- NewMembers, - case {Births, Deaths} of - {[], []} -> noreply(State); - _ -> Result = Module:members_changed(Args, Births, Deaths), - handle_callback_result({Result, State}) - end; - -handle_cast(leave, State) -> - {stop, normal, State}. - - -handle_info(flush, State) -> - noreply( - flush_broadcast_buffer(State #state { broadcast_timer = undefined })); - -handle_info(timeout, State) -> - noreply(flush_broadcast_buffer(State)); - -handle_info({'DOWN', MRef, process, _Pid, Reason}, - State = #state { self = Self, - left = Left, - right = Right, - group_name = GroupName, - confirms = Confirms, - txn_executor = TxnFun }) -> - Member = case {Left, Right} of - {{Member1, MRef}, _} -> Member1; - {_, {Member1, MRef}} -> Member1; - _ -> undefined - end, - case {Member, Reason} of - {undefined, _} -> - noreply(State); - {_, {shutdown, ring_shutdown}} -> - noreply(State); - _ -> - %% In the event of a partial partition we could see another member - %% go down and then remove them from Mnesia. While they can - %% recover from this they'd have to restart the queue - not - %% ideal. So let's sleep here briefly just in case this was caused - %% by a partial partition; in which case by the time we record the - %% member death in Mnesia we will probably be in a full - %% partition and will not be assassinating another member. - timer:sleep(100), - View1 = group_to_view(record_dead_member_in_group( - Member, GroupName, TxnFun)), - handle_callback_result( - case alive_view_members(View1) of - [Self] -> maybe_erase_aliases( - State #state { - members_state = blank_member_state(), - confirms = purge_confirms(Confirms) }, - View1); - _ -> change_view(View1, State) - end) - end. - - -terminate(Reason, State = #state { module = Module, - callback_args = Args }) -> - flush_broadcast_buffer(State), - Module:handle_terminate(Args, Reason). - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -prioritise_info(flush, _Len, _State) -> - 1; -%% DOWN messages should not overtake initial catchups; if they do we -%% will receive a DOWN we do not know what to do with. -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, - #state { members_state = undefined }) -> - 0; -%% We should not prioritise DOWN messages from our left since -%% otherwise the DOWN can overtake any last activity from the left, -%% causing that activity to be lost. -prioritise_info({'DOWN', _MRef, process, LeftPid, _Reason}, _Len, - #state { left = {{_LeftVer, LeftPid}, _MRef2} }) -> - 0; -%% But prioritise all other DOWNs - we want to make sure we are not -%% sending activity into the void for too long because our right is -%% down but we don't know it. -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, _State) -> - 1; -prioritise_info(_, _Len, _State) -> - 0. - - -handle_msg(check_neighbours, State) -> - %% no-op - it's already been done by the calling handle_cast - {ok, State}; - -handle_msg({catchup, Left, MembersStateLeft}, - State = #state { self = Self, - left = {Left, _MRefL}, - right = {Right, _MRefR}, - view = View, - members_state = undefined }) -> - ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), - MembersStateLeft1 = build_members_state(MembersStateLeft), - {ok, State #state { members_state = MembersStateLeft1 }}; - -handle_msg({catchup, Left, MembersStateLeft}, - State = #state { self = Self, - left = {Left, _MRefL}, - view = View, - members_state = MembersState }) - when MembersState =/= undefined -> - MembersStateLeft1 = build_members_state(MembersStateLeft), - AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ - ?DICT:fetch_keys(MembersStateLeft1)), - {MembersState1, Activity} = - lists:foldl( - fun (Id, MembersStateActivity) -> - #member { pending_ack = PALeft, last_ack = LA } = - find_member_or_blank(Id, MembersStateLeft1), - with_member_acc( - fun (#member { pending_ack = PA } = Member, Activity1) -> - case is_member_alias(Id, Self, View) of - true -> - {_AcksInFlight, Pubs, _PA1} = - find_prefix_common_suffix(PALeft, PA), - {Member #member { last_ack = LA }, - activity_cons(Id, pubs_from_queue(Pubs), - [], Activity1)}; - false -> - {Acks, _Common, Pubs} = - find_prefix_common_suffix(PA, PALeft), - {Member, - activity_cons(Id, pubs_from_queue(Pubs), - acks_from_queue(Acks), - Activity1)} - end - end, Id, MembersStateActivity) - end, {MembersState, activity_nil()}, AllMembers), - handle_msg({activity, Left, activity_finalise(Activity)}, - State #state { members_state = MembersState1 }); - -handle_msg({catchup, _NotLeft, _MembersState}, State) -> - {ok, State}; - -handle_msg({activity, Left, Activity}, - State = #state { self = Self, - left = {Left, _MRefL}, - view = View, - members_state = MembersState, - confirms = Confirms }) - when MembersState =/= undefined -> - {MembersState1, {Confirms1, Activity1}} = - lists:foldl( - fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) -> - with_member_acc( - fun (Member = #member { pending_ack = PA, - last_pub = LP, - last_ack = LA }, - {Confirms2, Activity2}) -> - case is_member_alias(Id, Self, View) of - true -> - {ToAck, PA1} = - find_common(queue_from_pubs(Pubs), PA, - queue:new()), - LA1 = last_ack(Acks, LA), - AckNums = acks_from_queue(ToAck), - Confirms3 = maybe_confirm( - Self, Id, Confirms2, AckNums), - {Member #member { pending_ack = PA1, - last_ack = LA1 }, - {Confirms3, - activity_cons( - Id, [], AckNums, Activity2)}}; - false -> - PA1 = apply_acks(Acks, join_pubs(PA, Pubs)), - LA1 = last_ack(Acks, LA), - LP1 = last_pub(Pubs, LP), - {Member #member { pending_ack = PA1, - last_pub = LP1, - last_ack = LA1 }, - {Confirms2, - activity_cons(Id, Pubs, Acks, Activity2)}} - end - end, Id, MembersStateConfirmsActivity) - end, {MembersState, {Confirms, activity_nil()}}, Activity), - State1 = State #state { members_state = MembersState1, - confirms = Confirms1 }, - Activity3 = activity_finalise(Activity1), - ok = maybe_send_activity(Activity3, State1), - {Result, State2} = maybe_erase_aliases(State1, View), - if_callback_success( - Result, fun activity_true/3, fun activity_false/3, Activity3, State2); - -handle_msg({activity, _NotLeft, _Activity}, State) -> - {ok, State}. - - -noreply(State) -> - {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. - -reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. - -flush_timeout(#state{broadcast_buffer = []}) -> hibernate; -flush_timeout(_) -> 0. - -ensure_broadcast_timer(State = #state { broadcast_buffer = [], - broadcast_timer = undefined }) -> - State; -ensure_broadcast_timer(State = #state { broadcast_buffer = [], - broadcast_timer = TRef }) -> - erlang:cancel_timer(TRef), - State #state { broadcast_timer = undefined }; -ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> - TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush), - State #state { broadcast_timer = TRef }; -ensure_broadcast_timer(State) -> - State. - -internal_broadcast(Msg, SizeHint, - State = #state { self = Self, - pub_count = PubCount, - module = Module, - callback_args = Args, - broadcast_buffer = Buffer, - broadcast_buffer_sz = BufferSize }) -> - PubCount1 = PubCount + 1, - {Module:handle_msg(Args, get_pid(Self), Msg), - State #state { pub_count = PubCount1, - broadcast_buffer = [{PubCount1, Msg} | Buffer], - broadcast_buffer_sz = BufferSize + SizeHint}}. - -%% The Erlang distribution mechanism has an interesting quirk - it -%% will kill the VM cold with "Absurdly large distribution output data -%% buffer" if you attempt to send a message which serialises out to -%% more than 2^31 bytes in size. It's therefore a very good idea to -%% make sure that we don't exceed that size! -%% -%% Now, we could figure out the size of messages as they come in using -%% size(term_to_binary(Msg)) or similar. The trouble is, that requires -%% us to serialise the message only to throw the serialised form -%% away. Hard to believe that's a sensible thing to do. So instead we -%% accept a size hint from the application, via broadcast/3. This size -%% hint can be the size of anything in the message which we expect -%% could be large, and we just ignore the size of any small bits of -%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat -%% conservatively at 100MB - but the buffer is only to allow us to -%% buffer tiny messages anyway, so 100MB is plenty. - -maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) -> - case Size > ?MAX_BUFFER_SIZE of - true -> flush_broadcast_buffer(State); - false -> State - end. - -flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> - State; -flush_broadcast_buffer(State = #state { self = Self, - members_state = MembersState, - broadcast_buffer = Buffer, - pub_count = PubCount }) -> - [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount - Pubs = lists:reverse(Buffer), - Activity = activity_cons(Self, Pubs, [], activity_nil()), - ok = maybe_send_activity(activity_finalise(Activity), State), - MembersState1 = with_member( - fun (Member = #member { pending_ack = PA }) -> - PA1 = queue:join(PA, queue:from_list(Pubs)), - Member #member { pending_ack = PA1, - last_pub = PubCount } - end, Self, MembersState), - State #state { members_state = MembersState1, - broadcast_buffer = [], - broadcast_buffer_sz = 0}. - - -%% --------------------------------------------------------------------------- -%% View construction and inspection -%% --------------------------------------------------------------------------- - -needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. - -view_version({Ver, _View}) -> Ver. - -is_member_alive({dead, _Member}) -> false; -is_member_alive(_) -> true. - -is_member_alias(Self, Self, _View) -> - true; -is_member_alias(Member, Self, View) -> - ?SETS:is_element(Member, - ((fetch_view_member(Self, View)) #view_member.aliases)). - -dead_member_id({dead, Member}) -> Member. - -store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> - {Ver, ?DICT:store(Id, VMember, View)}. - -with_view_member(Fun, View, Id) -> - store_view_member(Fun(fetch_view_member(Id, View)), View). - -fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). - -find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). - -blank_view(Ver) -> {Ver, ?DICT:new()}. - -alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). - -all_known_members({_Ver, View}) -> - ?DICT:fold( - fun (Member, #view_member { aliases = Aliases }, Acc) -> - ?SETS:to_list(Aliases) ++ [Member | Acc] - end, [], View). - -group_to_view(#gm_group { members = Members, version = Ver }) -> - Alive = lists:filter(fun is_member_alive/1, Members), - [_|_] = Alive, %% ASSERTION - can't have all dead members - add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members). - -link_view([Left, Middle, Right | Rest], View) -> - case find_view_member(Middle, View) of - error -> - link_view( - [Middle, Right | Rest], - store_view_member(#view_member { id = Middle, - aliases = ?SETS:new(), - left = Left, - right = Right }, View)); - {ok, _} -> - View - end; -link_view(_, View) -> - View. - -add_aliases(View, Members) -> - Members1 = ensure_alive_suffix(Members), - {EmptyDeadSet, View1} = - lists:foldl( - fun (Member, {DeadAcc, ViewAcc}) -> - case is_member_alive(Member) of - true -> - {?SETS:new(), - with_view_member( - fun (VMember = - #view_member { aliases = Aliases }) -> - VMember #view_member { - aliases = ?SETS:union(Aliases, DeadAcc) } - end, ViewAcc, Member)}; - false -> - {?SETS:add_element(dead_member_id(Member), DeadAcc), - ViewAcc} - end - end, {?SETS:new(), View}, Members1), - 0 = ?SETS:size(EmptyDeadSet), %% ASSERTION - View1. - -ensure_alive_suffix(Members) -> - queue:to_list(ensure_alive_suffix1(queue:from_list(Members))). - -ensure_alive_suffix1(MembersQ) -> - {{value, Member}, MembersQ1} = queue:out_r(MembersQ), - case is_member_alive(Member) of - true -> MembersQ; - false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1)) - end. - - -%% --------------------------------------------------------------------------- -%% View modification -%% --------------------------------------------------------------------------- - -join_group(Self, GroupName, TxnFun) -> - join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun). - -join_group(Self, GroupName, {error, not_found}, TxnFun) -> - join_group(Self, GroupName, - prune_or_create_group(Self, GroupName, TxnFun), TxnFun); -join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> - group_to_view(Group); -join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> - case lists:member(Self, Members) of - true -> - group_to_view(Group); - false -> - case lists:filter(fun is_member_alive/1, Members) of - [] -> - join_group(Self, GroupName, - prune_or_create_group(Self, GroupName, TxnFun), - TxnFun); - Alive -> - Left = lists:nth(random:uniform(length(Alive)), Alive), - Handler = - fun () -> - join_group( - Self, GroupName, - record_dead_member_in_group( - Left, GroupName, TxnFun), - TxnFun) - end, - try - case neighbour_call(Left, {add_on_right, Self}) of - {ok, Group1} -> group_to_view(Group1); - not_ready -> join_group(Self, GroupName, TxnFun) - end - catch - exit:{R, _} - when R =:= noproc; R =:= normal; R =:= shutdown -> - Handler(); - exit:{{R, _}, _} - when R =:= nodedown; R =:= shutdown -> - Handler() - end - end - end. - -dirty_read_group(GroupName) -> - case mnesia:dirty_read(?GROUP_TABLE, GroupName) of - [] -> {error, not_found}; - [Group] -> Group - end. - -read_group(GroupName) -> - case mnesia:read({?GROUP_TABLE, GroupName}) of - [] -> {error, not_found}; - [Group] -> Group - end. - -write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group. - -prune_or_create_group(Self, GroupName, TxnFun) -> - TxnFun( - fun () -> - GroupNew = #gm_group { name = GroupName, - members = [Self], - version = get_version(Self) }, - case read_group(GroupName) of - {error, not_found} -> - write_group(GroupNew); - Group = #gm_group { members = Members } -> - case lists:any(fun is_member_alive/1, Members) of - true -> Group; - false -> write_group(GroupNew) - end - end - end). - -record_dead_member_in_group(Member, GroupName, TxnFun) -> - TxnFun( - fun () -> - Group = #gm_group { members = Members, version = Ver } = - read_group(GroupName), - case lists:splitwith( - fun (Member1) -> Member1 =/= Member end, Members) of - {_Members1, []} -> %% not found - already recorded dead - Group; - {Members1, [Member | Members2]} -> - Members3 = Members1 ++ [{dead, Member} | Members2], - write_group(Group #gm_group { members = Members3, - version = Ver + 1 }) - end - end). - -record_new_member_in_group(NewMember, Left, GroupName, TxnFun) -> - TxnFun( - fun () -> - Group = #gm_group { members = Members, version = Ver } = - read_group(GroupName), - {Prefix, [Left | Suffix]} = - lists:splitwith(fun (M) -> M =/= Left end, Members), - write_group(Group #gm_group { - members = Prefix ++ [Left, NewMember | Suffix], - version = Ver + 1 }) - end). - -erase_members_in_group(Members, GroupName, TxnFun) -> - DeadMembers = [{dead, Id} || Id <- Members], - TxnFun( - fun () -> - Group = #gm_group { members = [_|_] = Members1, version = Ver } = - read_group(GroupName), - case Members1 -- DeadMembers of - Members1 -> Group; - Members2 -> write_group( - Group #gm_group { members = Members2, - version = Ver + 1 }) - end - end). - -maybe_erase_aliases(State = #state { self = Self, - group_name = GroupName, - members_state = MembersState, - txn_executor = TxnFun }, View) -> - #view_member { aliases = Aliases } = fetch_view_member(Self, View), - {Erasable, MembersState1} - = ?SETS:fold( - fun (Id, {ErasableAcc, MembersStateAcc} = Acc) -> - #member { last_pub = LP, last_ack = LA } = - find_member_or_blank(Id, MembersState), - case can_erase_view_member(Self, Id, LA, LP) of - true -> {[Id | ErasableAcc], - erase_member(Id, MembersStateAcc)}; - false -> Acc - end - end, {[], MembersState}, Aliases), - View1 = case Erasable of - [] -> View; - _ -> group_to_view( - erase_members_in_group(Erasable, GroupName, TxnFun)) - end, - change_view(View1, State #state { members_state = MembersState1 }). - -can_erase_view_member(Self, Self, _LA, _LP) -> false; -can_erase_view_member(_Self, _Id, N, N) -> true; -can_erase_view_member(_Self, _Id, _LA, _LP) -> false. - -neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg). -neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity). - -%% --------------------------------------------------------------------------- -%% View monitoring and maintanence -%% --------------------------------------------------------------------------- - -ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> - {Self, undefined}; -ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> - ok = neighbour_cast(RealNeighbour, {?TAG, Ver, check_neighbours}), - {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; -ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> - {RealNeighbour, MRef}; -ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> - true = ?INSTR_MOD:demonitor(MRef), - Msg = {?TAG, Ver, check_neighbours}, - ok = neighbour_cast(RealNeighbour, Msg), - ok = case Neighbour of - Self -> ok; - _ -> neighbour_cast(Neighbour, Msg) - end, - {Neighbour, maybe_monitor(Neighbour, Self)}. - -maybe_monitor( Self, Self) -> undefined; -maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)). - -check_neighbours(State = #state { self = Self, - left = Left, - right = Right, - view = View, - broadcast_buffer = Buffer }) -> - #view_member { left = VLeft, right = VRight } - = fetch_view_member(Self, View), - Ver = view_version(View), - Left1 = ensure_neighbour(Ver, Self, Left, VLeft), - Right1 = ensure_neighbour(Ver, Self, Right, VRight), - Buffer1 = case Right1 of - {Self, undefined} -> []; - _ -> Buffer - end, - State1 = State #state { left = Left1, right = Right1, - broadcast_buffer = Buffer1 }, - ok = maybe_send_catchup(Right, State1), - State1. - -maybe_send_catchup(Right, #state { right = Right }) -> - ok; -maybe_send_catchup(_Right, #state { self = Self, - right = {Self, undefined} }) -> - ok; -maybe_send_catchup(_Right, #state { members_state = undefined }) -> - ok; -maybe_send_catchup(_Right, #state { self = Self, - right = {Right, _MRef}, - view = View, - members_state = MembersState }) -> - send_right(Right, View, - {catchup, Self, prepare_members_state(MembersState)}). - - -%% --------------------------------------------------------------------------- -%% Catch_up delta detection -%% --------------------------------------------------------------------------- - -find_prefix_common_suffix(A, B) -> - {Prefix, A1} = find_prefix(A, B, queue:new()), - {Common, Suffix} = find_common(A1, B, queue:new()), - {Prefix, Common, Suffix}. - -%% Returns the elements of A that occur before the first element of B, -%% plus the remainder of A. -find_prefix(A, B, Prefix) -> - case {queue:out(A), queue:out(B)} of - {{{value, Val}, _A1}, {{value, Val}, _B1}} -> - {Prefix, A}; - {{empty, A1}, {{value, _A}, _B1}} -> - {Prefix, A1}; - {{{value, {NumA, _MsgA} = Val}, A1}, - {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB -> - find_prefix(A1, B, queue:in(Val, Prefix)); - {_, {empty, _B1}} -> - {A, Prefix} %% Prefix well be empty here - end. - -%% A should be a prefix of B. Returns the commonality plus the -%% remainder of B. -find_common(A, B, Common) -> - case {queue:out(A), queue:out(B)} of - {{{value, Val}, A1}, {{value, Val}, B1}} -> - find_common(A1, B1, queue:in(Val, Common)); - {{empty, _A}, _} -> - {Common, B} - end. - - -%% --------------------------------------------------------------------------- -%% Members helpers -%% --------------------------------------------------------------------------- - -with_member(Fun, Id, MembersState) -> - store_member( - Id, Fun(find_member_or_blank(Id, MembersState)), MembersState). - -with_member_acc(Fun, Id, {MembersState, Acc}) -> - {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc), - {store_member(Id, MemberState, MembersState), Acc1}. - -find_member_or_blank(Id, MembersState) -> - case ?DICT:find(Id, MembersState) of - {ok, Result} -> Result; - error -> blank_member() - end. - -erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). - -blank_member() -> - #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. - -blank_member_state() -> ?DICT:new(). - -store_member(Id, MemberState, MembersState) -> - ?DICT:store(Id, MemberState, MembersState). - -prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). - -build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). - -make_member(GroupName) -> - {case dirty_read_group(GroupName) of - #gm_group { version = Version } -> Version; - {error, not_found} -> ?VERSION_START - end, self()}. - -remove_erased_members(MembersState, View) -> - lists:foldl(fun (Id, MembersState1) -> - store_member(Id, find_member_or_blank(Id, MembersState), - MembersState1) - end, blank_member_state(), all_known_members(View)). - -get_version({Version, _Pid}) -> Version. - -get_pid({_Version, Pid}) -> Pid. - -get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. - -%% --------------------------------------------------------------------------- -%% Activity assembly -%% --------------------------------------------------------------------------- - -activity_nil() -> queue:new(). - -activity_cons( _Id, [], [], Tail) -> Tail; -activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). - -activity_finalise(Activity) -> queue:to_list(Activity). - -maybe_send_activity([], _State) -> - ok; -maybe_send_activity(Activity, #state { self = Self, - right = {Right, _MRefR}, - view = View }) -> - send_right(Right, View, {activity, Self, Activity}). - -send_right(Right, View, Msg) -> - ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}). - -callback(Args, Module, Activity) -> - Result = - lists:foldl( - fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) -> - lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) -> - case Module2:handle_msg( - Args2, get_pid(Id), Pub) of - ok -> - Acc; - {become, Module3, Args3} -> - {Args3, Module3, ok}; - {stop, _Reason} = Error -> - Error - end; - (_, Error = {stop, _Reason}) -> - Error - end, {Args1, Module1, ok}, Pubs); - (_, Error = {stop, _Reason}) -> - Error - end, {Args, Module, ok}, Activity), - case Result of - {Args, Module, ok} -> ok; - {Args1, Module1, ok} -> {become, Module1, Args1}; - {stop, _Reason} = Error -> Error - end. - -change_view(View, State = #state { view = View0, - module = Module, - callback_args = Args }) -> - OldMembers = all_known_members(View0), - NewMembers = all_known_members(View), - Births = NewMembers -- OldMembers, - Deaths = OldMembers -- NewMembers, - Result = case {Births, Deaths} of - {[], []} -> ok; - _ -> Module:members_changed( - Args, get_pids(Births), get_pids(Deaths)) - end, - {Result, check_neighbours(State #state { view = View })}. - -handle_callback_result({Result, State}) -> - if_callback_success( - Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State); -handle_callback_result({Result, Reply, State}) -> - if_callback_success( - Result, fun reply_true/3, fun reply_false/3, Reply, State). - -no_reply_true (_Result, _Undefined, State) -> noreply(State). -no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}. - -reply_true (_Result, Reply, State) -> reply(Reply, State). -reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}. - -handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State). -handle_msg_false(Result, _Msg, State) -> {Result, State}. - -activity_true(_Result, Activity, State = #state { module = Module, - callback_args = Args }) -> - {callback(Args, Module, Activity), State}. -activity_false(Result, _Activity, State) -> - {Result, State}. - -if_callback_success(ok, True, _False, Arg, State) -> - True(ok, Arg, State); -if_callback_success( - {become, Module, Args} = Result, True, _False, Arg, State) -> - True(Result, Arg, State #state { module = Module, - callback_args = Args }); -if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) -> - False(Result, Arg, State). - -maybe_confirm(_Self, _Id, Confirms, []) -> - Confirms; -maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) -> - case queue:out(Confirms) of - {empty, _Confirms} -> - Confirms; - {{value, {PubNum, From}}, Confirms1} -> - gen_server2:reply(From, ok), - maybe_confirm(Self, Self, Confirms1, PubNums); - {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum -> - maybe_confirm(Self, Self, Confirms, PubNums) - end; -maybe_confirm(_Self, _Id, Confirms, _PubNums) -> - Confirms. - -purge_confirms(Confirms) -> - [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)], - queue:new(). - - -%% --------------------------------------------------------------------------- -%% Msg transformation -%% --------------------------------------------------------------------------- - -acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. - -pubs_from_queue(Q) -> queue:to_list(Q). - -queue_from_pubs(Pubs) -> queue:from_list(Pubs). - -apply_acks( [], Pubs) -> Pubs; -apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), - Pubs1. - -join_pubs(Q, []) -> Q; -join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). - -last_ack( [], LA) -> LA; -last_ack(List, LA) -> LA1 = lists:last(List), - true = LA1 > LA, %% ASSERTION - LA1. - -last_pub( [], LP) -> LP; -last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), - true = PubNum > LP, %% ASSERTION - PubNum. - -%% --------------------------------------------------------------------------- - -%% Uninstrumented versions - -call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout). -cast(Pid, Msg) -> gen_server2:cast(Pid, Msg). -monitor(Pid) -> erlang:monitor(process, Pid). -demonitor(MRef) -> erlang:demonitor(MRef). |