author     Matthias Radestock <matthias@rabbitmq.com>  2011-06-28 07:07:52 +0100
committer  Matthias Radestock <matthias@rabbitmq.com>  2011-06-28 07:07:52 +0100
commit     b62860a22f8dbbc04694df27803b5d9d021d2981 (patch)
tree       5a96bb1a0add471e369fe4fc9f13f38ba047faeb
parent     8c55cc1fdaebaee2f2adc90453b36e2590686cdc (diff)
parent     34894f392d2f9a20c0e0f8c257d09a9eb2990835 (diff)
download   rabbitmq-server-b62860a22f8dbbc04694df27803b5d9d021d2981.tar.gz
merge bug24180 into default
26 files changed, 2288 insertions, 273 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 06dcfff7..a0f03192 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -513,17 +513,22 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>set_user_tags</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>tag</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> <term>username</term> - <listitem><para>The name of the user whose administrative - status is to be set.</para></listitem> + <listitem><para>The name of the user whose tags are to + be set.</para></listitem> + </varlistentry> + <varlistentry> + <term>tag</term> + <listitem><para>Zero, one or more tags to set. Any + existing tags will be removed.</para></listitem> </varlistentry> </variablelist> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl set_admin tonyg</screen> + <screen role="example">rabbitmqctl set_user_tags tonyg administrator</screen> <para role="example"> This command instructs the RabbitMQ broker to ensure the user named <command>tonyg</command> is an administrator. This has no @@ -532,24 +537,10 @@ user logs in via some other means (for example with the management plugin). </para> - </listitem> - </varlistentry> - - <varlistentry> - <term><cmdsynopsis><command>clear_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> - <listitem> - <variablelist> - <varlistentry> - <term>username</term> - <listitem><para>The name of the user whose administrative - status is to be cleared.</para></listitem> - </varlistentry> - </variablelist> - <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl clear_admin tonyg</screen> + <screen role="example">rabbitmqctl set_user_tags tonyg</screen> <para role="example"> - This command instructs the RabbitMQ broker to ensure the user - named <command>tonyg</command> is not an administrator. + This command instructs the RabbitMQ broker to remove any + tags from the user named <command>tonyg</command>. </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index d06f167e..65a3269a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -25,7 +25,7 @@ {queue_index_max_journal_entries, 262144}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, - {default_user_is_admin, true}, + {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {cluster_nodes, []}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 67e2dfe5..00b7e6e9 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -15,12 +15,12 @@ %% -record(user, {username, - is_admin, + tags, auth_backend, %% Module this user came from impl %% Scratch space for that module }). --record(internal_user, {username, password_hash, is_admin}). +-record(internal_user, {username, password_hash, tags}). -record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). @@ -46,7 +46,7 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid}). + arguments, pid, slave_pids, mirror_nodes}). 
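
In ebin/rabbit_app.in above, the boolean default_user_is_admin flag becomes the list-valued default_user_tags setting. As an illustration only (the file name follows standard packaging; the setting itself comes from the hunk above), overriding it in a broker's configuration file would look like:

    %% rabbitmq.config: give the default user only the administrator tag.
    [{rabbit, [{default_user_tags, [administrator]}]}].
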
%% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl index e26d44ea..803bb75c 100644 --- a/include/rabbit_auth_backend_spec.hrl +++ b/include/rabbit_auth_backend_spec.hrl @@ -22,8 +22,7 @@ {'ok', rabbit_types:user()} | {'refused', string(), [any()]} | {'error', any()}). --spec(check_vhost_access/3 :: (rabbit_types:user(), rabbit_types:vhost(), - rabbit_access_control:vhost_permission_atom()) -> +-spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> boolean() | {'error', any()}). -spec(check_resource_access/3 :: (rabbit_types:user(), rabbit_types:r(atom()), diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 809f518b..4a866305 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -5,7 +5,7 @@ PortSystem 1.0 name rabbitmq-server version @VERSION@ categories net -maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer +maintainers paperplanes.de:meyer openmaintainer platforms darwin supported_archs noarch @@ -24,11 +24,9 @@ distfiles ${name}-${version}${extract.suffix} \ checksums \ ${name}-${version}${extract.suffix} \ - md5 @md5-src@ \ sha1 @sha1-src@ \ rmd160 @rmd160-src@ \ ${name}-generic-unix-${version}${extract.suffix} \ - md5 @md5-bin@ \ sha1 @sha1-bin@ \ rmd160 @rmd160-bin@ diff --git a/packaging/macports/make-checksums.sh b/packaging/macports/make-checksums.sh index 11424dfc..891de6ba 100755 --- a/packaging/macports/make-checksums.sh +++ b/packaging/macports/make-checksums.sh @@ -6,7 +6,7 @@ for type in src bin do tarball_var=tarball_${type} tarball=${!tarball_var} - for algo in md5 sha1 rmd160 + for algo in sha1 rmd160 do checksum=$(openssl $algo ${tarball} | awk '{print $NF}') echo "s|@$algo-$type@|$checksum|g" diff --git a/src/rabbit.erl b/src/rabbit.erl index 8866a1b7..100cacb0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -486,16 +486,13 @@ maybe_insert_default_data() -> insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), - {ok, DefaultAdmin} = application:get_env(default_user_is_admin), + {ok, DefaultTags} = application:get_env(default_user_tags), {ok, DefaultVHost} = application:get_env(default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), ok = rabbit_vhost:add(DefaultVHost), ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), - case DefaultAdmin of - true -> rabbit_auth_backend_internal:set_admin(DefaultUser); - _ -> ok - end, + ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags), ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost, DefaultConfigurePerm, DefaultWritePerm, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 59c00848..c0ae18c0 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -19,16 +19,15 @@ -include("rabbit.hrl"). -export([check_user_pass_login/2, check_user_login/2, - check_vhost_access/2, check_resource_access/3, list_vhosts/2]). + check_vhost_access/2, check_resource_access/3]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([permission_atom/0, vhost_permission_atom/0]). +-export_type([permission_atom/0]). -type(permission_atom() :: 'configure' | 'read' | 'write'). 
--type(vhost_permission_atom() :: 'read' | 'write'). -spec(check_user_pass_login/2 :: (rabbit_types:username(), rabbit_types:password()) @@ -39,8 +38,6 @@ -spec(check_resource_access/3 :: (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) -> 'ok' | rabbit_types:channel_exit()). --spec(list_vhosts/2 :: (rabbit_types:user(), vhost_permission_atom()) - -> [rabbit_types:vhost()]). -endif. @@ -70,7 +67,7 @@ check_vhost_access(User = #user{ username = Username, check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso - Module:check_vhost_access(User, VHostPath, write) + Module:check_vhost_access(User, VHostPath) end, "~s failed checking vhost access to ~s for ~s: ~p~n", [Module, VHostPath, Username], @@ -104,21 +101,3 @@ check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> false -> rabbit_misc:protocol_error(access_refused, RefStr, RefArgs) end. - -%% Permission = write -> log in -%% Permission = read -> learn of the existence of (only relevant for -%% management plugin) -list_vhosts(User = #user{username = Username, auth_backend = Module}, - Permission) -> - lists:filter( - fun(VHost) -> - case Module:check_vhost_access(User, VHost, Permission) of - {error, _} = E -> - rabbit_log:warning("~w failed checking vhost access " - "to ~s for ~s: ~p~n", - [Module, VHost, Username, E]), - false; - Else -> - Else - end - end, rabbit_vhost:list()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c8703740..bacb1d21 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -27,6 +27,8 @@ -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([store_queue/1]). + %% internal -export([internal_declare/2, internal_delete/1, @@ -191,18 +193,21 @@ find_durable_queues() -> end). recover_durable_queues(DurableQueues) -> - Qs = [start_queue_process(Q) || Q <- DurableQueues], + Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none}), + {Node, MNodes} = determine_queue_nodes(Args), + Q = start_queue_process(Node, #amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + mirror_nodes = MNodes}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -240,8 +245,24 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -start_queue_process(Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), +determine_queue_nodes(Args) -> + Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), + PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), + case {Policy, PolicyParams} of + {{_Type, <<"nodes">>}, {array, Nodes}} -> + case [list_to_atom(binary_to_list(Node)) || + {longstr, Node} <- Nodes] of + [Node] -> {Node, undefined}; + [First | Rest] -> {First, Rest} + end; + {{_Type, <<"all">>}, _} -> + {node(), all}; + _ -> + {node(), undefined} + end. 
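
To make determine_queue_nodes/1 above concrete, here is how the two x-ha-policy forms decode, assuming the usual {Name, Type, Value} encoding of AMQP tables that rabbit_misc:table_lookup/2 operates on (node names are illustrative):

    %% "all": master on the declaring node, mirrors on every other node.
    Args1 = [{<<"x-ha-policy">>, longstr, <<"all">>}],
    %% determine_queue_nodes(Args1) =:= {node(), all}

    %% "nodes": the first listed node hosts the master, the rest mirror it.
    Args2 = [{<<"x-ha-policy">>, longstr, <<"nodes">>},
             {<<"x-ha-policy-params">>, array,
              [{longstr, <<"rabbit@n1">>}, {longstr, <<"rabbit@n2">>}]}],
    %% determine_queue_nodes(Args2) =:= {'rabbit@n1', ['rabbit@n2']}
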
+ +start_queue_process(Node, Q) -> + {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -257,8 +278,13 @@ lookup(Name) -> with(Name, F, E) -> case lookup(Name) of - {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); - {error, not_found} -> E() + {ok, Q = #amqqueue{slave_pids = []}} -> + rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); + {ok, Q} -> + E1 = fun () -> timer:sleep(25), with(Name, F, E) end, + rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end); + {error, not_found} -> + E() end. with(Name, F) -> @@ -295,31 +321,58 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> - rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>]). + rabbit_misc:assert_args_equivalence( + Args, RequiredArgs, QueueName, + [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key)) of + [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/1}, - {<<"x-message-ttl">>, fun check_integer_argument/1}]], + [{<<"x-expires">>, fun check_integer_argument/2}, + {<<"x-message-ttl">>, fun check_integer_argument/2}, + {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]], ok. -check_integer_argument(undefined) -> +check_integer_argument(undefined, _Args) -> ok; -check_integer_argument({Type, Val}) when Val > 0 -> +check_integer_argument({Type, Val}, _Args) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, Val}) -> +check_integer_argument({_Type, Val}, _Args) -> {error, {value_zero_or_less, Val}}. +check_ha_policy_argument(undefined, _Args) -> + ok; +check_ha_policy_argument({longstr, <<"all">>}, _Args) -> + ok; +check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of + undefined -> + {error, {require, 'x-ha-policy-params'}}; + {array, []} -> + {error, {require_non_empty_list_of_nodes_for_ha}}; + {array, Ary} -> + case lists:all(fun ({longstr, _Node}) -> true; + (_ ) -> false + end, Ary) of + true -> ok; + false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} + end; + {Type, _} -> + {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} + end; +check_ha_policy_argument({longstr, Policy}, _Args) -> + {error, {invalid_ha_policy, Policy}}; +check_ha_policy_argument({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -474,7 +527,8 @@ drop_expired(QPid) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} + #amqqueue{name = QueueName, pid = Pid, + slave_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])), rabbit_binding:process_deletions( @@ -487,11 +541,13 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, - auto_delete = false, - arguments = [], - pid = Pid}. 
+ #amqqueue{name = QueueName, + durable = false, + auto_delete = false, + arguments = [], + pid = Pid, + slave_pids = [], + mirror_nodes = undefined}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1e5ad349..e388ccf2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,6 +33,8 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). +-export([init_with_backing_queue_state/7]). + %% Queue's state -record(q, {q, exclusive_consumer, @@ -72,7 +74,8 @@ messages, consumers, memory, - backing_queue_status + backing_queue_status, + slave_pids ]). -define(CREATION_EVENT_KEYS, @@ -81,7 +84,8 @@ durable, auto_delete, arguments, - owner_pid + owner_pid, + mirror_nodes ]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -114,6 +118,34 @@ init(Q) -> msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, + RateTRef, AckTags, Deliveries, MTC) -> + ?LOGDEBUG("Queue starting - ~p~n", [Q]), + case Owner of + none -> ok; + _ -> erlang:monitor(process, Owner) + end, + State = requeue_and_run( + AckTags, + process_args( + #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + stats_timer = rabbit_event:init_stats_timer(), + msg_id_to_channel = MTC})), + lists:foldl( + fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, + State, Deliveries). + terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> @@ -188,7 +220,7 @@ terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), case BQS of - undefined -> State; + undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), BQS1 = lists:foldl( fun (#cr{txn = none}, BQSN) -> @@ -225,9 +257,12 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{}) -> - {ok, BQM} = application:get_env(backing_queue_module), - BQM. +backing_queue_module(#amqqueue{arguments = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of + undefined -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + _Policy -> rabbit_mirror_queue_master + end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after( @@ -769,6 +804,12 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); +i(slave_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), + SPids; +i(mirror_nodes, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name), + MNodes; i(Item, _) -> throw({bad_argument, Item}). 
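
Taken together, the declare-argument validation in rabbit_amqqueue and the backing_queue_module/1 selection above mean a client opts into mirroring per queue at declare time. A minimal sketch, assuming the Erlang AMQP client of the same era (amqp_channel and the generated #'queue.declare'{} record; the queue name and channel are placeholders):

    -include_lib("amqp_client/include/amqp_client.hrl").

    declare_mirrored(Ch) ->
        %% x-ha-policy "all" mirrors the queue onto every cluster node.
        Declare = #'queue.declare'{
                     queue     = <<"ha.example">>,
                     durable   = true,
                     arguments = [{<<"x-ha-policy">>, longstr, <<"all">>}]},
        #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).
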
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 1344956e..2c28adce 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_child/1]). +-export([start_link/0, start_child/2]). -export([init/1]). @@ -29,8 +29,8 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_child(Args) -> - supervisor2:start_child(?SERVER, Args). +start_child(Node, Args) -> + supervisor2:start_child({?SERVER, Node}, Args). init([]) -> {ok, {{simple_one_for_one_terminate, 10, 10}, diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index 09820c5b..ade158bb 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -36,17 +36,13 @@ behaviour_info(callbacks) -> %% Client failed authentication. Log and die. {check_user_login, 2}, - %% Given #user, vhost path and permission, can a user access a vhost? - %% Permission is read - learn of the existence of (only relevant for - %% management plugin) - %% or write - log in - %% + %% Given #user and vhost, can a user log in to a vhost? %% Possible responses: %% true %% false %% {error, Error} %% Something went wrong. Log and die. - {check_vhost_access, 3}, + {check_vhost_access, 2}, %% Given #user, resource and permission, can a user access a resource? %% diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 2a42ff88..6a018bd1 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -20,10 +20,10 @@ -behaviour(rabbit_auth_backend). -export([description/0]). --export([check_user_login/2, check_vhost_access/3, check_resource_access/3]). +-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, set_admin/1, - clear_admin/1, list_users/0, lookup_user/1, clear_password/1]). +-export([add_user/2, delete_user/1, change_password/2, set_tags/2, + list_users/0, user_info_keys/0, lookup_user/1, clear_password/1]). -export([make_salt/0, check_password/2, change_password_hash/2, hash_password/1]). -export([set_permissions/5, clear_permissions/2, @@ -50,9 +50,9 @@ rabbit_types:password_hash()) -> 'ok'). -spec(hash_password/1 :: (rabbit_types:password()) -> rabbit_types:password_hash()). --spec(set_admin/1 :: (rabbit_types:username()) -> 'ok'). --spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok'). --spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]). +-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok'). +-spec(list_users/0 :: () -> rabbit_types:infos()). +-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(lookup_user/1 :: (rabbit_types:username()) -> rabbit_types:ok(rabbit_types:internal_user()) | rabbit_types:error('not_found')). @@ -77,6 +77,7 @@ %%---------------------------------------------------------------------------- -define(PERMS_INFO_KEYS, [configure, write, read]). +-define(USER_INFO_KEYS, [user, tags]). 
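
The narrowing of check_vhost_access/3 to check_vhost_access/2 (the read/write vhost-permission atom is gone; the question is now simply "may this user log in to this vhost?") affects any external backend implementing rabbit_auth_backend. A hypothetical allow-all backend against the new contract (module name and description invented for illustration):

    -module(my_auth_backend).
    -behaviour(rabbit_auth_backend).

    -include("rabbit.hrl").

    -export([description/0, check_user_login/2, check_vhost_access/2,
             check_resource_access/3]).

    description() -> [{name, <<"Example">>}].

    check_user_login(Username, _AuthProps) ->
        {ok, #user{username = Username, tags = [],
                   auth_backend = ?MODULE, impl = none}}.

    %% Two arguments now; no 'read' | 'write' permission atom.
    check_vhost_access(#user{}, _VHost) -> true.

    check_resource_access(#user{}, _Resource, _Permission) -> true.
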
%% Implementation of rabbit_auth_backend @@ -97,10 +98,10 @@ check_user_login(Username, AuthProps) -> internal_check_user_login(Username, Fun) -> Refused = {refused, "user '~s' - invalid credentials", [Username]}, case lookup_user(Username) of - {ok, User = #internal_user{is_admin = IsAdmin}} -> + {ok, User = #internal_user{tags = Tags}} -> case Fun(User) of true -> {ok, #user{username = Username, - is_admin = IsAdmin, + tags = Tags, auth_backend = ?MODULE, impl = User}}; _ -> Refused @@ -109,16 +110,13 @@ internal_check_user_login(Username, Fun) -> Refused end. -check_vhost_access(#user{is_admin = true}, _VHostPath, read) -> - true; - -check_vhost_access(#user{username = Username}, VHostPath, _) -> +check_vhost_access(#user{username = Username}, VHost) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_user_permission, #user_vhost{username = Username, - virtual_host = VHostPath}}) of + virtual_host = VHost}}) of [] -> false; [_R] -> true end @@ -161,7 +159,7 @@ add_user(Username, Password) -> #internal_user{username = Username, password_hash = hash_password(Password), - is_admin = false}, + tags = []}, write); _ -> mnesia:abort({user_already_exists, Username}) @@ -222,16 +220,12 @@ salted_md5(Salt, Cleartext) -> Salted = <<Salt/binary, Cleartext/binary>>, erlang:md5(Salted). -set_admin(Username) -> set_admin(Username, true). - -clear_admin(Username) -> set_admin(Username, false). - -set_admin(Username, IsAdmin) -> +set_tags(Username, Tags) -> R = update_user(Username, fun(User) -> - User#internal_user{is_admin = IsAdmin} + User#internal_user{tags = Tags} end), - rabbit_log:info("Set user admin flag for user ~p to ~p~n", - [Username, IsAdmin]), + rabbit_log:info("Set user tags for user ~p to ~p~n", + [Username, Tags]), R. update_user(Username, Fun) -> @@ -244,10 +238,12 @@ update_user(Username, Fun) -> end)). list_users() -> - [{Username, IsAdmin} || - #internal_user{username = Username, is_admin = IsAdmin} <- + [[{user, Username}, {tags, Tags}] || + #internal_user{username = Username, tags = Tags} <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. +user_info_keys() -> ?USER_INFO_KEYS. + lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). 
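
For illustration, the new tag API as exercised from an attached Erlang shell (the result shape follows list_users/0 and user_info_keys/0 above; ordering across users is not guaranteed):

    ok = rabbit_auth_backend_internal:add_user(<<"tonyg">>, <<"changeme">>),
    ok = rabbit_auth_backend_internal:set_tags(<<"tonyg">>, [administrator]),
    %% list_users/0 now returns one proplist per user, keyed by
    %% user_info_keys() = [user, tags], e.g.:
    %%   [{user, <<"tonyg">>}, {tags, [administrator]}]
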
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 355ac549..9eef384a 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -235,17 +235,17 @@ action(clear_password, Node, Args = [Username], _Opts, Inform) -> Inform("Clearing password for user ~p", [Username]), call(Node, {rabbit_auth_backend_internal, clear_password, Args}); -action(set_admin, Node, [Username], _Opts, Inform) -> - Inform("Setting administrative status for user ~p", [Username]), - call(Node, {rabbit_auth_backend_internal, set_admin, [Username]}); - -action(clear_admin, Node, [Username], _Opts, Inform) -> - Inform("Clearing administrative status for user ~p", [Username]), - call(Node, {rabbit_auth_backend_internal, clear_admin, [Username]}); +action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> + Tags = [list_to_atom(T) || T <- TagsStr], + Inform("Setting tags for user ~p to ~p", [Username, Tags]), + rpc_call(Node, rabbit_auth_backend_internal, set_tags, + [list_to_binary(Username), Tags]); action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), - display_list(call(Node, {rabbit_auth_backend_internal, list_users, []})); + display_info_list( + call(Node, {rabbit_auth_backend_internal, list_users, []}), + rabbit_auth_backend_internal:user_info_keys()); action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost ~p", Args), @@ -422,17 +422,6 @@ format_info_item([T | _] = Value) format_info_item(Value) -> io_lib:format("~w", [Value]). -display_list(L) when is_list(L) -> - lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [escape(I)]); - (I) when is_tuple(I) -> - display_row([escape(V) - || V <- tuple_to_list(I)]) - end, - lists:sort(L)), - ok; -display_list(Other) -> Other. - display_call_result(Node, MFA) -> case call(Node, MFA) of {badrpc, _} = Res -> throw(Res); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 1b72dd76..e79583fa 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -120,9 +120,9 @@ init([ChPid, UnackedMsgCount]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, _QPid, _AckRequired}, _From, +handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> - {reply, false, State}; + {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl new file mode 100644 index 00000000..57f6ca8b --- /dev/null +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -0,0 +1,434 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_coordinator). + +-export([start_link/3, get_gm/1, ensure_monitoring/2]). 
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+                 gm,
+                 monitors,
+                 death_fun
+               }).
+
+-define(ONE_SECOND, 1000).
+
+-ifdef(use_specs).
+
+-spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+                       rabbit_mirror_queue_master:death_fun()) ->
+                           rabbit_types:ok_pid_or_error()).
+-spec(get_gm/1 :: (pid()) -> pid()).
+-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%%
+%% Mirror Queues
+%%
+%% A queue with mirrors consists of the following:
+%%
+%%            #amqqueue{ pid, mirror_pids }
+%%              |    |
+%%   +----------+    +-------+--------------+-----------...etc...
+%%   |                       |              |
+%%   V                       V              V
+%%  amqqueue_process---+    slave-----+    slave-----+  ...etc...
+%%   |    BQ = master----+  |  BQ = vq |    |  BQ = vq |
+%%   |         |  BQ = vq |  +-+-------+    +-+-------+
+%%   |         +-+-------+    |              |
+%%   +-++-----|---------+     |              |  (some details elided)
+%%    ||     |          |     |              |
+%%    ||   coordinator-+      |              |
+%%    ||   +-+---------+      |              |
+%%    ||     |                |              |
+%%    ||    gm-+ -- -- -- -- gm-+- -- -- -- gm-+- -- --...etc...
+%%    ||    +--+             +--+           +--+
+%%    ||
+%%  consumers
+%%
+%% The master is merely an implementation of bq, and thus is invoked
+%% through the normal bq interface by the amqqueue_process. The slaves
+%% meanwhile are processes in their own right (as is the
+%% coordinator). The coordinator and all slaves belong to the same gm
+%% group. Every member of a gm group receives messages sent to the gm
+%% group. Because the master is the bq of amqqueue_process, it doesn't
+%% have sole control over its mailbox, and as a result, the master
+%% itself cannot be passed messages directly (well, it could, via the
+%% amqqueue:run_backing_queue_async callback, but that would induce
+%% additional unnecessary loading on the master queue process), yet it
+%% needs to react to gm events, such as the death of slaves. Thus the
+%% master creates the coordinator, and it is the coordinator that is
+%% the gm callback module and event handler for the master.
+%%
+%% Consumers are only attached to the master. Thus the master is
+%% responsible for informing all slaves when messages are fetched from
+%% the bq, when they're acked, and when they're requeued.
+%%
+%% The basic goal is to ensure that all slaves perform actions on
+%% their bqs in the same order as the master. Thus the master
+%% intercepts all events going to its bq, and suitably broadcasts
+%% these events on the gm. The slaves thus receive two streams of
+%% events: one stream is via the gm, and one stream is from channels
+%% directly. Whilst the stream via gm is guaranteed to be consistently
+%% seen by all slaves, the same is not true of the stream via
+%% channels. For example, in the event of an unexpected death of a
+%% channel during a publish, only some of the mirrors may receive that
+%% publish. As a result of this problem, the messages broadcast over
+%% the gm contain published content, and thus slaves can operate
+%% successfully on messages that they only receive via the gm. The key
+%% purpose of also sending messages directly from the channels to the
+%% slaves is that without this, in the event of the death of the
+%% master, messages could be lost until a suitable slave is promoted.
+%%
+%% However, that is not the only reason. For example, if confirms are
+%% in use, then there is no guarantee that every slave will see the
+%% delivery with the same msg_seq_no. As a result, the slaves have to
+%% wait until they've seen both the publish via gm, and the publish
+%% via the channel before they have enough information to be able to
+%% perform the publish to their own bq, and subsequently issue the
+%% confirm, if necessary. Either form of publish can arrive first, and
+%% a slave can be upgraded to the master at any point during this
+%% process. Confirms continue to be issued correctly, however.
+%%
+%% Because the slave is a full process, it impersonates parts of the
+%% amqqueue API. However, it does not need to implement all parts: for
+%% example, no ack or consumer-related message can arrive directly at
+%% a slave from a channel: it is only publishes that both pass
+%% directly to the slaves and go via gm.
+%%
+%% Slaves can be added dynamically. When this occurs, there is no
+%% attempt made to sync the current contents of the master with the
+%% new slave, thus the slave will start empty, regardless of the state
+%% of the master. Thus the slave needs to be able to detect and ignore
+%% operations which are for messages it has not received: because of
+%% the strict FIFO nature of queues in general, this is
+%% straightforward - all new publishes that the new slave receives via
+%% gm should be processed as normal, but fetches which are for
+%% messages the slave has never seen should be ignored. Similarly,
+%% acks for messages the slave never fetched should be
+%% ignored. Eventually, as the master is consumed from, the messages
+%% at the head of the queue which were there before the slave joined
+%% will disappear, and the slave will become fully synced with the
+%% state of the master. The detection of the sync-status of a slave is
+%% done entirely based on length: if the slave and the master both
+%% agree on the length of the queue after the fetch of the head of the
+%% queue, then the queues must be in sync. The only other possibility
+%% is that the slave's queue is shorter, and thus the fetch should be
+%% ignored.
+%%
+%% Because acktags are issued by the bq independently, and because
+%% there is no requirement for the master and all slaves to use the
+%% same bq, all references to msgs going over gm are by msg_id. Thus
+%% upon acking, the master must convert the acktags back to msg_ids
+%% (which happens to be what bq:ack returns), then send the msg_ids
+%% over gm; the slaves must convert the msg_ids to acktags (a mapping
+%% the slaves themselves must maintain).
+%%
+%% When the master dies, a slave gets promoted. This will be the
+%% eldest slave, and thus the hope is that that slave is most likely
+%% to be sync'd with the master. The design of gm is that the
+%% notification of the death of the master will only appear once all
+%% messages in-flight from the master have been fully delivered to all
+%% members of the gm group. Thus at this point, the slave that gets
+%% promoted cannot broadcast different events in a different order
+%% than the master for the same msgs: there is no possibility for the
+%% same msg to be processed by the old master and the new master - if
+%% it was processed by the old master then it will have been processed
+%% by the slave before the slave was promoted, and vice versa.
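
The slave module itself is not part of this diff, so the following is only an assumed sketch of the msg_id-to-acktag bookkeeping described above: acks arrive from the gm as msg_ids and must be translated into local acktags, silently dropping ids the not-yet-synced slave never saw (the record and field names are invented):

    ack_from_gm(MsgIds, State = #slave{msg_id_ack          = MA,
                                       backing_queue       = BQ,
                                       backing_queue_state = BQS}) ->
        {AckTags, MA1} =
            lists:foldl(
              fun (MsgId, {TagsN, MAN}) ->
                      case dict:find(MsgId, MAN) of
                          {ok, AckTag} -> {[AckTag | TagsN],
                                           dict:erase(MsgId, MAN)};
                          error        -> {TagsN, MAN} %% never seen: ignore
                      end
              end, {[], MA}, MsgIds),
        {_MsgIds, BQS1} = BQ:ack(AckTags, BQS),
        State#slave{msg_id_ack = MA1, backing_queue_state = BQS1}.
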
+%%
+%% Upon promotion, all msgs pending acks are requeued as normal, the
+%% slave constructs state suitable for use in the master module, and
+%% then dynamically changes into an amqqueue_process with the master
+%% as the bq, and the slave's bq as the master's bq. Thus the very
+%% same process that was the slave is now a full amqqueue_process.
+%%
+%% It is important that we avoid memory leaks due to the death of
+%% senders (i.e. channels) and partial publications. A sender
+%% publishing a message may fail midway through the publish and thus
+%% only some of the mirrors will receive the message. We need the
+%% mirrors to be able to detect this and tidy up as necessary to avoid
+%% leaks. If we just had the master monitoring all senders then we
+%% would have the possibility that a sender appears and only sends the
+%% message to a few of the slaves before dying. Those slaves would
+%% then hold on to the message, assuming they'll receive some
+%% instruction eventually from the master. Thus we have both slaves
+%% and the master monitor all senders they become aware of. But there
+%% is a race: if the slave receives a DOWN of a sender, how does it
+%% know whether or not the master is going to send it instructions
+%% regarding those messages?
+%%
+%% Whilst the master monitors senders, it can't access its mailbox
+%% directly, so it delegates monitoring to the coordinator. When the
+%% coordinator receives a DOWN message from a sender, it informs the
+%% master via a callback. This allows the master to do any tidying
+%% necessary, but more importantly allows the master to broadcast a
+%% sender_death message to all the slaves, saying the sender has
+%% died. Once the slaves receive the sender_death message, they know
+%% that they're not going to receive any more instructions from the gm
+%% regarding that sender, thus they throw away any publications from
+%% the sender pending publication instructions. However, it is
+%% possible that the coordinator receives the DOWN and communicates
+%% that to the master before the master has finished receiving and
+%% processing publishes from the sender. This turns out not to be a
+%% problem: the sender has actually died, and so will not need to
+%% receive confirms or other feedback, and should further messages be
+%% "received" from the sender, the master will ask the coordinator to
+%% set up a new monitor, and will continue to process the messages
+%% normally. Slaves may thus receive publishes via gm from previously
+%% declared "dead" senders, but again, this is fine: should the slave
+%% have just thrown out the message it had received directly from the
+%% sender (due to receiving a sender_death message via gm), it will be
+%% able to cope with the publication purely from the master via gm.
+%%
+%% When a slave receives a DOWN message for a sender, if it has not
+%% received the sender_death message from the master via gm already,
+%% then it will wait 20 seconds before broadcasting a request for
+%% confirmation from the master that the sender really has died.
+%% Should a sender have only sent a publish to slaves, this allows
+%% slaves to inform the master of the previous existence of the
+%% sender. The master will thus monitor the sender, receive the DOWN,
+%% and subsequently broadcast the sender_death message, allowing the
+%% slaves to tidy up. This process can repeat for the same sender:
+%% consider one slave receives the publication, then the DOWN, then
+%% asks for confirmation of death, then the master broadcasts the
+%% sender_death message. Only then does another slave receive the
+%% publication and thus set up its monitoring. Eventually that slave
+%% too will receive the DOWN, ask for confirmation and the master will
+%% monitor the sender again, receive another DOWN, and send out
+%% another sender_death message. Given the 20 second delay before
+%% requesting death confirmation, this is highly unlikely, but it is a
+%% possibility.
+%%
+%% When the 20 second timer expires, the slave first checks to see
+%% whether it still needs confirmation of the death before requesting
+%% it. This prevents unnecessary traffic on gm as it allows one
+%% broadcast of the sender_death message to satisfy many slaves.
+%%
+%% If we consider the promotion of a slave at this point, we have two
+%% possibilities: that of the slave that has received the DOWN and is
+%% thus waiting for confirmation from the master that the sender
+%% really is down; and that of the slave that has not received the
+%% DOWN. In the first case, in the act of promotion to master, the new
+%% master will monitor the dead sender again, and after it has
+%% finished promoting itself, it should find another DOWN waiting,
+%% which it will then broadcast. This will allow slaves to tidy up as
+%% normal. In the second case, we have the possibility that a
+%% confirmation-of-sender-death request has been broadcast, but that
+%% it was broadcast before the master failed, and that the slave being
+%% promoted does not know anything about that sender, and so will not
+%% monitor it on promotion. Thus a slave that broadcasts such a
+%% request, at the point of broadcasting it, recurses, setting another
+%% 20 second timer. As before, on expiry of the timer, the slave
+%% checks to see whether it still has not received a sender_death
+%% message for the dead sender, and if not, broadcasts a death
+%% confirmation request. Thus this ensures that even when a master
+%% dies and the new slave has no knowledge of the dead sender, it will
+%% eventually receive a death confirmation request, monitor the dead
+%% sender, receive the DOWN and broadcast the sender_death message.
+%%
+%% The preceding commentary deals with the possibility of slaves
+%% receiving publications from senders which the master does not, and
+%% the need to prevent memory leaks in such scenarios. The inverse is
+%% also possible: a partial publication may cause only the master to
+%% receive a publication. It will then publish the message via gm. The
+%% slaves will receive it via gm, will publish it to their BQ and will
+%% set up monitoring on the sender. They will then receive the DOWN
+%% message and the master will eventually publish the corresponding
+%% sender_death message. The slave will then be able to tidy up its
+%% state as normal.
+%%
+%% We don't support transactions on mirror queues. To do so is
+%% challenging. The underlying bq is free to add the contents of the
+%% txn to the queue proper at any point after the tx.commit comes in
+%% but before the tx.commit-ok goes out. This means that it is not
+%% safe for all mirrors to simply issue the bq:tx_commit at the same
+%% time, as the addition of the txn's contents to the queue may
+%% subsequently be inconsistently interwoven with other actions on the
+%% bq. The solution to this is, in the master, to wrap the
+%% PostCommitFun and do the gm:broadcast in there: at that point,
+%% you're in the bq (well, there's actually nothing to stop that
+%% function being invoked by some other process, but let's pretend for
+%% now: you could always use run_backing_queue to ensure you really
+%% are in the queue process (the _async variant would be unsafe from
+%% an ordering pov)), and the gm:broadcast is safe because you don't
+%% have to worry about races with other gm:broadcast calls (same
+%% process). Thus this signal would indicate sufficiently to all the
+%% slaves that they must insert the complete contents of the txn at
+%% precisely this point in the stream of events.
+%%
+%% However, it's quite difficult for the slaves to make that happen:
+%% they would be forced to issue the bq:tx_commit at that point, but
+%% then stall processing any further instructions from gm until they
+%% receive the notification from their bq that the tx_commit has fully
+%% completed (i.e. they need to treat what is an async system as being
+%% fully synchronous). This is not too bad (apart from the
+%% vomit-inducing notion of it all): we just need a queue of
+%% instructions from the GM; but then it gets rather worse when you
+%% consider what needs to happen if the master dies at this point and
+%% the slave in the middle of this tx_commit needs to be promoted.
+%%
+%% Finally, we can't possibly hope to make transactions atomic across
+%% mirror queues, and it's not even clear that that's desirable: if a
+%% slave fails whilst there's an open transaction in progress then
+%% when the channel comes to commit the txn, it will detect the
+%% failure and destroy the channel. However, the txn will have
+%% actually committed successfully in all the other mirrors (including
+%% the master). To do this bit properly would require 2PC and all the
+%% baggage that goes with that.
+%%
+%% Recovery of mirrored queues is straightforward: as nodes die, the
+%% remaining nodes record this, and eventually a situation is reached
+%% in which only one node is alive, which is the master. This is the
+%% only node which, upon recovery, will resurrect a mirrored queue:
+%% nodes which die and then rejoin as a slave will start off empty as
+%% if they have no mirrored content at all. This is not surprising: to
+%% achieve anything more sophisticated would require the master and
+%% recovering slave to be able to check to see whether they agree on
+%% the last seen state of the queue: checking length alone is not
+%% sufficient in this case.
+%%
+%% For more documentation see the comments in bug 23554.
+%%
+%%----------------------------------------------------------------------------
+
+start_link(Queue, GM, DeathFun) ->
+    gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+
+get_gm(CPid) ->
+    gen_server2:call(CPid, get_gm, infinity).
+
+ensure_monitoring(CPid, Pids) ->
+    gen_server2:cast(CPid, {ensure_monitoring, Pids}).
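
Putting the three exported functions above together, master-side usage (this is what rabbit_mirror_queue_master:init/4 later in this diff actually does) looks like:

    {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
                   Q, undefined, SenderDeathFun),
    GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
    %% ...and once a channel ChPid first publishes to the master:
    ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, [ChPid]).
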
+ +%% --------------------------------------------------------------------------- +%% gen_server +%% --------------------------------------------------------------------------- + +init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> + GM1 = case GM of + undefined -> + {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM2, _Members} -> + ok + end, + GM2; + _ -> + true = link(GM), + GM + end, + {ok, _TRef} = + timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), + {ok, #state { q = Q, + gm = GM1, + monitors = dict:new(), + death_fun = DeathFun }, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(get_gm, _From, State = #state { gm = GM }) -> + reply(GM, State). + +handle_cast({gm_deaths, Deaths}, + State = #state { q = #amqqueue { name = QueueName } }) -> + rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + rabbit_misc:pid_to_string(self()), + [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= node() -> + noreply(State); + {error, not_found} -> + {stop, normal, State} + end; + +handle_cast({ensure_monitoring, Pids}, + State = #state { monitors = Monitors }) -> + Monitors1 = + lists:foldl(fun (Pid, MonitorsN) -> + case dict:is_key(Pid, MonitorsN) of + true -> MonitorsN; + false -> MRef = erlang:monitor(process, Pid), + dict:store(Pid, MRef, MonitorsN) + end + end, Monitors, Pids), + noreply(State #state { monitors = Monitors1 }). + +handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, + State = #state { monitors = Monitors, + death_fun = Fun }) -> + noreply( + case dict:is_key(Pid, Monitors) of + false -> State; + true -> ok = Fun(Pid), + State #state { monitors = dict:erase(Pid, Monitors) } + end); + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, #state{}) -> + %% gen_server case + ok; +terminate([_CPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([CPid], Members) -> + CPid ! {joined, self(), Members}, + ok. + +members_changed([_CPid], _Births, []) -> + ok; +members_changed([CPid], _Births, Deaths) -> + ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). + +handle_msg([_CPid], _From, heartbeat) -> + ok; +handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> + ok = gen_server2:cast(CPid, Msg); +handle_msg([_CPid], _From, _Msg) -> + ok. + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +noreply(State) -> + {noreply, State, hibernate}. + +reply(Reply, State) -> + {reply, Reply, State, hibernate}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl new file mode 100644 index 00000000..b090ebe8 --- /dev/null +++ b/src/rabbit_mirror_queue_master.erl @@ -0,0 +1,410 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. 
You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_master). + +-export([init/4, terminate/2, delete_and_terminate/2, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + set_ram_duration_target/2, ram_duration/1, + needs_timeout/1, timeout/1, handle_pre_hibernate/1, + status/1, invoke/3, is_duplicate/3, discard/3]). + +-export([start/1, stop/0]). + +-export([promote_backing_queue_state/6, sender_death_fun/0]). + +-behaviour(rabbit_backing_queue). + +-include("rabbit.hrl"). + +-record(state, { gm, + coordinator, + backing_queue, + backing_queue_state, + set_delivered, + seen_status, + confirmed, + ack_msg_id, + known_senders + }). + +-ifdef(use_specs). + +-export_type([death_fun/0]). + +-type(death_fun() :: fun ((pid()) -> 'ok')). +-type(master_state() :: #state { gm :: pid(), + coordinator :: pid(), + backing_queue :: atom(), + backing_queue_state :: any(), + set_delivered :: non_neg_integer(), + seen_status :: dict(), + confirmed :: [rabbit_guid:guid()], + ack_msg_id :: dict(), + known_senders :: set() + }). + +-spec(promote_backing_queue_state/6 :: + (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). +-spec(sender_death_fun/0 :: () -> death_fun()). + +-endif. + +%% For general documentation of HA design, see +%% rabbit_mirror_queue_coordinator + +%% --------------------------------------------------------------------------- +%% Backing queue +%% --------------------------------------------------------------------------- + +start(_DurableQueues) -> + %% This will never get called as this module will never be + %% installed as the default BQ implementation. + exit({not_valid_for_generic_backing_queue, ?MODULE}). + +stop() -> + %% Same as start/1. + exit({not_valid_for_generic_backing_queue, ?MODULE}). + +init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, + AsyncCallback, SyncCallback) -> + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( + Q, undefined, sender_death_fun()), + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + MNodes1 = + (case MNodes of + all -> rabbit_mnesia:all_clustered_nodes(); + undefined -> []; + _ -> [list_to_atom(binary_to_list(Node)) || Node <- MNodes] + end) -- [node()], + [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = 0, + seen_status = dict:new(), + confirmed = [], + ack_msg_id = dict:new(), + known_senders = sets:new() }. + +terminate({shutdown, dropped} = Reason, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + %% Backing queue termination - this node has been explicitly + %% dropped. Normally, non-durable queues would be tidied up on + %% startup, but there's a possibility that we will be added back + %% in without this node being restarted. 
Thus we must do the full + %% blown delete_and_terminate now, but only locally: we do not + %% broadcast delete_and_terminate. + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }; +terminate(Reason, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + %% Backing queue termination. The queue is going down but + %% shouldn't be deleted. Most likely safe shutdown of this + %% node. Thus just let some other slave take over. + State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. + +delete_and_terminate(Reason, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +purge(State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {set_length, 0}), + {Count, BQS1} = BQ:purge(BQS), + {Count, State #state { backing_queue_state = BQS1, + set_delivered = 0 }}. + +publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). + +publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, + ChPid, State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + %% Must use confirmed_broadcast here in order to guarantee that + %% all slaves are forced to interpret this publish_delivered at + %% the same point, especially if we die and a slave is promoted. + ok = gm:confirmed_broadcast( + GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + AM1 = maybe_store_acktag(AckTag, MsgId, AM), + {AckTag, + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 })}. + +dropwhile(Fun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered }) -> + Len = BQ:len(BQS), + BQS1 = BQ:dropwhile(Fun, BQS), + Dropped = Len - BQ:len(BQS1), + SetDelivered1 = lists:max([0, SetDelivered - Dropped]), + ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), + State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 }. + +drain_confirmed(State = #state { backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS, + confirmed = Confirmed }) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + {MsgIds1, SS1} = + lists:foldl( + fun (MsgId, {MsgIdsN, SSN}) -> + %% We will never see 'discarded' here + case dict:find(MsgId, SSN) of + error -> + {[MsgId | MsgIdsN], SSN}; + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {ok, confirmed} -> + %% Well, confirms are racy by definition. 
+ {[MsgId | MsgIdsN], SSN} + end + end, {[], SS}, MsgIds), + {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1, + seen_status = SS1, + confirmed = [] }}. + +fetch(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered, + ack_msg_id = AM }) -> + {Result, BQS1} = BQ:fetch(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + case Result of + empty -> + {Result, State1}; + {#basic_message { id = MsgId } = Message, IsDelivered, AckTag, + Remaining} -> + ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), + IsDelivered1 = IsDelivered orelse SetDelivered > 0, + SetDelivered1 = lists:max([0, SetDelivered - 1]), + AM1 = maybe_store_acktag(AckTag, MsgId, AM), + {{Message, IsDelivered1, AckTag, Remaining}, + State1 #state { set_delivered = SetDelivered1, + ack_msg_id = AM1 }} + end. + +ack(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, BQS), + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), + case MsgIds of + [] -> ok; + _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + end, + {MsgIds, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 }}. + +tx_publish(_Txn, _Msg, _MsgProps, _ChPid, State) -> + %% We don't support txns in mirror queues + State. + +tx_ack(_Txn, _AckTags, State) -> + %% We don't support txns in mirror queues + State. + +tx_rollback(_Txn, State) -> + {[], State}. + +tx_commit(_Txn, PostCommitFun, _MsgPropsFun, State) -> + PostCommitFun(), %% Probably must run it to avoid deadlocks + {[], State}. + +requeue(AckTags, MsgPropsFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}), + {MsgIds, State #state { backing_queue_state = BQS1 }}. + +len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:len(BQS). + +is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:is_empty(BQS). + +set_ram_duration_target(Target, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = + BQ:set_ram_duration_target(Target, BQS) }. + +ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:ram_duration(BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + +needs_timeout(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:needs_timeout(BQS). + +timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:timeout(BQS) }. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. + +status(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:status(BQS). + +invoke(?MODULE, Fun, State) -> + Fun(?MODULE, State); +invoke(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. + +is_duplicate(none, Message = #basic_message { id = MsgId }, + State = #state { seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS, + confirmed = Confirmed }) -> + %% Here, we need to deal with the possibility that we're about to + %% receive a message that we've already seen when we were a slave + %% (we received it via gm). 
Thus if we do receive such a message now
+    %% via the channel, there may be a confirm waiting to be issued for
+    %% it.
+
+    %% We will never see {published, ChPid, MsgSeqNo} here.
+    case dict:find(MsgId, SS) of
+        error ->
+            %% We permit the underlying BQ to have a peek at it, but
+            %% only if we ourselves are not filtering out the msg.
+            {Result, BQS1} = BQ:is_duplicate(none, Message, BQS),
+            {Result, State #state { backing_queue_state = BQS1 }};
+        {ok, published} ->
+            %% It already got published when we were a slave and no
+            %% confirmation is waiting. amqqueue_process will have, in
+            %% its msg_id_to_channel mapping, the entry for dealing
+            %% with the confirm when that comes back in (it's added
+            %% immediately after calling is_duplicate). The msg is
+            %% invalid. We will not see this again, nor will we be
+            %% further involved in confirming this message, so erase.
+            {published, State #state { seen_status = dict:erase(MsgId, SS) }};
+        {ok, confirmed} ->
+            %% It got published when we were a slave via gm, and
+            %% confirmed some time after that (maybe even after
+            %% promotion), but before we received the publish from the
+            %% channel, so couldn't previously know what the
+            %% msg_seq_no was (and thus confirm as a slave). So we
+            %% need to confirm now. As above, amqqueue_process will
+            %% have the entry for the msg_id_to_channel mapping added
+            %% immediately after calling is_duplicate/2.
+            {published, State #state { seen_status = dict:erase(MsgId, SS),
+                                       confirmed = [MsgId | Confirmed] }};
+        {ok, discarded} ->
+            %% Don't erase from SS here because discard/2 is about to
+            %% be called and we need to be able to detect this case.
+            {discarded, State}
+    end;
+is_duplicate(_Txn, _Msg, State) ->
+    %% In a transaction. We don't support txns in mirror queues. But
+    %% it's probably not a duplicate...
+    {false, State}.
+
+discard(Msg = #basic_message { id = MsgId }, ChPid,
+        State = #state { gm                  = GM,
+                         backing_queue       = BQ,
+                         backing_queue_state = BQS,
+                         seen_status         = SS }) ->
+    %% It's a massive error if we get told to discard something that's
+    %% already been published or published-and-confirmed. To do that
+    %% would require non-FIFO access. Hence we should not find
+    %% 'published' or 'confirmed' in this dict:find.
+    case dict:find(MsgId, SS) of
+        error ->
+            ok = gm:broadcast(GM, {discard, ChPid, Msg}),
+            State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS),
+                           seen_status = dict:erase(MsgId, SS) };
+        {ok, discarded} ->
+            State
+    end.
+
+%% ---------------------------------------------------------------------------
+%% Other exported functions
+%% ---------------------------------------------------------------------------
+
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+    #state { gm                  = GM,
+             coordinator         = CPid,
+             backing_queue       = BQ,
+             backing_queue_state = BQS,
+             set_delivered       = BQ:len(BQS),
+             seen_status         = SeenStatus,
+             confirmed           = [],
+             ack_msg_id          = dict:new(),
+             known_senders       = sets:from_list(KS) }.
+
+sender_death_fun() ->
+    Self = self(),
+    fun (DeadPid) ->
+            rabbit_amqqueue:run_backing_queue_async(
+              Self, ?MODULE,
+              fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
+                      ok = gm:broadcast(GM, {sender_death, DeadPid}),
+                      KS1 = sets:del_element(DeadPid, KS),
+                      State #state { known_senders = KS1 }
+              end)
+    end.
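%% A compact restatement (editorial, not in the original source) of the
%% seen_status (SS) lifecycle implemented by drain_confirmed/1,
%% is_duplicate/3 and discard/2 above. An entry exists only for msg_ids
%% seen via gm before the corresponding publish arrives from the channel:
%%
%%   published --(BQ confirm, in drain_confirmed)--> confirmed
%%   published --(channel publish)--> erased; amqqueue_process now owns
%%                                    the eventual confirm
%%   confirmed --(channel publish)--> erased; confirm queued on 'confirmed'
%%   discarded --(channel publish)--> is_duplicate answers 'discarded' and
%%                                    the ensuing discard/2 skips the
%%                                    gm broadcast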
+ +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +maybe_store_acktag(undefined, _MsgId, AM) -> + AM; +maybe_store_acktag(AckTag, MsgId, AM) -> + dict:store(AckTag, MsgId, AM). + +ensure_monitoring(ChPid, State = #state { coordinator = CPid, + known_senders = KS }) -> + case sets:is_element(ChPid, KS) of + true -> State; + false -> ok = rabbit_mirror_queue_coordinator:ensure_monitoring( + CPid, [ChPid]), + State #state { known_senders = sets:add_element(ChPid, KS) } + end. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl new file mode 100644 index 00000000..4761f79e --- /dev/null +++ b/src/rabbit_mirror_queue_misc.erl @@ -0,0 +1,135 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_misc). + +-export([remove_from_queue/2, on_node_up/0, + drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]). + +-include("rabbit.hrl"). + +%% If the dead pids include the queue pid (i.e. the master has died) +%% then only remove that if we are about to be promoted. Otherwise we +%% can have the situation where a slave updates the mnesia record for +%% a queue, promoting another slave before that slave realises it has +%% become the new master, which is bad because it could then mean the +%% slave (now master) receives messages it's not ready for (for +%% example, new consumers). +remove_from_queue(QueueName, DeadPids) -> + DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + rabbit_misc:execute_mnesia_transaction( + fun () -> + %% Someone else could have deleted the queue before we + %% get here. + case mnesia:read({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [Q = #amqqueue { pid = QPid, + slave_pids = SPids }] -> + [QPid1 | SPids1] = + [Pid || Pid <- [QPid | SPids], + not lists:member(node(Pid), DeadNodes)], + case {{QPid, SPids}, {QPid1, SPids1}} of + {Same, Same} -> + ok; + _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> + %% Either master hasn't changed, so + %% we're ok to update mnesia; or we have + %% become the master. + Q1 = Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }, + ok = rabbit_amqqueue:store_queue(Q1); + _ -> + %% Master has changed, and we're not it, + %% so leave alone to allow the promoted + %% slave to find it and make its + %% promotion atomic. + ok + end, + {ok, QPid1} + end + end). + +on_node_up() -> + Qs = + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:foldl( + fun (#amqqueue { mirror_nodes = undefined }, QsN) -> + QsN; + (#amqqueue { name = QName, + mirror_nodes = all }, QsN) -> + [QName | QsN]; + (#amqqueue { name = QName, + mirror_nodes = MNodes }, QsN) -> + case lists:member(node(), MNodes) of + true -> [QName | QsN]; + false -> QsN + end + end, [], rabbit_queue) + end), + [add_mirror(Q, node()) || Q <- Qs], + ok. 
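%% The mnesia fold in on_node_up/0 above encodes a small policy decision;
%% the sketch below is an illustration only (no such helper exists in
%% RabbitMQ) of how a queue's mirror_nodes field is interpreted:

should_mirror_here(undefined) -> false;   %% queue is not mirrored
should_mirror_here(all)       -> true;    %% mirror on every node
should_mirror_here(MNodes)    -> lists:member(node(), MNodes).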
+ +drop_mirror(VHostPath, QueueName, MirrorNode) -> + drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). + +drop_mirror(Queue, MirrorNode) -> + if_mirrored_queue( + Queue, + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid] when SPids =:= [] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + rabbit_log:info( + "Dropping queue mirror on node ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), + exit(Pid, {shutdown, dropped}), + ok + end + end). + +add_mirror(VHostPath, QueueName, MirrorNode) -> + add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). + +add_mirror(Queue, MirrorNode) -> + if_mirrored_queue( + Queue, + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> Result = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info( + "Adding mirror of queue ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, Result]), + case Result of + {ok, _Pid} -> ok; + _ -> Result + end; + [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} + end + end). + +if_mirrored_queue(Queue, Fun) -> + rabbit_amqqueue:with( + Queue, fun (#amqqueue { arguments = Args } = Q) -> + case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of + undefined -> ok; + _ -> Fun(Q) + end + end). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl new file mode 100644 index 00000000..55d61d41 --- /dev/null +++ b/src/rabbit_mirror_queue_slave.erl @@ -0,0 +1,873 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave). + +%% For general documentation of HA design, see +%% rabbit_mirror_queue_coordinator +%% +%% We join the GM group before we add ourselves to the amqqueue +%% record. As a result: +%% 1. We can receive msgs from GM that correspond to messages we will +%% never receive from publishers. +%% 2. When we receive a message from publishers, we must receive a +%% message from the GM group for it. +%% 3. However, that instruction from the GM group can arrive either +%% before or after the actual message. We need to be able to +%% distinguish between GM instructions arriving early, and case (1) +%% above. +%% +%% All instructions from the GM group must be processed in the order +%% in which they're received. + +-export([start_link/1, set_maximum_since_use/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-define(SYNC_INTERVAL, 25). 
%% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(DEATH_TIMEOUT, 20000). %% 20 seconds + +-record(state, { q, + gm, + master_pid, + backing_queue, + backing_queue_state, + sync_timer_ref, + rate_timer_ref, + + sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + msg_id_ack, %% :: MsgId -> AckTag + ack_num, + + msg_id_status, + known_senders + }). + +start_link(Q) -> + gen_server2:start_link(?MODULE, [Q], []). + +set_maximum_since_use(QPid, Age) -> + gen_server2:cast(QPid, {set_maximum_since_use, Age}). + +init([#amqqueue { name = QueueName } = Q]) -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + Self = self(), + Node = node(), + {ok, MPid} = + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + %% ASSERTION + [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], + MPids1 = MPids ++ [Self], + mnesia:write(rabbit_queue, + Q1 #amqqueue { slave_pids = MPids1 }, + write), + {ok, QPid} + end), + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + {ok, #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new() + }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "immediate" delivery mode + + %% It is safe to reply 'false' here even if a) we've not seen the + %% msg via gm, or b) the master dies before we receive the msg via + %% gm. In the case of (a), we will eventually receive the msg via + %% gm, and it's only the master's result to the channel that is + %% important. In the case of (b), if the master does die and we do + %% get promoted then at that point we have no consumers, thus + %% 'false' is precisely the correct answer. However, we must be + %% careful to _not_ enqueue the message in this case. + + %% Note this is distinct from the case where we receive the msg + %% via gm first, then we're promoted to master, and only then do + %% we receive the msg from the channel. 
+ gen_server2:reply(From, false), %% master may deliver it, not us + noreply(maybe_enqueue_message(Delivery, false, State)); + +handle_call({deliver, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "mandatory" delivery mode + gen_server2:reply(From, true), %% amqqueue throws away the result anyway + noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_call({gm_deaths, Deaths}, From, + State = #state { q = #amqqueue { name = QueueName }, + gm = GM, + master_pid = MPid }) -> + rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + rabbit_misc:pid_to_string(self()), + [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), + %% The GM has told us about deaths, which means we're not going to + %% receive any more messages from GM + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= node(MPid) -> + %% master hasn't changed + reply(ok, State); + {ok, Pid} when node(Pid) =:= node() -> + %% we've become master + promote_me(From, State); + {ok, Pid} -> + %% master has changed to not us. + gen_server2:reply(From, ok), + erlang:monitor(process, Pid), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_pid = Pid }); + {error, not_found} -> + gen_server2:reply(From, ok), + {stop, normal, State} + end; + +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, run_backing_queue(Mod, Fun, State)); + +handle_call({commit, _Txn, _ChPid}, _From, State) -> + %% We don't support transactions in mirror queues + reply(ok, State). + +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); + +handle_cast({gm, Instruction}, State) -> + handle_process_result(process_instruction(Instruction, State)); + +handle_cast({deliver, Delivery = #delivery {}}, State) -> + %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State); + +handle_cast({set_ram_duration_target, Duration}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State #state { backing_queue_state = BQS1 }); + +handle_cast(update_ram_duration, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State #state { rate_timer_ref = just_measured, + backing_queue_state = BQS2 }); + +handle_cast(sync_timeout, State) -> + noreply(backing_queue_timeout( + State #state { sync_timer_ref = undefined })); + +handle_cast({rollback, _Txn, _ChPid}, State) -> + %% We don't support transactions in mirror queues + noreply(State). + +handle_info(timeout, State) -> + noreply(backing_queue_timeout(State)); + +handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, + State = #state { gm = GM, master_pid = MPid }) -> + ok = gm:broadcast(GM, {process_death, MPid}), + noreply(State); + +handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> + noreply(local_sender_death(ChPid, State)); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. 
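%% Summary (editorial) of the three delivery paths handled above; in each
%% case it is the master, never the slave, that determines the result the
%% channel sees:
%%
%%   handle_call {deliver_immediately, D} -> reply false; enqueue D, but
%%                                           do not keep it for promotion
%%   handle_call {deliver, D}             -> reply true; enqueue D and
%%                                           keep it for promotion
%%   handle_cast {deliver, D}             -> no reply; enqueue D and
%%                                           keep it for promotion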
+ +%% If the Reason is shutdown, or {shutdown, _}, it is not the queue +%% being deleted: it's just the node going down. Even though we're a +%% slave, we have no idea whether or not we'll be the only copy coming +%% back up. Thus we must assume we will be, and preserve anything we +%% have on disk. +terminate(_Reason, #state { backing_queue_state = undefined }) -> + %% We've received a delete_and_terminate from gm, thus nothing to + %% do here. + ok; +terminate({shutdown, dropped} = R, #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + %% See rabbit_mirror_queue_master:terminate/2 + BQ:delete_and_terminate(R, BQS); +terminate(Reason, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef }) -> + ok = gm:leave(GM), + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q, BQ, BQS, RateTRef, [], [], dict:new()), + rabbit_amqqueue_process:terminate(Reason, QueueState); +terminate([_SPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQS3 = BQ:handle_pre_hibernate(BQS2), + {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. + +prioritise_call(Msg, _From, _State) -> + case Msg of + {run_backing_queue, _Mod, _Fun} -> 6; + {gm_deaths, _Deaths} -> 5; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {run_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; + _ -> 0 + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([SPid], _Members) -> + SPid ! {joined, self()}, + ok. + +members_changed([_SPid], _Births, []) -> + ok; +members_changed([SPid], _Births, Deaths) -> + inform_deaths(SPid, Deaths). + +handle_msg([_SPid], _From, heartbeat) -> + ok; +handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> + %% This is only of value to the master + ok; +handle_msg([SPid], _From, {process_death, Pid}) -> + inform_deaths(SPid, [Pid]); +handle_msg([SPid], _From, Msg) -> + ok = gen_server2:cast(SPid, {gm, Msg}). + +inform_deaths(SPid, Deaths) -> + rabbit_misc:with_exit_handler( + fun () -> {stop, normal} end, + fun () -> + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> + ok; + {promote, CPid} -> + {become, rabbit_mirror_queue_coordinator, [CPid]} + end + end). + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +bq_init(BQ, Q, Recover) -> + Self = self(), + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) + end, + fun (Mod, Fun) -> + rabbit_misc:with_exit_handler( + fun () -> error end, + fun () -> + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) + end) + end). 
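%% How the gm callbacks above feed the slave (editorial summary):
%% heartbeats and {ensure_monitoring, _} are ignored; {process_death, Pid}
%% and members_changed/3 funnel into inform_deaths/2, whose gen_server2
%% call may return {promote, CPid}, at which point the gm process turns
%% into a coordinator via {become, rabbit_mirror_queue_coordinator,
%% [CPid]}; every other instruction is cast to the slave as {gm, Msg} and
%% processed strictly in arrival order.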
+
+run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
+    %% Yes, this might look a little crazy, but see comments in
+    %% confirm_sender_death/1
+    Fun(?MODULE, State);
+run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
+                                             backing_queue_state = BQS }) ->
+    State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
+
+needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
+    never;
+needs_confirming(#delivery { message = #basic_message {
+                               is_persistent = true } },
+                 #state { q = #amqqueue { durable = true } }) ->
+    eventually;
+needs_confirming(_Delivery, _State) ->
+    immediately.
+
+confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
+    {MS1, CMs} =
+        lists:foldl(
+          fun (MsgId, {MSN, CMsN} = Acc) ->
+                  %% We will never see 'discarded' here
+                  case dict:find(MsgId, MSN) of
+                      error ->
+                          %% If it needed confirming, it'll have
+                          %% already been done.
+                          Acc;
+                      {ok, {published, ChPid}} ->
+                          %% Still not seen it from the channel, just
+                          %% record that it's been confirmed.
+                          {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN};
+                      {ok, {published, ChPid, MsgSeqNo}} ->
+                          %% Seen from both GM and Channel. Can now
+                          %% confirm.
+                          {dict:erase(MsgId, MSN),
+                           gb_trees_cons(ChPid, MsgSeqNo, CMsN)};
+                      {ok, {confirmed, _ChPid}} ->
+                          %% It's already been confirmed. This probably
+                          %% means it's been both sync'd to disk and
+                          %% then delivered and ack'd before we've seen
+                          %% the publish from the channel. Nothing to
+                          %% do here.
+                          Acc
+                  end
+          end, {MS, gb_trees:empty()}, MsgIds),
+    [ok = rabbit_channel:confirm(ChPid, MsgSeqNos)
+     || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)],
+    State #state { msg_id_status = MS1 }.
+
+gb_trees_cons(Key, Value, Tree) ->
+    case gb_trees:lookup(Key, Tree) of
+        {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+        none            -> gb_trees:insert(Key, [Value], Tree)
+    end.
+
+handle_process_result({ok,   State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
+
+promote_me(From, #state { q                   = Q,
+                          gm                  = GM,
+                          backing_queue       = BQ,
+                          backing_queue_state = BQS,
+                          rate_timer_ref      = RateTRef,
+                          sender_queues       = SQ,
+                          msg_id_ack          = MA,
+                          msg_id_status       = MS,
+                          known_senders       = KS }) ->
+    rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
+                    [rabbit_misc:rs(Q #amqqueue.name),
+                     rabbit_misc:pid_to_string(self())]),
+    Q1 = Q #amqqueue { pid = self() },
+    {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+                   Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+    true = unlink(GM),
+    gen_server2:reply(From, {promote, CPid}),
+    ok = gm:confirmed_broadcast(GM, heartbeat),
+
+    %% Everything that we're monitoring, we need to ensure our new
+    %% coordinator is monitoring.
+
+    MonitoringPids = [begin true = erlang:demonitor(MRef),
+                            Pid
+                      end || {Pid, MRef} <- dict:to_list(KS)],
+    ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
+           CPid, MonitoringPids),
+
+    %% We find all the messages that we've received from channels but
+    %% not from gm, and if they're due to be enqueued on promotion
+    %% then we pass them to the
+    %% queue_process:init_with_backing_queue_state to be enqueued.
+    %%
+    %% We also have to requeue messages which are pending acks: the
+    %% consumers from the master queue have been lost and so these
+    %% messages need requeuing. They might also be pending
+    %% confirmation, and indeed they might also be pending arrival of
+    %% the publication from the channel itself, if we received both
+    %% the publication and the fetch via gm first!
Requeuing doesn't
+    %% affect confirmations: if the message was previously pending a
+    %% confirmation then it still will be, under the same msg_id. So
+    %% as a master, we need to be prepared to filter out the
+    %% publication of said messages from the channel (in is_duplicate);
+    %% thus such requeued messages must remain in the msg_id_status
+    %% (MS), which becomes seen_status (SS) in the master.
+    %%
+    %% Then there are messages we already have in the queue, which are
+    %% not currently pending acknowledgement:
+    %% 1. Messages we've only received via gm:
+    %%    Filter out subsequent publication from channel through
+    %%    validate_message. Might have to issue confirms then or
+    %%    later, thus queue_process state will have to know that
+    %%    there's a pending confirm.
+    %% 2. Messages received via both gm and channel:
+    %%    Queue will have to deal with issuing confirms if necessary.
+    %%
+    %% MS contains the following four entry types:
+    %%
+    %% a) {published, ChPid}:
+    %%    published via gm only; pending arrival of publication from
+    %%    channel, maybe pending confirm.
+    %%
+    %% b) {published, ChPid, MsgSeqNo}:
+    %%    published via gm and channel; pending confirm.
+    %%
+    %% c) {confirmed, ChPid}:
+    %%    published via gm only, and confirmed; pending publication
+    %%    from channel.
+    %%
+    %% d) discarded:
+    %%    seen via gm only as discarded; pending publication from
+    %%    channel.
+    %%
+    %% Only forms a, c and d need to go to the master state
+    %% seen_status (SS).
+    %%
+    %% Only form b needs to go through to the queue_process
+    %% state to form the msg_id_to_channel mapping (MTC).
+    %%
+    %% No messages that are enqueued from SQ at this point will have
+    %% entries in MS.
+    %%
+    %% Messages that are extracted from MA may have entries in MS, and
+    %% those messages are then requeued. However, as discussed above,
+    %% this does not affect MS, nor which bits go through to SS in
+    %% the master, or MTC in queue_process.
+    %%
+    %% Everything that's in MA gets requeued. Consequently the new
+    %% master should start with a fresh AM as there are no messages
+    %% pending acks (txns will have been rolled back).
+
+    MSList = dict:to_list(MS),
+    SS = dict:from_list(
+           [E || E = {_MsgId, discarded} <- MSList] ++
+           [{MsgId, Status}
+            || {MsgId, {Status, _ChPid}} <- MSList,
+               Status =:= published orelse Status =:= confirmed]),
+
+    MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
+                    CPid, BQ, BQS, GM, SS, MonitoringPids),
+
+    MTC = dict:from_list(
+            [{MsgId, {ChPid, MsgSeqNo}} ||
+                {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+    NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
+    AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
+    Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
+                              {Delivery, true} <- queue:to_list(PubQ)],
+    QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+                   Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
+                   AckTags, Deliveries, MTC),
+    {become, rabbit_amqqueue_process, QueueState, hibernate}.
+
+noreply(State) ->
+    {NewState, Timeout} = next_state(State),
+    {noreply, NewState, Timeout}.
+
+reply(Reply, State) ->
+    {NewState, Timeout} = next_state(State),
+    {reply, Reply, NewState, Timeout}.
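%% The comprehensions in promote_me/2 above project msg_id_status two
%% ways. As a sketch only (a hypothetical helper, not part of this
%% module): split the MS list into what the new master keeps (SS) and
%% what the queue process needs to finish confirms (MTC).

split_ms(MSList) ->
    SS  = [E || E = {_MsgId, discarded} <- MSList] ++
          [{MsgId, St} || {MsgId, {St, _ChPid}} <- MSList,
                          St =:= published orelse St =:= confirmed],
    MTC = [{MsgId, {ChPid, MsgSeqNo}} ||
              {MsgId, {published, ChPid, MsgSeqNo}} <- MSList],
    {dict:from_list(SS), dict:from_list(MTC)}.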
+ +next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + State1 = ensure_rate_timer( + confirm_messages(MsgIds, State #state { + backing_queue_state = BQS1 })), + case BQ:needs_timeout(BQS1) of + false -> {stop_sync_timer(State1), hibernate}; + idle -> {stop_sync_timer(State1), 0 }; + timed -> {ensure_sync_timer(State1), 0 } + end. + +backing_queue_timeout(State = #state { backing_queue = BQ }) -> + run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). + +ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + State #state { sync_timer_ref = TRef }; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { sync_timer_ref = undefined }. + +ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State #state { rate_timer_ref = TRef }; +ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +ensure_rate_timer(State) -> + State. + +stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> + State; +stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { rate_timer_ref = undefined }. + +ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> + case dict:is_key(ChPid, KS) of + true -> State; + false -> MRef = erlang:monitor(process, ChPid), + State #state { known_senders = dict:store(ChPid, MRef, KS) } + end. + +local_sender_death(ChPid, State = #state { known_senders = KS }) -> + ok = case dict:is_key(ChPid, KS) of + false -> ok; + true -> confirm_sender_death(ChPid) + end, + State. + +confirm_sender_death(Pid) -> + %% We have to deal with the possibility that we'll be promoted to + %% master before this thing gets run. Consequently we set the + %% module to rabbit_mirror_queue_master so that if we do become a + %% rabbit_amqqueue_process before then, sane things will happen. + Fun = + fun (?MODULE, State = #state { known_senders = KS, + gm = GM }) -> + %% We're running still as a slave + ok = case dict:is_key(Pid, KS) of + false -> ok; + true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), + confirm_sender_death(Pid) + end, + State; + (rabbit_mirror_queue_master, State) -> + %% We've become a master. State is now opaque to + %% us. When we became master, if Pid was still known + %% to us then we'd have set up monitoring of it then, + %% so this is now a noop. + State + end, + %% Note that we do not remove our knowledge of this ChPid until we + %% get the sender_death from GM. + {ok, _TRef} = timer:apply_after( + ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue_async, + [self(), rabbit_mirror_queue_master, Fun]), + ok. 
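%% Putting local_sender_death/2 and confirm_sender_death/1 together, the
%% sequence for a channel ChPid dying on this node is (editorial trace):
%%   1. handle_info({'DOWN', ...}) calls local_sender_death/2;
%%   2. ?DEATH_TIMEOUT later the Fun runs via run_backing_queue_async;
%%   3. if we are still a slave and ChPid is still known, we broadcast
%%      {ensure_monitoring, [ChPid]} on gm and re-arm the timer;
%%   4. only the master's eventual {sender_death, ChPid} instruction
%%      (see process_instruction below) clears the sender's queue,
%%      statuses and monitor.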
+ +maybe_enqueue_message( + Delivery = #delivery { message = #basic_message { id = MsgId }, + msg_seq_no = MsgSeqNo, + sender = ChPid, + txn = none }, + EnqueueOnPromotion, + State = #state { sender_queues = SQ, msg_id_status = MS }) -> + State1 = ensure_monitoring(ChPid, State), + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, MS) of + error -> + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), + State1 #state { sender_queues = SQ1 }; + {ok, {confirmed, ChPid}} -> + %% BQ has confirmed it but we didn't know what the + %% msg_seq_no was at the time. We do now! + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { sender_queues = SQ1, + msg_id_status = dict:erase(MsgId, MS) }; + {ok, {published, ChPid}} -> + %% It was published to the BQ and we didn't know the + %% msg_seq_no so couldn't confirm it at the time. + case needs_confirming(Delivery, State1) of + never -> + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; + eventually -> + State1 #state { + msg_id_status = + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } + end; + {ok, discarded} -> + %% We've already heard from GM that the msg is to be + %% discarded. We won't see this again. + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } + end; +maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) -> + %% We don't support txns in mirror queues. + State. + +get_sender_queue(ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> {queue:new(), sets:new()}; + {ok, Val} -> Val + end. + +remove_from_pending_ch(MsgId, ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> + SQ; + {ok, {MQ, PendingCh}} -> + dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) + end. + +process_instruction( + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + + %% We really are going to do the publish right now, even though we + %% may not have seen it directly from the channel. As a result, we + %% may know that it needs confirming without knowing its + %% msg_seq_no, which means that we can see the confirmation come + %% back from the backing queue without knowing the msg_seq_no, + %% which means that we're going to have to hang on to the fact + %% that we've seen the msg_id confirmed until we can associate it + %% with a msg_seq_no. + State1 = ensure_monitoring(ChPid, State), + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ2} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid}, MS)}; + {{value, {Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We received the msg from the channel first. Thus we + %% need to deal with confirms here. 
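%% (This is the mirror image of maybe_enqueue_message/3 above: there the
%% channel's publish won the race; here the gm instruction did.)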
+ case needs_confirming(Delivery, State1) of + never -> + {MQ2, PendingCh, MS}; + eventually -> + {MQ2, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + {MQ2, PendingCh, MS} + end; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the slave_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. And the channel will not be + %% expecting any confirms from us. + {MQ, PendingCh, MS} + end, + + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, + + {ok, + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State2 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State2 #state { backing_queue_state = BQS1 }) + end}; +process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + %% Many of the comments around the publish head above apply here + %% too. + State1 = ensure_monitoring(ChPid, State), + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, discarded, MS)}; + {{value, {#delivery { message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We've already seen it from the channel, we're not + %% going to see this again, so don't add it to MS + {MQ2, PendingCh, MS}; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the slave_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. 
+ {MQ, PendingCh, MS} + end, + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + BQS1 = BQ:discard(Msg, ChPid, BQS), + {ok, State1 #state { sender_queues = SQ1, + msg_id_status = MS1, + backing_queue_state = BQS1 }}; +process_instruction({set_length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + ToDrop = QLen - Length, + {ok, case ToDrop > 0 of + true -> BQS1 = + lists:foldl( + fun (const, BQSN) -> + {{_Msg, _IsDelivered, _AckTag, _Remaining}, + BQSN1} = BQ:fetch(false, BQSN), + BQSN1 + end, BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State + end}; +process_instruction({fetch, AckRequired, MsgId, Remaining}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + {ok, case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }); + Other when Other < Remaining -> + %% we must be shorter than the master + State + end}; +process_instruction({ack, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), + [] = MsgIds1 -- MsgIds, %% ASSERTION + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; +process_instruction({requeue, MsgPropsFun, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {ok, case length(AckTags) =:= length(MsgIds) of + true -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }; + false -> + %% The only thing we can safely do is nuke out our BQ + %% and MA. The interaction between this and confirms + %% doesn't really bear thinking about... + {_Count, BQS1} = BQ:purge(BQS), + {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), + State #state { msg_id_ack = dict:new(), + backing_queue_state = BQS2 } + end}; +process_instruction({sender_death, ChPid}, + State = #state { sender_queues = SQ, + msg_id_status = MS, + known_senders = KS }) -> + {ok, case dict:find(ChPid, KS) of + error -> + State; + {ok, MRef} -> + true = erlang:demonitor(MRef), + MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = dict:erase(ChPid, KS) } + end}; +process_instruction({delete_and_terminate, Reason}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQ:delete_and_terminate(Reason, BQS), + {stop, State #state { backing_queue_state = undefined }}. + +msg_ids_to_acktags(MsgIds, MA) -> + {AckTags, MA1} = + lists:foldl( + fun (MsgId, {Acc, MAN}) -> + case dict:find(MsgId, MA) of + error -> {Acc, MAN}; + {ok, {_Num, AckTag}} -> {[AckTag | Acc], + dict:erase(MsgId, MAN)} + end + end, {[], MA}, MsgIds), + {lists:reverse(AckTags), MA1}. + +ack_all(BQ, MA, BQS) -> + BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). 
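%% Illustrative shell session (hypothetical values) for
%% msg_ids_to_acktags/2 above; unknown msg_ids are silently skipped,
%% which is why the {requeue, ...} clause compares lengths and falls
%% back to purge plus ack_all when they differ:
%%   1> MA0 = dict:store(m1, {0, t1}, dict:new()).
%%   2> msg_ids_to_acktags([m1, m2], MA0).
%%   {[t1], MA1}    %% m2 unknown, so skipped; m1 erased from MA1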
+ +maybe_store_ack(false, _MsgId, _AckTag, State) -> + State; +maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, + ack_num = Num }) -> + State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), + ack_num = Num + 1 }. diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl new file mode 100644 index 00000000..2ce5941e --- /dev/null +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -0,0 +1,60 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave_sup). + +-rabbit_boot_step({mirror_queue_slave_sup, + [{description, "mirror queue slave sup"}, + {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, + {requires, recovery}, + {enables, routing_ready}]}). + +-rabbit_boot_step({mirrored_queues, + [{description, "adding mirrors to queues"}, + {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, + {requires, mirror_queue_slave_sup}, + {enables, routing_ready}]}). + +-behaviour(supervisor2). + +-export([start/0, start_link/0, start_child/2]). + +-export([init/1]). + +-include_lib("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +start() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_mirror_queue_slave_sup, + {rabbit_mirror_queue_slave_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}), + ok. + +start_link() -> + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_child(Node, Args) -> + supervisor2:start_child({?SERVER, Node}, Args). + +init([]) -> + {ok, {{simple_one_for_one_terminate, 10, 10}, + [{rabbit_mirror_queue_slave, + {rabbit_mirror_queue_slave, start_link, []}, + temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 568b9ce6..8d5c8646 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -241,7 +241,8 @@ table_definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + ++ gm:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 8f166672..26780676 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -110,8 +110,10 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> case mnesia:dirty_read({rabbit_queue, QName}) of - [#amqqueue{pid = QPid}] -> [QPid | QPids]; - [] -> QPids + [#amqqueue{pid = QPid, slave_pids = SPids}] -> + SPids ++ [QPid | QPids]; + [] -> + QPids end end, [], QNames). 
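%% Note on the rabbit_router change above: a routed delivery now fans out
%% to the master and all slaves, e.g. a queue with master M and slaves S1
%% and S2 contributes [S1, S2, M] to the accumulator. Slaves hold such
%% deliveries in their sender_queues until the corresponding gm
%% instruction from the master decides each message's fate.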
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3f4aa54e..3ee71a6d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1072,15 +1072,25 @@ test_user_management() -> control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), + {error, {no_such_user, _}} = + control_action(set_user_tags, ["foo", "bar"]), %% user creation ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = control_action(add_user, ["foo", "bar"]), ok = control_action(change_password, ["foo", "baz"]), - ok = control_action(set_admin, ["foo"]), - ok = control_action(clear_admin, ["foo"]), - ok = control_action(list_users, []), + + TestTags = fun (Tags) -> + Args = ["foo" | [atom_to_list(T) || T <- Tags]], + ok = control_action(set_user_tags, Args), + {ok, #internal_user{tags = Tags}} = + rabbit_auth_backend_internal:lookup_user(<<"foo">>), + ok = control_action(list_users, []) + end, + TestTags([foo, bar, baz]), + TestTags([administrator]), + TestTags([]), %% vhost creation ok = control_action(add_vhost, ["/testhost"]), @@ -1203,10 +1213,10 @@ test_spawn() -> user(Username) -> #user{username = Username, - is_admin = true, + tags = [administrator], auth_backend = rabbit_auth_backend_internal, impl = #internal_user{username = Username, - is_admin = true}}. + tags = [administrator]}}. test_statistics_event_receiver(Pid) -> receive @@ -2077,8 +2087,11 @@ variable_queue_init(Q, Recover) -> Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> + variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> lists:foldl( - fun (_N, VQN) -> + fun (N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), @@ -2086,7 +2099,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, self(), VQN) + PropFun(N, #message_properties{}), self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2126,6 +2139,7 @@ test_variable_queue() -> fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_dropwhile/1, + fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1]], passed. @@ -2162,14 +2176,9 @@ test_dropwhile(VQ0) -> Count = 10, %% add messages with sequential expiry - VQ1 = lists:foldl( - fun (N, VQN) -> - rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, self(), VQN) - end, VQ0, lists:seq(1, Count)), + VQ1 = variable_queue_publish( + false, Count, + fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( @@ -2189,6 +2198,14 @@ test_dropwhile(VQ0) -> VQ4. +test_dropwhile_varying_ram_duration(VQ0) -> + VQ1 = variable_queue_publish(false, 1, VQ0), + VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), + VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2), + VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ5 = variable_queue_publish(false, 1, VQ4), + rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5). 
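%% test_dropwhile_varying_ram_duration/1 above is a regression test for
%% the rabbit_variable_queue rework further down: setting the ram
%% duration target to 0 pages the sole message out of RAM, so the second
%% dropwhile/2 reaches in_r/2 with msg = undefined while q4 is non-empty;
%% the new in_r/2 clause reads the message back in instead of asserting
%% that q4 is empty.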
+ test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 1f0f8bbe..03b2c9e8 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -124,7 +124,9 @@ auto_delete :: boolean(), exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), - pid :: rabbit_types:maybe(pid())}). + pid :: rabbit_types:maybe(pid()), + slave_pids :: [pid()], + mirror_nodes :: [node()] | 'undefined' | 'all'}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), @@ -139,14 +141,14 @@ -type(user() :: #user{username :: username(), - is_admin :: boolean(), + tags :: [atom()], auth_backend :: atom(), impl :: any()}). -type(internal_user() :: #internal_user{username :: username(), password_hash :: password_hash(), - is_admin :: boolean()}). + tags :: [atom()]}). -type(username() :: binary()). -type(password() :: binary()). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 5e4a1224..0f7a7810 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -28,7 +28,10 @@ -rabbit_upgrade({topic_trie, mnesia, []}). -rabbit_upgrade({semi_durable_route, mnesia, []}). -rabbit_upgrade({exchange_event_serial, mnesia, []}). --rabbit_upgrade({trace_exchanges, mnesia, []}). +-rabbit_upgrade({trace_exchanges, mnesia, [internal_exchanges]}). +-rabbit_upgrade({user_admin_to_tags, mnesia, [user_to_internal_user]}). +-rabbit_upgrade({ha_mirrors, mnesia, []}). +-rabbit_upgrade({gm, mnesia, []}). %% ------------------------------------------------------------------- @@ -43,6 +46,9 @@ -spec(semi_durable_route/0 :: () -> 'ok'). -spec(exchange_event_serial/0 :: () -> 'ok'). -spec(trace_exchanges/0 :: () -> 'ok'). +-spec(user_admin_to_tags/0 :: () -> 'ok'). +-spec(ha_mirrors/0 :: () -> 'ok'). +-spec(gm/0 :: () -> 'ok'). -endif. @@ -121,6 +127,34 @@ trace_exchanges() -> VHost <- rabbit_vhost:list()], ok. +user_admin_to_tags() -> + transform( + rabbit_user, + fun({internal_user, Username, PasswordHash, true}) -> + {internal_user, Username, PasswordHash, [administrator]}; + ({internal_user, Username, PasswordHash, false}) -> + {internal_user, Username, PasswordHash, [management]} + end, + [username, password_hash, tags], internal_user). + +ha_mirrors() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddMirrorPidsFun = + fun ({amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid}) -> + {amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid, + [], undefined} + end, + [ ok = transform(T, + AddMirrorPidsFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, mirror_nodes]) + || T <- Tables ], + ok. + +gm() -> + create(gm_group, [{record_name, gm_group}, + {attributes, [name, version, members]}]). 
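%% Worked example (illustrative values) of the user_admin_to_tags/0
%% transform above, as applied to stored internal_user rows:
%%   {internal_user, <<"guest">>, Hash, true}   becomes
%%   {internal_user, <<"guest">>, Hash, [administrator]}
%%   {internal_user, <<"anon">>,  Hash, false}  becomes
%%   {internal_user, <<"anon">>,  Hash, [management]}
%% i.e. former administrators map to the administrator tag, everyone
%% else to management.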
+ %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a167cca0..c6d99deb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,8 +18,9 @@ -export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + dropwhile/2, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/3, discard/3, @@ -560,114 +561,29 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. dropwhile(Pred, State) -> - {_OkOrEmpty, State1} = dropwhile1(Pred, State), - a(State1). - -dropwhile1(Pred, State) -> - internal_queue_out( - fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> - case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, - State1), - dropwhile1(Pred, State2); - false -> {ok, in_r(MsgStatus, State1)} - end - end, State). - -in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - true = queue:is_empty(Q4), %% ASSERTION - State #vqstate { - q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; -in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + case queue_out(State) of + {empty, State1} -> + a(State1); + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, State2); + false -> a(in_r(MsgStatus, State1)) + end + end. fetch(AckRequired, State) -> - internal_queue_out( - fun(MsgStatus, State1) -> - %% it's possible that the message wasn't read from disk - %% at this point, so read it in. - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - internal_fetch(AckRequired, MsgStatus1, State2) - end, State). - -internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of - {empty, _Q4} -> - case fetch_from_q3(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) - end; - {{value, MsgStatus}, Q4a} -> - Fun(MsgStatus, State #vqstate { q4 = Q4a }) + case queue_out(State) of + {empty, State1} -> + {empty, a(State1)}; + {{value, MsgStatus}, State1} -> + %% it is possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2), + {Res, a(State3)} end. -read_msg(MsgStatus = #msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, - msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, State) -> - {MsgStatus, State}. 
- -internal_fetch(AckRequired, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount }) -> - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - Rem = fun () -> - ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) - end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - - {{Msg, IsDelivered, AckTag, Len1}, - a(State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1 })}. - ack(AckTags, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, fun (_, State0) -> State0 end, @@ -1141,6 +1057,95 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. +in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, + State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> + case queue:is_empty(Q4) of + true -> State #vqstate { + q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; + false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = + read_msg(MsgStatus, State), + State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } + end; +in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> + State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + +queue_out(State = #vqstate { q4 = Q4 }) -> + case queue:out(Q4) of + {empty, _Q4} -> + case fetch_from_q3(State) of + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} + end; + {{value, MsgStatus}, Q4a} -> + {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + end. + +read_msg(MsgStatus = #msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. 
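%% Note on the re-added in_r/2 above: unlike the removed version, the
%% msg = undefined clause no longer asserts that q4 is empty; when q4
%% already holds messages, the paged-out head is read back from the
%% message store and requeued at the front of q4, preserving FIFO order.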
+ +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) + end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + + {{Msg, IsDelivered, AckTag, Len1}, + State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 }}. + msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, AsyncCallback, SyncCallback) -> case SyncCallback(?MODULE, |