From 2bdada687c2d8ed9ad80ee959f8977311a031843 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Fri, 11 Jul 2008 03:25:48 -0500 Subject: Added durability flag to realm record, unit test for realm preening --- include/rabbit.hrl | 2 +- src/rabbit_amqqueue.erl | 2 +- src/rabbit_exchange.erl | 2 +- src/rabbit_realm.erl | 26 +++++++++++--------------- src/rabbit_tests.erl | 23 ++++++++++++++++++++++- 5 files changed, 36 insertions(+), 19 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5a3006dd..c7415f45 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -30,7 +30,7 @@ -record(vhost_realm, {virtual_host, realm}). -record(realm, {name,ignore}). --record(realm_resource, {realm, resource}). +-record(realm_resource, {realm, resource, durable}). -record(user_realm, {username, realm, ticket_pattern}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 63f043ba..59c41fb8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -142,7 +142,7 @@ declare(RealmName, NameBin, Durable, AutoDelete, Args) -> fun () -> case mnesia:wread({amqqueue, QName}) of [] -> ok = recover_queue(Q), - ok = rabbit_realm:add(RealmName, QName), + ok = rabbit_realm:add(RealmName, QName, Durable), Q; [ExistingQ] -> ExistingQ end diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 113b7878..32418ca2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -106,7 +106,7 @@ declare(RealmName, NameBin, Type, Durable, AutoDelete, Args) -> durable_exchanges, Exchange, write); true -> ok end, - ok = rabbit_realm:add(RealmName, XName), + ok = rabbit_realm:add(RealmName, XName, Durable), Exchange; [ExistingX] -> ExistingX end diff --git a/src/rabbit_realm.erl b/src/rabbit_realm.erl index 2ededb5f..2887158b 100644 --- a/src/rabbit_realm.erl +++ b/src/rabbit_realm.erl @@ -27,7 +27,7 @@ -export([recover/0]). -export([add_realm/1, delete_realm/1, list_vhost_realms/1]). --export([add/2, delete/2, check/2, delete_from_all/1]). +-export([add/3, check/2, delete_from_all/1]). -export([access_request/3, enter_realm/3, leave_realms/1]). -export([on_node_down/1]). @@ -44,8 +44,8 @@ -spec(add_realm/1 :: (realm_name()) -> 'ok'). -spec(delete_realm/1 :: (realm_name()) -> 'ok'). -spec(list_vhost_realms/1 :: (vhost()) -> [name()]). --spec(add/2 :: (realm_name(), r(e_or_q())) -> 'ok'). --spec(delete/2 :: (realm_name(), r(e_or_q())) -> 'ok'). +% -spec(add/3 :: (realm_name(), r(e_or_q())) -> 'ok'). +% -spec(delete/3 :: (realm_name(), r(e_or_q())) -> 'ok'). -spec(check/2 :: (realm_name(), r(e_or_q())) -> bool() | not_found()). -spec(delete_from_all/1 :: (r(e_or_q())) -> 'ok'). -spec(access_request/3 :: (username(), bool(), ticket()) -> @@ -109,21 +109,18 @@ list_vhost_realms(VHostPath) -> VHostPath, fun () -> mnesia:read({vhost_realm, VHostPath}) end))]. -add(Realm = #resource{kind = realm}, Resource = #resource{}) -> - manage_link(fun mnesia:write/1, Realm, Resource). - -delete(Realm = #resource{kind = realm}, Resource = #resource{}) -> - manage_link(fun mnesia:delete_object/1, Realm, Resource). +add(Realm = #resource{kind = realm}, Resource = #resource{}, Durable) -> + manage_link(fun mnesia:write/1, Realm, Resource, Durable). % This links or unlinks a resource to a realm manage_link(Action, Realm = #resource{kind = realm, name = RealmName}, - Resource = #resource{name = ResourceName}) -> + Resource = #resource{name = ResourceName}, Durable) -> Table = realm_table_for_resource(Resource), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({realm, Realm}) of [] -> mnesia:abort(not_found); - [_] -> Action({Table, RealmName, ResourceName}) + [_] -> Action({Table, RealmName, ResourceName, Durable}) end end). @@ -132,7 +129,6 @@ realm_table_for_resource(#resource{kind = queue}) -> realm_queue. parent_table_for_resource(#resource{kind = exchange}) -> exchange; parent_table_for_resource(#resource{kind = queue}) -> amqqueue. - check(#resource{kind = realm, name = Realm}, Resource = #resource{}) -> F = mnesia:match_object(#realm_resource{resource = Resource#resource.name, realm = Realm}), case mnesia:async_dirty(F) of @@ -236,14 +232,14 @@ preen_realm(Resource = #resource{}) -> LinkType = realm_table_for_resource(Resource), Q = qlc:q([L#realm_resource.resource || L <- mnesia:table(LinkType)]), Cursor = qlc:cursor(Q), - preen_next(Cursor,LinkType,parent_table_for_resource(Resource)), + preen_next(Cursor, LinkType, parent_table_for_resource(Resource)), qlc:delete_cursor(Cursor). - -preen_next(Cursor,LinkType,ParentTable) -> + +preen_next(Cursor, LinkType, ParentTable) -> case qlc:next_answers(Cursor,1) of [] -> ok; [ResourceKey] -> - case mnesia:read({ParentTable,ResourceKey}) of + case mnesia:match_object({ParentTable,ResourceKey,'_'}) of [] -> mnesia:delete_object({LinkType,'_',ResourceKey}); _ -> ok diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index beeb3508..5c265633 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -25,7 +25,9 @@ -module(rabbit_tests). --export([all_tests/0, test_parsing/0]). +-include("rabbit.hrl"). + +-export([all_tests/0, test_parsing/0,preening_test/0]). -import(lists). @@ -46,6 +48,25 @@ all_tests() -> test_parsing() -> passed = test_content_properties(), passed. + +preening_test() -> + Realm = #resource{virtual_host = <<"/">>,kind = realm, name = <<"/data">>}, + loop(Realm,1), + rabbit_realm:recover(). + +loop(_,0) -> ok; +loop(Realm,N) -> + declare(Realm,true), + declare(Realm,false), + loop(Realm,N-1). + +declare(Realm,Durable) -> + X = rabbit_misc:binstring_guid("x"), + Q = rabbit_misc:binstring_guid("amq.gen"), + AutoDelete = false, + rabbit_exchange:declare(Realm,X, <<"direct">>, Durable, AutoDelete, undefined), + rabbit_amqqueue:declare(Realm, Q, Durable, AutoDelete, undefined). + test_content_properties() -> test_content_prop_roundtrip([], <<0, 0>>), -- cgit v1.2.1 -- cgit v1.2.1 From f7ec115cd86aee9ff2a8ab079d39612fe6054022 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 3 Oct 2008 13:31:50 +0100 Subject: propagate channel/connection errors when in closing state --- src/rabbit_reader.erl | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bfd1ea72..7e68b3ed 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -94,10 +94,18 @@ %% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* -%% channel exit -> -%% if abnormal exit then log error -%% if last channel to exit then send connection.close_ok, start -%% terminate_connection timer, *closing* +%% channel exit with hard error +%% -> log error, wait for channels to terminate forcefully, start +%% terminate_connection timer, send close, *closed* +%% channel exit with soft error +%% -> log error, start terminate_channel timer, mark channel as +%% closing +%% if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* +%% else *closing* +%% channel exits normally +%% -> if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* %% receive connection.close_ok -> self() ! terminate_connection, @@ -291,24 +299,13 @@ terminate_channel(Channel, Ref, State) -> end, State. -handle_dependent_exit(Pid, Reason, - State = #v1{connection_state = closing}) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> - case Reason of - normal -> ok; - _ -> log_channel_error(closing, Channel, Reason) - end, - maybe_close(State) - end; handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), - State; + maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> handle_exception(State, Channel, Reason) + Channel -> maybe_close(handle_exception(State, Channel, Reason)) end. channel_cleanup(Pid) -> @@ -365,13 +362,15 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State) -> +maybe_close(State = #v1{connection_state = closing}) -> case all_channels() of [] -> ok = send_on_channel0( State#v1.sock, #'connection.close_ok'{}), close_connection(State); _ -> State - end. + end; +maybe_close(State) -> + State. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> -- cgit v1.2.1 From 811734754daf63aefeb3d5a0447c39c0c4b10273 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 3 Oct 2008 13:38:01 +0100 Subject: backout changeset committed on wrong branch --- src/rabbit_reader.erl | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7e68b3ed..bfd1ea72 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -94,18 +94,10 @@ %% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing -%% if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% else *closing* -%% channel exits normally -%% -> if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* +%% channel exit -> +%% if abnormal exit then log error +%% if last channel to exit then send connection.close_ok, start +%% terminate_connection timer, *closing* %% closed: %% socket close -> *terminate* %% receive connection.close_ok -> self() ! terminate_connection, @@ -299,13 +291,24 @@ terminate_channel(Channel, Ref, State) -> end, State. +handle_dependent_exit(Pid, Reason, + State = #v1{connection_state = closing}) -> + case channel_cleanup(Pid) of + undefined -> exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> + case Reason of + normal -> ok; + _ -> log_channel_error(closing, Channel, Reason) + end, + maybe_close(State) + end; handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), - maybe_close(State); + State; handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) + Channel -> handle_exception(State, Channel, Reason) end. channel_cleanup(Pid) -> @@ -362,15 +365,13 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State = #v1{connection_state = closing}) -> +maybe_close(State) -> case all_channels() of [] -> ok = send_on_channel0( State#v1.sock, #'connection.close_ok'{}), close_connection(State); _ -> State - end; -maybe_close(State) -> - State. + end. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> -- cgit v1.2.1 From 6c1976f5b38781bb8a8c297cc438dcee96035910 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sat, 4 Oct 2008 18:11:24 +0100 Subject: wiring for system_memory_high_watermark alarms Queue processes are initialised with, and are alerted to transitions in, the system_memory_high_watermark alarm status. --- src/rabbit_alarm.erl | 24 +++++++++++++++++++++--- src/rabbit_amqqueue.erl | 18 +++++++++++++++++- src/rabbit_amqqueue_process.erl | 11 +++++++++++ 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index e71dda59..f38651d1 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -27,19 +27,22 @@ -behaviour(gen_event). --export([start/0, stop/0]). +-export([start/0, stop/0, maybe_conserve_memory/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). -define(MEMSUP_CHECK_INTERVAL, 1000). +-record(alarms, {system_memory_high_watermark = false}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). +-spec(maybe_conserve_memory/1 :: (pid()) -> 'ok'). -endif. @@ -61,15 +64,30 @@ start() -> stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). +maybe_conserve_memory(QPid) -> + gen_event:call(alarm_handler, ?MODULE, {maybe_conserve_memory, QPid}). + %%---------------------------------------------------------------------------- init([]) -> - {ok, none}. + {ok, #alarms{}}. + +handle_call({maybe_conserve_memory, QPid}, + State = #alarms{system_memory_high_watermark = Conserve}) -> + {ok, rabbit_amqqueue:conserve_memory(QPid, Conserve), State}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event(Event, State) -> +handle_event({set_alarm,{system_memory_high_watermark,[]}}, State) -> + rabbit_amqqueue:conserve_memory(true), + {ok, State#alarms{system_memory_high_watermark = true}}; + +handle_event({clear_alarm,{system_memory_high_watermark,[]}}, State) -> + rabbit_amqqueue:conserve_memory(false), + {ok, State#alarms{system_memory_high_watermark = false}}; + +handle_event(_Event, State) -> {ok, State}. handle_info(_Info, State) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7ce350d8..0dc6931d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -module(rabbit_amqqueue). -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). --export([pseudo_queue/2]). +-export([conserve_memory/1, conserve_memory/2, pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4, commit/2, rollback/2]). @@ -55,6 +55,8 @@ {'error', 'queue_not_found' | 'exchange_not_found'}). -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). +-spec(conserve_memory/1 :: (bool()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). -spec(add_binding/4 :: @@ -130,6 +132,19 @@ recover_durable_queues() -> ok end). +conserve_memory(Conserve) -> + [ok = gen_server:cast(QPid, {conserve_memory, Conserve}) || + {_, QPid, worker, _} <- + supervisor:which_children(rabbit_amqqueue_sup)], + ok. + +conserve_memory(QPid, Conserve) -> + %% This needs to be synchronous. It is called during queue + %% creation and we need to make sure that the memory conservation + %% status of the queue has been set before it becomes reachable in + %% message routing. + gen_server:call(QPid, {conserve_memory, Conserve}). + declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, @@ -160,6 +175,7 @@ store_queue(Q = #amqqueue{durable = false}) -> start_queue_process(Q) -> {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), + ok = rabbit_alarm:maybe_conserve_memory(Pid), Q#amqqueue{pid = Pid}. recover_queue(Q) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7716ef16..b508ecf8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -90,6 +90,11 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +conserve_memory(Conserve, State) -> + %% TODO + rabbit_log:info("~p conserving memory: ~p~n", [self(), Conserve]), + State. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; @@ -455,6 +460,9 @@ purge_message_buffer(QName, MessageBuffer) -> %--------------------------------------------------------------------------- +handle_call({conserve_memory, Conserve}, _From, State) -> + {reply, ok, conserve_memory(Conserve, State)}; + handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% Synchronous, "immediate" delivery mode %% @@ -614,6 +622,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, {reply, locked, State} end. +handle_cast({conserve_memory, Conserve}, State) -> + {noreply, conserve_memory(Conserve, State)}; + handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), -- cgit v1.2.1 From 6bc2454ddc1c7b576b950e62cd0876738f157b38 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 5 Oct 2008 11:44:50 +0100 Subject: document limitation of initial effectiveness of memsup checks --- src/rabbit_alarm.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index f38651d1..d5cbf066 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -56,6 +56,11 @@ start() -> %% a granularity of minutes. So we have to peel off one layer of %% the API to get to the underlying layer which operates at the %% granularity of milliseconds. + %% + %% Note that the new setting will only take effect after the first + %% check has completed, i.e. after one minute. So if rabbit eats + %% all the memory within the first minute after startup then we + %% are out of luck. ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, infinity), -- cgit v1.2.1 From d8e0b34050e3bb17d127eba8c344162dd1abca46 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 5 Oct 2008 13:03:05 +0100 Subject: make queues drop new messages whene memory is low - deliver_immediately needed refactoring so that in the memory-conserving case it looks for an auto-ack consumer (to which the message can be delivered without needing to keep a record of it), rather than just picking the first available consumer - We don't want to drop messages that have already been enqueued. Therefore the call to deliver_immediately in run_poke_burst always pretends that memory conservation mode is inactive. run_poke_burst is the (only) place that handles delivery of already enqueued messages, e.g. when redelivering unacked messages after channel closure, adding a new consumer and recovering messages from the persister after a restart. - a warning with the count of dropped messages (if >0) is logged when the queue resumes normal operation or is terminated TODO: drop transactional messages and mark the tx as failed --- src/rabbit_amqqueue_process.erl | 131 +++++++++++++++++++++++++++++----------- 1 file changed, 95 insertions(+), 36 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b508ecf8..2669ff27 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -44,6 +44,8 @@ owner, exclusive_consumer, has_had_consumers, + conserve_memory, + dropped_message_count, next_msg_id, message_buffer, round_robin}). @@ -73,13 +75,16 @@ init(Q) -> owner = none, exclusive_consumer = none, has_had_consumers = false, + conserve_memory = false, + dropped_message_count = 0, next_msg_id = 1, message_buffer = queue:new(), round_robin = queue:new()}}. -terminate(_Reason, State) -> +terminate(_Reason, State = #q{dropped_message_count = C}) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), + log_dropped_message_count(QName, C), lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, all_tx()), ok = purge_message_buffer(QName, State#q.message_buffer), @@ -90,10 +95,20 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +log_dropped_message_count(_QName, 0) -> + ok; +log_dropped_message_count(QName, C) -> + rabbit_log:warning("~s dropped ~p messages to conserve memory~n", + [rabbit_misc:rs(QName), C]), + ok. + +conserve_memory(false, State = #q{q = #amqqueue{name = QName}, + conserve_memory = true, + dropped_message_count = C}) -> + log_dropped_message_count(QName, C), + State#q{conserve_memory = false, dropped_message_count = 0}; conserve_memory(Conserve, State) -> - %% TODO - rabbit_log:info("~p conserving memory: ~p~n", [self(), Conserve]), - State. + State#q{conserve_memory = Conserve}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -138,47 +153,89 @@ update_store_and_maybe_block_ch( store_ch_record(C#cr{is_overload_protection_active = NewActive}), Result. -deliver_immediately(Message, Delivered, +increment_dropped_message_count(State) -> + State#q{dropped_message_count = State#q.dropped_message_count + 1}. + +find_auto_ack_consumer(RoundRobin, RoundRobinNew) -> + case queue:out(RoundRobin) of + {{value, QEntry = {_, #consumer{ack_required = AckRequired}}}, + RoundRobinTail} -> + case AckRequired of + true -> find_auto_ack_consumer( + RoundRobinTail, + queue:in(QEntry, RoundRobinNew)); + false -> {QEntry, queue:join(RoundRobinNew, RoundRobinTail)} + end; + {empty, _} -> false + end. + +deliver_to_consumer(Message, Delivered, QName, MsgId, + QEntry = {ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, + RoundRobinTail) -> + ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), MsgId, Delivered, Message}), + C = #cr{unsent_message_count = Count, unacked_messages = UAM} = + ch_record(ChPid), + NewUAM = case AckRequired of + true -> dict:store(MsgId, Message, UAM); + false -> UAM + end, + NewConsumers = case update_store_and_maybe_block_ch( + C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}) of + ok -> queue:in(QEntry, RoundRobinTail); + block_ch -> block_consumers(ChPid, RoundRobinTail) + end, + {AckRequired, NewConsumers}. + +deliver_immediately(Message, Delivered, true, + State = #q{q = #amqqueue{name = QName}, + round_robin = RoundRobin, + next_msg_id = NextId}) -> + case queue:is_empty(RoundRobin) of + true -> {not_offered, State}; + false -> case find_auto_ack_consumer(RoundRobin, queue:new()) of + false -> + {not_offered, + increment_dropped_message_count(State)}; + {QEntry, RoundRobinTail} -> + {AckRequired, NewRoundRobin} = + deliver_to_consumer( + Message, Delivered, QName, NextId, + QEntry, RoundRobinTail), + {offered, AckRequired, + State#q{round_robin = NewRoundRobin, + next_msg_id = NextId + 1}} + end + end; +deliver_immediately(Message, Delivered, false, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, next_msg_id = NextId}) -> - ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, - RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + {{value, QEntry}, RoundRobinTail} -> + {AckRequired, NewRoundRobin} = + deliver_to_consumer(Message, Delivered, QName, NextId, + QEntry, RoundRobinTail), + {offered, AckRequired, State#q{round_robin = NewRoundRobin, + next_msg_id = NextId + 1}}; {empty, _} -> - not_offered + {not_offered, State} end. -attempt_delivery(none, Message, State) -> - case deliver_immediately(Message, false, State) of +attempt_delivery(none, Message, State = #q{conserve_memory = Conserve}) -> + case deliver_immediately(Message, false, Conserve, State) of {offered, false, State1} -> {true, State1}; {offered, true, State1} -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - not_offered -> - {false, State} + {not_offered, State1} -> + {false, State1} end; attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), @@ -189,7 +246,9 @@ deliver_or_enqueue(Txn, Message, State) -> case attempt_delivery(Txn, Message, State) of {true, NewState} -> {true, NewState}; - {false, NewState} -> + {false, NewState = #q{conserve_memory = true}} -> + {false, NewState}; + {false, NewState = #q{conserve_memory = false}} -> persist_message(Txn, qname(State), Message), NewMB = queue:in({Message, false}, NewState#q.message_buffer), {false, NewState#q{message_buffer = NewMB}} @@ -306,15 +365,15 @@ run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> run_poke_burst(MessageBuffer, State) -> case queue:out(MessageBuffer) of {{value, {Message, Delivered}}, BufferTail} -> - case deliver_immediately(Message, Delivered, State) of + case deliver_immediately(Message, Delivered, false, State) of {offered, true, NewState} -> persist_delivery(qname(State), Message, Delivered), run_poke_burst(BufferTail, NewState); {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - not_offered -> - State#q{message_buffer = MessageBuffer} + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} -- cgit v1.2.1 From 992cb011b2b07a44916d5f14678c4f959d4cfb0c Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 5 Oct 2008 14:35:46 +0100 Subject: drop transactional messages when conserving memory and make sure the involved transactions fail when committing --- src/rabbit_amqqueue.erl | 4 ++-- src/rabbit_amqqueue_process.erl | 34 +++++++++++++++++++++++++++------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0dc6931d..9156924e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -83,8 +83,8 @@ -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit/2 :: (pid(), txn()) -> 'ok'). --spec(rollback/2 :: (pid(), txn()) -> 'ok'). +-spec(commit/2 :: (pid(), txn()) -> 'ok' | {'error', any()}). +-spec(rollback/2 :: (pid(), txn()) -> 'ok' | {'error', any()}). -spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). -spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2669ff27..99560850 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,7 +52,8 @@ -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(tx, {ch_pid, is_persistent, fail_reason, + pending_messages, pending_acks}). %% These are held in our process dictionary -record(cr, {consumers, @@ -237,10 +238,13 @@ attempt_delivery(none, Message, State = #q{conserve_memory = Conserve}) -> {not_offered, State1} -> {false, State1} end; -attempt_delivery(Txn, Message, State) -> +attempt_delivery(Txn, Message, State = #q{conserve_memory = false}) -> persist_message(Txn, qname(State), Message), record_pending_message(Txn, Message), - {true, State}. + {true, State}; +attempt_delivery(Txn, _Message, State = #q{conserve_memory = true}) -> + mark_tx_failed(Txn, dropped_messages_to_conserve_memory), + {false, increment_dropped_message_count(State)}. deliver_or_enqueue(Txn, Message, State) -> case attempt_delivery(Txn, Message, State) of @@ -455,6 +459,7 @@ lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx{ch_pid = none, is_persistent = false, + fail_reason = none, pending_messages = [], pending_acks = []}; V -> V @@ -477,6 +482,14 @@ is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. +mark_tx_failed(Txn, Reason) -> + Tx = lookup_tx(Txn), + store_tx(Txn, Tx#tx{fail_reason = Reason}). + +tx_fail_reason(Txn) -> + #tx{fail_reason = Res} = lookup_tx(Txn), + Res. + record_pending_message(Txn, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). @@ -545,10 +558,17 @@ handle_call({deliver, Txn, Message}, _From, State) -> {reply, Delivered, NewState}; handle_call({commit, Txn}, From, State) -> - ok = commit_work(Txn, qname(State)), - %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), - NewState = process_pending(Txn, State), + NewState = + case tx_fail_reason(Txn) of + none -> ok = commit_work(Txn, qname(State)), + %% optimisation: we reply straight away so the + %% sender can continue + gen_server:reply(From, ok), + process_pending(Txn, State); + Reason -> ok = rollback_work(Txn, qname(State)), + gen_server:reply(From, {error, Reason}), + State + end, erase_tx(Txn), {noreply, NewState}; -- cgit v1.2.1 From 13108a5105e91523f0440a0a9a946a2f6ac28fa3 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 5 Oct 2008 15:09:49 +0100 Subject: oops --- src/rabbit_alarm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d5cbf066..629654e4 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -88,7 +88,7 @@ handle_event({set_alarm,{system_memory_high_watermark,[]}}, State) -> rabbit_amqqueue:conserve_memory(true), {ok, State#alarms{system_memory_high_watermark = true}}; -handle_event({clear_alarm,{system_memory_high_watermark,[]}}, State) -> +handle_event({clear_alarm,{system_memory_high_watermark}}, State) -> rabbit_amqqueue:conserve_memory(false), {ok, State#alarms{system_memory_high_watermark = false}}; -- cgit v1.2.1 From b51560f9342d4ba68e3d9ac811cb7b2998c045e4 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 5 Oct 2008 15:20:03 +0100 Subject: oops again This time I've actually tested this properly, so it's definitely correct. --- src/rabbit_alarm.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 629654e4..02a783a1 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -84,11 +84,11 @@ handle_call({maybe_conserve_memory, QPid}, handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm,{system_memory_high_watermark,[]}}, State) -> +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> rabbit_amqqueue:conserve_memory(true), {ok, State#alarms{system_memory_high_watermark = true}}; -handle_event({clear_alarm,{system_memory_high_watermark}}, State) -> +handle_event({clear_alarm, system_memory_high_watermark}, State) -> rabbit_amqqueue:conserve_memory(false), {ok, State#alarms{system_memory_high_watermark = false}}; -- cgit v1.2.1 From ef58ee48db20068136f846174e17b599141d69da Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 14 Nov 2008 23:39:08 +0000 Subject: work around OTP-7025 also simplify the match slightly --- src/rabbit_exchange.erl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a8c54438..58b8d7d6 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -268,9 +268,17 @@ exchanges_for_queue(QueueName) -> has_bindings(ExchangeName) -> MatchHead = #route{binding = #binding{exchange_name = ExchangeName, - queue_name = '$1', _ = '_'}}, - continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)). + try + continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read)) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + case mnesia:match_object(MatchHead) of + [] -> false; + [_|_] -> true + end + end. continue('$end_of_table') -> false; continue({[_|_], _}) -> true; -- cgit v1.2.1 From cc86260fcf9dbc5432df6365113f73197120f4d4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 25 Nov 2008 13:07:59 +0000 Subject: First pass at a simple memory supervisor for Linux. --- src/rabbit.erl | 2 +- src/rabbit_alarm.erl | 44 ++++++++++++------ src/rabbit_linux_memory.erl | 108 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 16 deletions(-) create mode 100644 src/rabbit_linux_memory.erl diff --git a/src/rabbit.erl b/src/rabbit.erl index a33c5b7b..cd2bda97 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -29,7 +29,7 @@ -export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). --export([start/2, stop/1]). +-export([start/2, stop/1, start_child/1]). -export([log_location/1]). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d9c1c450..5ba88a30 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -50,21 +50,35 @@ %%---------------------------------------------------------------------------- start() -> - %% The default memsup check interval is 1 minute, which is way too - %% long - rabbit can gobble up all memory in a matter of - %% seconds. Unfortunately the memory_check_interval configuration - %% parameter and memsup:set_check_interval/1 function only provide - %% a granularity of minutes. So we have to peel off one layer of - %% the API to get to the underlying layer which operates at the - %% granularity of milliseconds. - %% - %% Note that the new setting will only take effect after the first - %% check has completed, i.e. after one minute. So if rabbit eats - %% all the memory within the first minute after startup then we - %% are out of luck. - ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, - infinity), - + case os:type() of + {unix, linux} -> + %% memsup doesn't take account of buffers or cache when considering + %% "free" memory - therefore on Linux we can get memory alarms + %% very easily without any pressure existing on memory at all. + %% Therefore we need to use our own simple memory monitor + + supervisor:terminate_child(os_mon_sup, memsup), + supervisor:delete_child(os_mon_sup, memsup), + rabbit:start_child(rabbit_linux_memory), + + ok; + _ -> + %% The default memsup check interval is 1 minute, which is way too + %% long - rabbit can gobble up all memory in a matter of + %% seconds. Unfortunately the memory_check_interval configuration + %% parameter and memsup:set_check_interval/1 function only provide + %% a granularity of minutes. So we have to peel off one layer of + %% the API to get to the underlying layer which operates at the + %% granularity of milliseconds. + %% + %% Note that the new setting will only take effect after the first + %% check has completed, i.e. after one minute. So if rabbit eats + %% all the memory within the first minute after startup then we + %% are out of luck. + + ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, + infinity) + end, ok = alarm_handler:add_alarm_handler(?MODULE). stop() -> diff --git a/src/rabbit_linux_memory.erl b/src/rabbit_linux_memory.erl new file mode 100644 index 00000000..69708519 --- /dev/null +++ b/src/rabbit_linux_memory.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_linux_memory). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(MEMORY_CHECK_INTERVAL, 1000). +-define(MEMORY_CHECK_FRACTION, 0.95). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + + +init(_Args) -> + {ok, no_alarm, ?MEMORY_CHECK_INTERVAL}. + + +handle_call(_Request, _From, State) -> + {noreply, State, ?MEMORY_CHECK_INTERVAL}. + + +handle_cast(_Request, State) -> + {noreply, State, ?MEMORY_CHECK_INTERVAL}. + + +handle_info(_Info, State) -> + File = read_proc_file("/proc/meminfo"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(split_and_parse_lines(Lines, [])), + MemTotal = dict:fetch("MemTotal", Dict), + MemUsed = MemTotal + - dict:fetch("MemFree", Dict) + - dict:fetch("Buffers", Dict) + - dict:fetch("Cached", Dict), + if + MemUsed / MemTotal > ?MEMORY_CHECK_FRACTION -> + NewState = alarm; + true -> + NewState = no_alarm + end, + case {State, NewState} of + {no_alarm, alarm} -> + alarm_handler:set_alarm({system_memory_high_watermark, []}), + ok; + {alarm, no_alarm} -> + alarm_handler:clear_alarm(system_memory_high_watermark), + ok; + _ -> + ok + end, + {noreply, NewState, ?MEMORY_CHECK_INTERVAL}. + +%% file:read_file does not work on files in /proc as it seems to get the size +%% of the file first and then read that many bytes. But files in /proc always +%% have length 0, we just have to read until we get eof. +read_proc_file(File) -> + {ok, IoDevice} = file:open(File, [read, raw]), + {ok, Res} = file:read(IoDevice, 1000000), + Res. + +%% A line looks like "FooBar: 123456 kB" +split_and_parse_lines([], Acc) -> Acc; +split_and_parse_lines([Line | Rest], Acc) -> + Name = line_element(Line, 1), + ValueString = line_element(Line, 2), + Value = list_to_integer(string:sub_word(ValueString, 1)), + split_and_parse_lines(Rest, [{Name, Value} | Acc]). + +line_element(Line, Count) -> + string:strip(string:sub_word(Line, Count, $:)). + + +terminate(_Reason, _State) -> + ok. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. -- cgit v1.2.1 From 6c62255f86be579dff7b7a4767656b2234507ff2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 25 Nov 2008 13:20:47 +0000 Subject: Empty changeset to create a new head for bug18557 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 04a0aff6..6baee930 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ WEB_URL=http://stage.rabbitmq.com/ ifndef USE_SPECS # our type specs rely on features / bug fixes in dialyzer that are # only available in R12B-3 upwards -# +# # NB: the test assumes that version number will only contain single digits USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi) endif -- cgit v1.2.1 From 9b463e6d0a8f47783e14ac26b6b0e11f4bb2b756 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 16 Feb 2009 11:35:59 +0000 Subject: Backed out changeset 7b5f544468b6 since it pulled in a load of work from the default branch. --- .hgignore | 1 - Makefile | 21 +- docs/rabbitmqctl.1.pod | 65 ++- ebin/rabbit.app | 57 +++ ebin/rabbit_app.in | 21 - generate_app | 10 - include/rabbit.hrl | 7 - scripts/rabbitmq-multi | 5 - scripts/rabbitmq-server | 5 - src/buffering_proxy.erl | 108 +++++ src/gen_server2.erl | 854 ------------------------------------- src/rabbit.erl | 42 +- src/rabbit_access_control.erl | 189 +++----- src/rabbit_alarm.erl | 8 +- src/rabbit_amqqueue.erl | 88 ++-- src/rabbit_amqqueue_process.erl | 178 +++----- src/rabbit_channel.erl | 310 +++++--------- src/rabbit_control.erl | 93 ++-- src/rabbit_error_logger_file_h.erl | 2 +- src/rabbit_exchange.erl | 231 +++------- src/rabbit_framing_channel.erl | 6 +- src/rabbit_limiter.erl | 195 --------- src/rabbit_misc.erl | 36 +- src/rabbit_mnesia.erl | 75 ++-- src/rabbit_networking.erl | 8 +- src/rabbit_reader.erl | 20 +- src/rabbit_router.erl | 10 +- src/rabbit_sasl_report_file_h.erl | 2 +- src/rabbit_tests.erl | 31 +- 29 files changed, 673 insertions(+), 2005 deletions(-) create mode 100644 ebin/rabbit.app delete mode 100644 ebin/rabbit_app.in delete mode 100644 generate_app create mode 100644 src/buffering_proxy.erl delete mode 100644 src/gen_server2.erl delete mode 100644 src/rabbit_limiter.erl diff --git a/.hgignore b/.hgignore index 35607765..28f9cfd8 100644 --- a/.hgignore +++ b/.hgignore @@ -9,7 +9,6 @@ syntax: regexp ^include/rabbit_framing.hrl$ ^src/rabbit_framing.erl$ ^rabbit.plt$ -^ebin/rabbit.app$ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ diff --git a/Makefile b/Makefile index e19c0d56..f924b8e6 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,7 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) -TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) +TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -40,15 +39,9 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) -$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app - escript generate_app $(EBIN_DIR) < $< > $@ - -$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl erlc $(ERLC_OPTS) $< - -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam - erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< +# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ @@ -59,12 +52,12 @@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' -dialyze: $(BEAM_TARGETS) +dialyze: $(TARGETS) dialyzer -c $? clean: cleandb rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -129,8 +122,8 @@ srcdist: distclean >> $(TARGET_SRC_DIR)/INSTALL cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ - >> $(TARGET_SRC_DIR)/BUILD - sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in + >> $(TARGET_SRC_DIR)/README + sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile $(TARGET_SRC_DIR) diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index d86aa271..d2cb0199 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -26,7 +26,7 @@ B<-n> I startup time). The output of hostname -s is usually the correct suffix to use after the "@" sign. See rabbitmq-server(1) for details of configuring the RabbitMQ broker. - + B<-q> quiet output mode is selected with the B<-q> flag. Informational messages are suppressed when quiet mode is in effect. @@ -43,32 +43,32 @@ stop_app This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g. I. - + start_app start the RabbitMQ application. This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g. I. - + status display various information about the RabbitMQ broker, such as whether the RabbitMQ application on the current node, its version number, what nodes are part of the broker, which of these are running. - + force return a RabbitMQ node to its virgin state. Removes the node from any cluster it belongs to, removes all data from the management database, such as configured users, vhosts and deletes all persistent messages. - + force_reset the same as I command, but resets the node unconditionally, regardless of the current management database state and cluster configuration. It should only be used as a last resort if the database or cluster configuration has been corrupted. - + rotate_logs [suffix] instruct the RabbitMQ node to rotate the log files. The RabbitMQ broker will attempt to append the current contents of the log file @@ -81,57 +81,48 @@ rotate_logs [suffix] specified. This command might be helpful when you are e.g. writing your own logrotate script and you do not want to restart the RabbitMQ node. - + cluster I ... instruct the node to become member of a cluster with the specified nodes determined by I option(s). See http://www.rabbitmq.com/clustering.html for more information about clustering. - + =head2 USER MANAGEMENT - + add_user I I create a user named I with (initial) password I. - -delete_user I - delete the user named I. - + change_password I I change the password for the user named I to I. list_users list all users. - + =head2 ACCESS CONTROL add_vhost I create a new virtual host called I. - + delete_vhost I delete a virtual host I. That command deletes also all its exchanges, queues and user mappings. - + list_vhosts list all virtual hosts. - -set_permissions [-p I] I I I I - set the permissions for the user named I in the virtual - host I, granting 'configure', 'write' and 'read' access - to resources with names matching the first, second and third - I, respectively. - -clear_permissions [-p I] I - remove the permissions for the user named I in the - virtual host I. - -list_permissions [-p I] - list all the users and their permissions in the virtual host + +map_user_vhost I I + grant the user named I access to the virtual host called + I. + +unmap_user_vhost I I + deny the user named I access to the virtual host called I. -list_user_permissions I - list the permissions of the user named I across all - virtual hosts. - +list_user_vhost I + list all the virtual hosts to which the user named I has + been granted access. + =head2 SERVER STATUS list_queues [-p I] [I ...] @@ -238,7 +229,7 @@ peer_address peer_port peer port - + state connection state (B, B, B, B, B, B, B) @@ -272,7 +263,7 @@ send_cnt send_pend send queue size - + =back The list_queues, list_exchanges and list_bindings commands accept an @@ -286,12 +277,12 @@ Create a user named foo with (initial) password bar at the Erlang node rabbit@test: rabbitmqctl -n rabbit@test add_user foo bar - + Grant user named foo access to the virtual host called test at the default Erlang node: rabbitmqctl map_user_vhost foo test - + Append the current logs' content to the files with ".1" suffix and reopen them: diff --git a/ebin/rabbit.app b/ebin/rabbit.app new file mode 100644 index 00000000..0d714fdf --- /dev/null +++ b/ebin/rabbit.app @@ -0,0 +1,57 @@ +{application, rabbit, %% -*- erlang -*- + [{description, "RabbitMQ"}, + {id, "RabbitMQ"}, + {vsn, "%%VERSION%%"}, + {modules, [buffering_proxy, + rabbit_access_control, + rabbit_alarm, + rabbit_amqqueue, + rabbit_amqqueue_process, + rabbit_amqqueue_sup, + rabbit_binary_generator, + rabbit_binary_parser, + rabbit_channel, + rabbit_control, + rabbit, + rabbit_error_logger, + rabbit_error_logger_file_h, + rabbit_exchange, + rabbit_framing_channel, + rabbit_framing, + rabbit_heartbeat, + rabbit_load, + rabbit_log, + rabbit_memsup_linux, + rabbit_misc, + rabbit_mnesia, + rabbit_multi, + rabbit_networking, + rabbit_node_monitor, + rabbit_persister, + rabbit_reader, + rabbit_router, + rabbit_sasl_report_file_h, + rabbit_sup, + rabbit_tests, + rabbit_tracer, + rabbit_writer, + tcp_acceptor, + tcp_acceptor_sup, + tcp_client_sup, + tcp_listener, + tcp_listener_sup]}, + {registered, [rabbit_amqqueue_sup, + rabbit_log, + rabbit_node_monitor, + rabbit_persister, + rabbit_router, + rabbit_sup, + rabbit_tcp_client_sup]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {mod, {rabbit, []}}, + {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {extra_startup_steps, []}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_vhost, <<"/">>}, + {memory_alarms, auto}]}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in deleted file mode 100644 index 5be07492..00000000 --- a/ebin/rabbit_app.in +++ /dev/null @@ -1,21 +0,0 @@ -{application, rabbit, %% -*- erlang -*- - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, - {modules, []}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_persister, - rabbit_router, - rabbit_sup, - rabbit_tcp_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {memory_alarms, auto}]}]}. diff --git a/generate_app b/generate_app deleted file mode 100644 index 62301292..00000000 --- a/generate_app +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env escript -%% -*- erlang -*- - -main([BeamDir]) -> - Modules = [list_to_atom(filename:basename(F, ".beam")) || - F <- filelib:wildcard("*.beam", BeamDir)], - {ok, {application, Application, Properties}} = io:read(''), - NewProperties = lists:keyreplace(modules, 1, Properties, - {modules, Modules}), - io:format("~p.", [{application, Application, NewProperties}]). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c707112f..d07aeaf8 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -30,9 +30,7 @@ %% -record(user, {username, password}). --record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). --record(user_permission, {user_vhost, permission}). -record(vhost, {virtual_host, dummy}). @@ -76,7 +74,6 @@ -type(thunk(T) :: fun(() -> T)). -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). --type(regexp() :: binary()). %% this is really an abstract type, but dialyzer does not support them -type(guid() :: any()). @@ -91,10 +88,6 @@ -type(user() :: #user{username :: username(), password :: password()}). --type(permission() :: - #permission{configure :: regexp(), - write :: regexp(), - read :: regexp()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), durable :: bool(), diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 164c5e18..84985e90 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -54,11 +54,6 @@ export \ RABBITMQ_SCRIPT_HOME \ RABBITMQ_PIDS_FILE -# we need to turn off path expansion because some of the vars, notably -# RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and -# there is no other way of preventing their expansion. -set -f - exec erl \ -pa "`dirname $0`/../ebin" \ -noinput \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 9a35c477..572262c9 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -73,11 +73,6 @@ fi RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' -# we need to turn off path expansion because some of the vars, notably -# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and -# there is no other way of preventing their expansion. -set -f - exec erl \ -pa "`dirname $0`/../ebin" \ ${RABBITMQ_START_RABBIT} \ diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl new file mode 100644 index 00000000..344b719a --- /dev/null +++ b/src/buffering_proxy.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(buffering_proxy). + +-export([start_link/2]). + +%% internal + +-export([mainloop/4, drain/2]). +-export([proxy_loop/3]). + +-define(HIBERNATE_AFTER, 5000). + +%%---------------------------------------------------------------------------- + +start_link(M, A) -> + spawn_link( + fun () -> process_flag(trap_exit, true), + ProxyPid = self(), + Ref = make_ref(), + Pid = spawn_link( + fun () -> ProxyPid ! Ref, + mainloop(ProxyPid, Ref, M, + M:init(ProxyPid, A)) end), + proxy_loop(Ref, Pid, empty) + end). + +%%---------------------------------------------------------------------------- + +mainloop(ProxyPid, Ref, M, State) -> + NewState = + receive + {Ref, Messages} -> + NewSt = + lists:foldl(fun (Msg, S) -> + drain(M, M:handle_message(Msg, S)) + end, State, lists:reverse(Messages)), + ProxyPid ! Ref, + NewSt; + Msg -> M:handle_message(Msg, State) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, + [ProxyPid, Ref, M, State]) + end, + ?MODULE:mainloop(ProxyPid, Ref, M, NewState). + +drain(M, State) -> + receive + Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) + after 0 -> + State + end. + +proxy_loop(Ref, Pid, State) -> + receive + Ref -> + ?MODULE:proxy_loop( + Ref, Pid, + case State of + empty -> waiting; + waiting -> exit(duplicate_next); + Messages -> Pid ! {Ref, Messages}, empty + end); + {'EXIT', Pid, Reason} -> + exit(Reason); + {'EXIT', _, Reason} -> + exit(Pid, Reason), + ?MODULE:proxy_loop(Ref, Pid, State); + Msg -> + ?MODULE:proxy_loop( + Ref, Pid, + case State of + empty -> [Msg]; + waiting -> Pid ! {Ref, [Msg]}, empty; + Messages -> [Msg | Messages] + end) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) + end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl deleted file mode 100644 index 11bb66d7..00000000 --- a/src/gen_server2.erl +++ /dev/null @@ -1,854 +0,0 @@ -%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP -%% distribution, with the following modifications: -%% -%% 1) the module name is gen_server2 -%% -%% 2) more efficient handling of selective receives in callbacks -%% gen_server2 processes drain their message queue into an internal -%% buffer before invoking any callback module functions. Messages are -%% dequeued from the buffer for processing. Thus the effective message -%% queue of a gen_server2 process is the concatenation of the internal -%% buffer and the real message queue. -%% As a result of the draining, any selective receive invoked inside a -%% callback is less likely to have to scan a large message queue. -%% -%% 3) gen_server2:cast is guaranteed to be order-preserving -%% The original code could reorder messages when communicating with a -%% process on a remote node that was not currently connected. -%% -%% All modifications are (C) 2009 LShift Ltd. - -%% ``The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved via the world wide web at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. -%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings -%% AB. All Rights Reserved.'' -%% -%% $Id$ -%% --module(gen_server2). - -%%% --------------------------------------------------- -%%% -%%% The idea behind THIS server is that the user module -%%% provides (different) functions to handle different -%%% kind of inputs. -%%% If the Parent process terminates the Module:terminate/2 -%%% function is called. -%%% -%%% The user module should export: -%%% -%%% init(Args) -%%% ==> {ok, State} -%%% {ok, State, Timeout} -%%% ignore -%%% {stop, Reason} -%%% -%%% handle_call(Msg, {From, Tag}, State) -%%% -%%% ==> {reply, Reply, State} -%%% {reply, Reply, State, Timeout} -%%% {noreply, State} -%%% {noreply, State, Timeout} -%%% {stop, Reason, Reply, State} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% handle_cast(Msg, State) -%%% -%%% ==> {noreply, State} -%%% {noreply, State, Timeout} -%%% {stop, Reason, State} -%%% Reason = normal | shutdown | Term terminate(State) is called -%%% -%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... -%%% -%%% ==> {noreply, State} -%%% {noreply, State, Timeout} -%%% {stop, Reason, State} -%%% Reason = normal | shutdown | Term, terminate(State) is called -%%% -%%% terminate(Reason, State) Let the user module clean up -%%% always called when server terminates -%%% -%%% ==> ok -%%% -%%% -%%% The work flow (of the server) can be described as follows: -%%% -%%% User module Generic -%%% ----------- ------- -%%% start -----> start -%%% init <----- . -%%% -%%% loop -%%% handle_call <----- . -%%% -----> reply -%%% -%%% handle_cast <----- . -%%% -%%% handle_info <----- . -%%% -%%% terminate <----- . -%%% -%%% -----> reply -%%% -%%% -%%% --------------------------------------------------- - -%% API --export([start/3, start/4, - start_link/3, start_link/4, - call/2, call/3, - cast/2, reply/2, - abcast/2, abcast/3, - multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5]). - --export([behaviour_info/1]). - -%% System exports --export([system_continue/3, - system_terminate/4, - system_code_change/4, - format_status/2]). - -%% Internal exports --export([init_it/6, print_event/3]). - --import(error_logger, [format/2]). - -%%%========================================================================= -%%% API -%%%========================================================================= - -behaviour_info(callbacks) -> - [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, - {terminate,2},{code_change,3}]; -behaviour_info(_Other) -> - undefined. - -%%% ----------------------------------------------------------------- -%%% Starts a generic server. -%%% start(Mod, Args, Options) -%%% start(Name, Mod, Args, Options) -%%% start_link(Mod, Args, Options) -%%% start_link(Name, Mod, Args, Options) where: -%%% Name ::= {local, atom()} | {global, atom()} -%%% Mod ::= atom(), callback module implementing the 'real' server -%%% Args ::= term(), init arguments (to Mod:init/1) -%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] -%%% Flag ::= trace | log | {logfile, File} | statistics | debug -%%% (debug == log && statistics) -%%% Returns: {ok, Pid} | -%%% {error, {already_started, Pid}} | -%%% {error, Reason} -%%% ----------------------------------------------------------------- -start(Mod, Args, Options) -> - gen:start(?MODULE, nolink, Mod, Args, Options). - -start(Name, Mod, Args, Options) -> - gen:start(?MODULE, nolink, Name, Mod, Args, Options). - -start_link(Mod, Args, Options) -> - gen:start(?MODULE, link, Mod, Args, Options). - -start_link(Name, Mod, Args, Options) -> - gen:start(?MODULE, link, Name, Mod, Args, Options). - - -%% ----------------------------------------------------------------- -%% Make a call to a generic server. -%% If the server is located at another node, that node will -%% be monitored. -%% If the client is trapping exits and is linked server termination -%% is handled here (? Shall we do that here (or rely on timeouts) ?). -%% ----------------------------------------------------------------- -call(Name, Request) -> - case catch gen:call(Name, '$gen_call', Request) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request]}}) - end. - -call(Name, Request, Timeout) -> - case catch gen:call(Name, '$gen_call', Request, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) - end. - -%% ----------------------------------------------------------------- -%% Make a cast to a generic server. -%% ----------------------------------------------------------------- -cast({global,Name}, Request) -> - catch global:send(Name, cast_msg(Request)), - ok; -cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> - do_cast(Dest, Request); -cast(Dest, Request) when is_atom(Dest) -> - do_cast(Dest, Request); -cast(Dest, Request) when is_pid(Dest) -> - do_cast(Dest, Request). - -do_cast(Dest, Request) -> - do_send(Dest, cast_msg(Request)), - ok. - -cast_msg(Request) -> {'$gen_cast',Request}. - -%% ----------------------------------------------------------------- -%% Send a reply to the client. -%% ----------------------------------------------------------------- -reply({To, Tag}, Reply) -> - catch To ! {Tag, Reply}. - -%% ----------------------------------------------------------------- -%% Asyncronous broadcast, returns nothing, it's just send'n prey -%%----------------------------------------------------------------- -abcast(Name, Request) when is_atom(Name) -> - do_abcast([node() | nodes()], Name, cast_msg(Request)). - -abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_abcast(Nodes, Name, cast_msg(Request)). - -do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> - do_send({Name,Node},Msg), - do_abcast(Nodes, Name, Msg); -do_abcast([], _,_) -> abcast. - -%%% ----------------------------------------------------------------- -%%% Make a call to servers at several nodes. -%%% Returns: {[Replies],[BadNodes]} -%%% A Timeout can be given -%%% -%%% A middleman process is used in case late answers arrives after -%%% the timeout. If they would be allowed to glog the callers message -%%% queue, it would probably become confused. Late answers will -%%% now arrive to the terminated middleman and so be discarded. -%%% ----------------------------------------------------------------- -multi_call(Name, Req) - when is_atom(Name) -> - do_multi_call([node() | nodes()], Name, Req, infinity). - -multi_call(Nodes, Name, Req) - when is_list(Nodes), is_atom(Name) -> - do_multi_call(Nodes, Name, Req, infinity). - -multi_call(Nodes, Name, Req, infinity) -> - do_multi_call(Nodes, Name, Req, infinity); -multi_call(Nodes, Name, Req, Timeout) - when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> - do_multi_call(Nodes, Name, Req, Timeout). - - -%%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, , ) ->_ -%% -%% Description: Makes an existing process into a gen_server. -%% The calling process will enter the gen_server receive -%% loop and become a gen_server process. -%% The process *must* have been started using one of the -%% start functions in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the -%% process, including registering a name for it. -%%----------------------------------------------------------------- -enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity). - -enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity); - -enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout). - -enter_loop(Mod, Options, State, ServerName, Timeout) -> - Name = get_proc_name(ServerName), - Parent = get_parent(), - Debug = debug_options(Name, Options), - Queue = queue:new(), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug). - -%%%======================================================================== -%%% Gen-callback functions -%%%======================================================================== - -%%% --------------------------------------------------- -%%% Initiate the new process. -%%% Register the name using the Rfunc function -%%% Calls the Mod:init/Args function. -%%% Finally an acknowledge is sent to Parent and the main -%%% loop is entered. -%%% --------------------------------------------------- -init_it(Starter, self, Name, Mod, Args, Options) -> - init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> - Debug = debug_options(Name, Options), - Queue = queue:new(), - case catch Mod:init(Args) of - {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Queue, Debug); - {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug); - {stop, Reason} -> - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) - end. - -%%%======================================================================== -%%% Internal functions -%%%======================================================================== -%%% --------------------------------------------------- -%%% The MAIN loop. -%%% --------------------------------------------------- -loop(Parent, Name, State, Mod, Time, Queue, Debug) -> - receive - Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue), Debug) - after 0 -> - case queue:out(Queue) of - {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Msg); - {empty, Queue1} -> - receive - Input -> - loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue1), Debug) - after Time -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, timeout) - end - end - end. - -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> - case Msg of - {system, From, Req} -> - sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, Queue]); - {'EXIT', Parent, Reason} -> - terminate(Reason, Name, Msg, Mod, State, Debug); - _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); - _Msg -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, - Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) - end. - -%%% --------------------------------------------------- -%%% Send/recive functions -%%% --------------------------------------------------- -do_send(Dest, Msg) -> - catch erlang:send(Dest, Msg). - -do_multi_call(Nodes, Name, Req, infinity) -> - Tag = make_ref(), - Monitors = send_nodes(Nodes, Name, Tag, Req), - rec_nodes(Tag, Monitors, Name, undefined); -do_multi_call(Nodes, Name, Req, Timeout) -> - Tag = make_ref(), - Caller = self(), - Receiver = - spawn( - fun() -> - %% Middleman process. Should be unsensitive to regular - %% exit signals. The sychronization is needed in case - %% the receiver would exit before the caller started - %% the monitor. - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller,Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Req), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(),Tag,Result}); - {'DOWN',Mref,_,_,_} -> - %% Caller died before sending us the go-ahead. - %% Give up silently. - exit(normal) - end - end), - Mref = erlang:monitor(process, Receiver), - Receiver ! {self(),Tag}, - receive - {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> - Result; - {'DOWN',Mref,_,_,Reason} -> - %% The middleman code failed. Or someone did - %% exit(_, kill) on the middleman process => Reason==killed - exit(Reason) - end. - -send_nodes(Nodes, Name, Tag, Req) -> - send_nodes(Nodes, Name, Tag, Req, []). - -send_nodes([Node|Tail], Name, Tag, Req, Monitors) - when is_atom(Node) -> - Monitor = start_monitor(Node, Name), - %% Handle non-existing names in rec_nodes. - catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req}, - send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]); -send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> - %% Skip non-atom Node - send_nodes(Tail, Name, Tag, Req, Monitors); -send_nodes([], _Name, _Tag, _Req, Monitors) -> - Monitors. - -%% Against old nodes: -%% If no reply has been delivered within 2 secs. (per node) check that -%% the server really exists and wait for ever for the answer. -%% -%% Against contemporary nodes: -%% Wait for reply, server 'DOWN', or timeout from TimerId. - -rec_nodes(Tag, Nodes, Name, TimerId) -> - rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). - -rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], Time, TimerId); - {timeout, TimerId, _} -> - unmonitor(R), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) - end; -rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) - after Time -> - case rpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N|Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], - Replies, 2000, TimerId) - end - end; -rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> - case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok - end, - {Replies, Badnodes}. - -%% Collect all replies that already have arrived -rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> - receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) - after 0 -> - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) - end; -rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> - %% R6 node - receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) - after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) - end; -rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> - {Replies, Badnodes}. - - -%%% --------------------------------------------------- -%%% Monitor functions -%%% --------------------------------------------------- - -start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> - if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; - true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end - end. - -%% Cancels a monitor started with Ref=erlang:monitor(_, _). -unmonitor(Ref) when is_reference(Ref) -> - erlang:demonitor(Ref), - receive - {'DOWN', Ref, _, _, _} -> - true - after 0 -> - true - end. - -%%% --------------------------------------------------- -%%% Message handling functions -%%% --------------------------------------------------- - -dispatch({'$gen_cast', Msg}, Mod, State) -> - Mod:handle_cast(Msg, State); -dispatch(Info, Mod, State) -> - Mod:handle_info(Info, State). - -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, Queue, []); - {reply, Reply, NState, Time1} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, Queue, []); - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, [])), - reply(From, Reply), - exit(R); - Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue) - end; -handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). - -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue, Debug) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); - {reply, Reply, NState, Time1} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), - reply(Name, From, Reply, NState, Debug), - exit(R); - Other -> - handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue, Debug) - end; -handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue, Debug) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, - Parent, Name, Msg, Mod, State, Queue, Debug). - -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> - case Reply of - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, []); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) - end. - -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> - case Reply of - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, Debug); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, Debug); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) - end. - -reply(Name, {To, Tag}, Reply, State, Debug) -> - reply({To, Tag}, Reply), - sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {out, Reply, To, State} ). - - -%%----------------------------------------------------------------- -%% Callback functions for system messages handling. -%%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> - loop(Parent, Name, State, Mod, Time, Queue, Debug). - -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> - terminate(Reason, Name, [], Mod, State, Debug). - -system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> - case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; - Else -> Else - end. - -%%----------------------------------------------------------------- -%% Format debug messages. Print them as the call-back module sees -%% them, not as the real erlang messages. Use trace for that. -%%----------------------------------------------------------------- -print_event(Dev, {in, Msg}, Name) -> - case Msg of - {'$gen_call', {From, _Tag}, Call} -> - io:format(Dev, "*DBG* ~p got call ~p from ~w~n", - [Name, Call, From]); - {'$gen_cast', Cast} -> - io:format(Dev, "*DBG* ~p got cast ~p~n", - [Name, Cast]); - _ -> - io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) - end; -print_event(Dev, {out, Msg, To, State}, Name) -> - io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", - [Name, Msg, To, State]); -print_event(Dev, {noreply, State}, Name) -> - io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); -print_event(Dev, Event, Name) -> - io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]). - - -%%% --------------------------------------------------- -%%% Terminate the server. -%%% --------------------------------------------------- - -terminate(Reason, Name, Msg, Mod, State, Debug) -> - case catch Mod:terminate(Reason, State) of - {'EXIT', R} -> - error_info(R, Name, Msg, State, Debug), - exit(R); - _ -> - case Reason of - normal -> - exit(normal); - shutdown -> - exit(shutdown); - _ -> - error_info(Reason, Name, Msg, State, Debug), - exit(Reason) - end - end. - -error_info(_Reason, application_controller, _Msg, _State, _Debug) -> - %% OTP-5811 Don't send an error report if it's the system process - %% application_controller which is terminating - let init take care - %% of it instead - ok; -error_info(Reason, Name, Msg, State, Debug) -> - Reason1 = - case Reason of - {undef,[{M,F,A}|MFAs]} -> - case code:is_loaded(M) of - false -> - {'module could not be loaded',[{M,F,A}|MFAs]}; - _ -> - case erlang:function_exported(M, F, length(A)) of - true -> - Reason; - false -> - {'function not exported',[{M,F,A}|MFAs]} - end - end; - _ -> - Reason - end, - format("** Generic server ~p terminating \n" - "** Last message in was ~p~n" - "** When Server state == ~p~n" - "** Reason for termination == ~n** ~p~n", - [Name, Msg, State, Reason1]), - sys:print_log(Debug), - ok. - -%%% --------------------------------------------------- -%%% Misc. functions. -%%% --------------------------------------------------- - -opt(Op, [{Op, Value}|_]) -> - {ok, Value}; -opt(Op, [_|Options]) -> - opt(Op, Options); -opt(_, []) -> - false. - -debug_options(Name, Opts) -> - case opt(debug, Opts) of - {ok, Options} -> dbg_options(Name, Options); - _ -> dbg_options(Name, []) - end. - -dbg_options(Name, []) -> - Opts = - case init:get_argument(generic_debug) of - error -> - []; - _ -> - [log, statistics] - end, - dbg_opts(Name, Opts); -dbg_options(Name, Opts) -> - dbg_opts(Name, Opts). - -dbg_opts(Name, Opts) -> - case catch sys:debug_options(Opts) of - {'EXIT',_} -> - format("~p: ignoring erroneous debug options - ~p~n", - [Name, Opts]), - []; - Dbg -> - Dbg - end. - -get_proc_name(Pid) when is_pid(Pid) -> - Pid; -get_proc_name({local, Name}) -> - case process_info(self(), registered_name) of - {registered_name, Name} -> - Name; - {registered_name, _Name} -> - exit(process_not_registered); - [] -> - exit(process_not_registered) - end; -get_proc_name({global, Name}) -> - case global:safe_whereis_name(Name) of - undefined -> - exit(process_not_registered_globally); - Pid when Pid =:= self() -> - Name; - _Pid -> - exit(process_not_registered_globally) - end. - -get_parent() -> - case get('$ancestors') of - [Parent | _] when is_pid(Parent)-> - Parent; - [Parent | _] when is_atom(Parent)-> - name_to_pid(Parent); - _ -> - exit(process_was_not_started_by_proc_lib) - end. - -name_to_pid(Name) -> - case whereis(Name) of - undefined -> - case global:safe_whereis_name(Name) of - undefined -> - exit(could_not_find_registerd_name); - Pid -> - Pid - end; - Pid -> - Pid - end. - -%%----------------------------------------------------------------- -%% Status information -%%----------------------------------------------------------------- -format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = - StatusData, - NameTag = if is_pid(Name) -> - pid_to_list(Name); - is_atom(Name) -> - Name - end, - Header = lists:concat(["Status for generic server ", NameTag]), - Log = sys:get_debug(log, Debug, []), - Specfic = - case erlang:function_exported(Mod, format_status, 2) of - true -> - case catch Mod:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> [{data, [{"State", State}]}]; - Else -> Else - end; - _ -> - [{data, [{"State", State}]}] - end, - [{header, Header}, - {data, [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, - {"Queued messages", queue:to_list(Queue)}]} | - Specfic]. diff --git a/src/rabbit.erl b/src/rabbit.erl index d9a82f0e..41064c77 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -75,20 +75,19 @@ start() -> try ok = ensure_working_log_handlers(), ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = rabbit_misc:start_applications(?APPS) + ok = start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) end. stop() -> - ok = rabbit_misc:stop_applications(?APPS). + ok = stop_applications(?APPS). stop_and_halt() -> spawn(fun () -> SleepTime = 1000, - rabbit_log:info("Stop-and-halt request received; " - "halting in ~p milliseconds~n", + rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n", [SleepTime]), timer:sleep(SleepTime), init:stop() @@ -110,6 +109,34 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- +manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> + Iterate(fun (App, Acc) -> + case Do(App) of + ok -> [App | Acc]; + {error, {SkipError, _}} -> Acc; + {error, Reason} -> + lists:foreach(Undo, Acc), + throw({error, {ErrorTag, App, Reason}}) + end + end, [], Apps), + ok. + +start_applications(Apps) -> + manage_applications(fun lists:foldl/3, + fun application:start/1, + fun application:stop/1, + already_started, + cannot_start_application, + Apps). + +stop_applications(Apps) -> + manage_applications(fun lists:foldr/3, + fun application:stop/1, + fun application:start/1, + not_started, + cannot_stop_application, + Apps). + start(normal, []) -> {ok, SupPid} = rabbit_sup:start_link(), @@ -265,14 +292,9 @@ insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), - {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = - application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), - ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, - DefaultConfigurePerm, - DefaultWritePerm, - DefaultReadPerm), + ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost), ok. start_builtin_amq_applications() -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index da0ab9cf..b73090fc 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -34,12 +34,11 @@ -include("rabbit.hrl"). -export([check_login/2, user_pass_login/2, - check_vhost_access/2, check_resource_access/3]). + check_vhost_access/2]). -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0]). --export([set_permissions/5, clear_permissions/2, - list_vhost_permissions/1, list_user_permissions/1]). +-export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]). +-export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]). %%---------------------------------------------------------------------------- @@ -48,8 +47,6 @@ -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). --spec(check_resource_access/3 :: - (username(), r(atom()), non_neg_integer()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -58,13 +55,10 @@ -spec(add_vhost/1 :: (vhost()) -> 'ok'). -spec(delete_vhost/1 :: (vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [vhost()]). --spec(set_permissions/5 :: - (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok'). --spec(clear_permissions/2 :: (username(), vhost()) -> 'ok'). --spec(list_vhost_permissions/1 :: - (vhost()) -> [{username(), regexp(), regexp(), regexp()}]). --spec(list_user_permissions/1 :: - (username()) -> [{vhost(), regexp(), regexp(), regexp()}]). +-spec(list_vhost_users/1 :: (vhost()) -> [username()]). +-spec(list_user_vhosts/1 :: (username()) -> [vhost()]). +-spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok'). +-spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok'). -endif. @@ -118,9 +112,9 @@ internal_lookup_vhost_access(Username, VHostPath) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) of + case mnesia:match_object( + #user_vhost{username = Username, + virtual_host = VHostPath}) of [] -> not_found; [R] -> {ok, R} end @@ -137,47 +131,13 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. -check_resource_access(Username, - R = #resource{kind = exchange, name = <<"">>}, - Permission) -> - check_resource_access(Username, - R#resource{name = <<"amq.default">>}, - Permission); -check_resource_access(_Username, - #resource{name = <<"amq.gen",_/binary>>}, - _Permission) -> - ok; -check_resource_access(Username, - R = #resource{virtual_host = VHostPath, name = Name}, - Permission) -> - Res = case mnesia:dirty_read({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) of - [] -> - false; - [#user_permission{permission = P}] -> - case regexp:match( - binary_to_list(Name), - binary_to_list(element(Permission, P))) of - {match, _, _} -> true; - nomatch -> false - end - end, - if Res -> ok; - true -> rabbit_misc:protocol_error( - access_refused, "access to ~s refused for user '~s'", - [rabbit_misc:rs(R), Username]) - end. - add_user(Username, Password) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_user, Username}) of + case mnesia:read({user, Username}) of [] -> - ok = mnesia:write(rabbit_user, - #user{username = Username, - password = Password}, - write); + ok = mnesia:write(#user{username = Username, + password = Password}); _ -> mnesia:abort({user_already_exists, Username}) end @@ -190,17 +150,8 @@ delete_user(Username) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:delete({rabbit_user, Username}), - [ok = mnesia:delete_object( - rabbit_user_permissions, R, write) || - R <- mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = '_'}, - permission = '_'}, - write)], - ok + ok = mnesia:delete({user, Username}), + ok = mnesia:delete({user_vhost, Username}) end)), rabbit_log:info("Deleted user ~p~n", [Username]), R. @@ -210,28 +161,24 @@ change_password(Username, Password) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:write(rabbit_user, - #user{username = Username, - password = Password}, - write) + ok = mnesia:write(#user{username = Username, + password = Password}) end)), rabbit_log:info("Changed password for user ~p~n", [Username]), R. list_users() -> - mnesia:dirty_all_keys(rabbit_user). + mnesia:dirty_all_keys(user). lookup_user(Username) -> - rabbit_misc:dirty_read({rabbit_user, Username}). + rabbit_misc:dirty_read({user, Username}). add_vhost(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_vhost, VHostPath}) of + case mnesia:read({vhost, VHostPath}) of [] -> - ok = mnesia:write(rabbit_vhost, - #vhost{virtual_host = VHostPath}, - write), + ok = mnesia:write(#vhost{virtual_host = VHostPath}), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || @@ -239,8 +186,6 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> @@ -273,79 +218,53 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _}) -> - ok = clear_permissions(Username, VHostPath) + lists:foreach(fun (Username) -> + ok = unmap_user_vhost(Username, VHostPath) end, - list_vhost_permissions(VHostPath)), - ok = mnesia:delete({rabbit_vhost, VHostPath}), + list_vhost_users(VHostPath)), + ok = mnesia:delete({vhost, VHostPath}), ok. list_vhosts() -> - mnesia:dirty_all_keys(rabbit_vhost). + mnesia:dirty_all_keys(vhost). -validate_regexp(RegexpBin) -> - Regexp = binary_to_list(RegexpBin), - case regexp:parse(Regexp) of - {ok, _} -> ok; - {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) - end. +list_vhost_users(VHostPath) -> + [Username || + #user_vhost{username = Username} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_vhost( + VHostPath, + fun () -> mnesia:index_read(user_vhost, VHostPath, + #user_vhost.virtual_host) + end))]. + +list_user_vhosts(Username) -> + [VHostPath || + #user_vhost{virtual_host = VHostPath} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> mnesia:read({user_vhost, Username}) end))]. -set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), +map_user_vhost(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, - fun () -> ok = mnesia:write( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = VHostPath}, - permission = #permission{ - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, - write) + fun () -> + ok = mnesia:write( + #user_vhost{username = Username, + virtual_host = VHostPath}) end)). -clear_permissions(Username, VHostPath) -> +unmap_user_vhost(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, fun () -> - ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) + ok = mnesia:delete_object( + #user_vhost{username = Username, + virtual_host = VHostPath}) end)). -list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_vhost( - VHostPath, match_user_vhost('_', VHostPath)))]. - -list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_user( - Username, match_user_vhost(Username, '_')))]. - -list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - #user_permission{user_vhost = #user_vhost{username = Username, - virtual_host = VHostPath}, - permission = #permission{ - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction(QueryThunk)]. - -match_user_vhost(Username, VHostPath) -> - fun () -> mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = VHostPath}, - permission = '_'}, - read) - end. diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 875624ba..dee71d23 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -53,7 +53,7 @@ -spec(start/1 :: (bool() | 'auto') -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). - + -endif. %%---------------------------------------------------------------------------- @@ -101,7 +101,7 @@ handle_call({register, Pid, HighMemMFA}, end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), {ok, ok, State#alarms{alertees = NewAlertees}}; - + handle_call(_Request, State) -> {ok, not_understood, State}. @@ -135,7 +135,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of + Mod = case os:type() of %% memsup doesn't take account of buffers or cache when %% considering "free" memory - therefore on Linux we can %% get memory alarms very easily without any pressure @@ -143,7 +143,7 @@ start_memsup() -> %% our own simple memory monitor. %% {unix, linux} -> rabbit_memsup_linux; - + %% Start memsup programmatically rather than via the %% rabbitmq-server script. This is not quite the right %% thing to do as os_mon checks to see if memsup is diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3018582f..2b9abb29 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,13 +37,13 @@ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). +-export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([notify_sent/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). -import(mnesia). --import(gen_server2). +-import(gen_server). -import(lists). -import(queue). @@ -91,17 +91,15 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/8 :: - (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/7 :: + (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -128,7 +126,7 @@ recover_durable_queues() -> R = rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), + <- mnesia:table(durable_queues), node(Pid) == Node])) end), Queues = lists:map(fun start_queue_process/1, R), @@ -146,7 +144,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of + case mnesia:wread({amqqueue, QueueName}) of [] -> ok = store_queue(Q), ok = add_default_binding(Q), Q; @@ -159,11 +157,11 @@ declare(QueueName, Durable, AutoDelete, Args) -> end. store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(rabbit_durable_queue, Q, write), - ok = mnesia:write(rabbit_queue, Q, write), + ok = mnesia:write(durable_queues, Q, write), + ok = mnesia:write(Q), ok; store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(rabbit_queue, Q, write), + ok = mnesia:write(Q), ok. start_queue_process(Q) -> @@ -177,7 +175,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ok. lookup(Name) -> - rabbit_misc:dirty_read({rabbit_queue, Name}). + rabbit_misc:dirty_read({amqqueue, Name}). with(Name, F, E) -> case lookup(Name) of @@ -194,16 +192,15 @@ with_or_die(Name, F) -> list(VHostPath) -> mnesia:dirty_match_object( - rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, info). + gen_server:call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:call(QPid, {info, Items}) of + case gen_server:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -212,45 +209,45 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). + lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server:call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}); + gen_server:call(QPid, {deliver_immediately, Txn, Message}); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver, Txn, Message}), + gen_server:call(QPid, {deliver, Txn, Message}), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server2:cast(QPid, {deliver, Txn, Message}), + gen_server:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server2:cast(QPid, {redeliver, Messages}). + gen_server:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> @@ -259,50 +256,41 @@ notify_down_all(QPids, ChPid) -> %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). -limit_all(QPids, ChPid, LimiterPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, - QPids). - claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}). + gen_server:call(QPid, {claim_queue, ReaderPid}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}). + gen_server:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). - -unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server:cast(QPid, {notify_sent, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of + case mnesia:wread({amqqueue, QueueName}) of [] -> {error, not_found}; [Q] -> ok = delete_queue(Q), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), + ok = mnesia:delete({durable_queues, QueueName}), ok end end). delete_queue(#amqqueue{name = QueueName}) -> ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({amqqueue, QueueName}), ok. on_node_down(Node) -> @@ -312,7 +300,7 @@ on_node_down(Node) -> fun (Q, Acc) -> ok = delete_queue(Q), Acc end, ok, qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_queue), + <- mnesia:table(amqqueue), node(Pid) == Node])) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c390b2b7..6282a8fb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server2). +-behaviour(gen_server). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -62,10 +62,9 @@ %% These are held in our process dictionary -record(cr, {consumers, ch_pid, - limiter_pid, monitor_ref, unacked_messages, - is_limit_active, + is_overload_protection_active, unsent_message_count}). -define(INFO_KEYS, @@ -86,7 +85,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). + gen_server:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -132,7 +131,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_limit_active = false, + is_overload_protection_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -145,16 +144,20 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. - -ch_record_state_transition(OldCR, NewCR) -> - BlockedOld = is_ch_blocked(OldCR), - BlockedNew = is_ch_blocked(NewCR), - if BlockedOld andalso not(BlockedNew) -> unblock; - BlockedNew andalso not(BlockedOld) -> block; - true -> ok - end. +update_store_and_maybe_block_ch( + C = #cr{is_overload_protection_active = Active, + unsent_message_count = Count}) -> + {Result, NewActive} = + if + not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> + {block_ch, true}; + Active and (Count == 0) -> + {unblock_ch, false}; + true -> + {ok, Active} + end, + store_ch_record(C#cr{is_overload_protection_active = NewActive}), + Result. deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, @@ -165,37 +168,26 @@ deliver_immediately(Message, Delivered, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, RoundRobinTail} -> - C = #cr{limiter_pid = LimiterPid, - unsent_message_count = Count, + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + C = #cr{unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case not(AckRequired) orelse rabbit_limiter:can_send( - LimiterPid, self()) of - true -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, - store_ch_record(NewC), - NewConsumers = - case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId + 1}}; - false -> - store_ch_record(C#cr{is_limit_active = true}), - NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_immediately(Message, Delivered, - State#q{round_robin = NewConsumers}) - end; + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewConsumers = + case update_store_and_maybe_block_ch( + C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}) of + ok -> queue:in(QEntry, RoundRobinTail); + block_ch -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId +1}}; {empty, _} -> - {not_offered, State} + not_offered end. attempt_delivery(none, Message, State) -> @@ -206,8 +198,8 @@ attempt_delivery(none, Message, State) -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - {not_offered, State1} -> - {false, State1} + not_offered -> + {false, State} end; attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), @@ -245,22 +237,16 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(State, ChPid, Update) -> - case lookup_ch(ChPid) of - not_found -> +possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, + State = #q{round_robin = RoundRobin}) -> + case update_store_and_maybe_block_ch(C) of + ok -> State; - C -> - NewC = Update(C), - store_ch_record(NewC), - case ch_record_state_transition(C, NewC) of - ok -> State; - unblock -> NewRR = unblock_consumers(ChPid, - NewC#cr.consumers, - State#q.round_robin), - run_poke_burst(State#q{round_robin = NewRR}) - end + unblock_ch -> + run_poke_burst(State#q{round_robin = + unblock_consumers(ChPid, Consumers, RoundRobin)}) end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -315,7 +301,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -348,8 +334,8 @@ run_poke_burst(MessageBuffer, State) -> {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} + not_offered -> + State#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} @@ -514,8 +500,8 @@ i(messages_uncommitted, _) -> #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); + messages_unacknowledged, + messages_uncommitted]]); i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); @@ -566,14 +552,14 @@ handle_call({deliver, Txn, Message}, _From, State) -> handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), + gen_server:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), + gen_server:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -600,8 +586,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, - ConsumerTag, ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, + ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -615,13 +601,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - store_ch_record(C#cr{consumers = [Consumer | Consumers], - limiter_pid = LimiterPid}), - if Consumers == [] -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok - end, + C1 = C#cr{consumers = [Consumer | Consumers]}, + store_ch_record(C1), State1 = State#q{has_had_consumers = true, exclusive_consumer = if @@ -641,16 +622,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> + C = #cr{consumers = Consumers} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - store_ch_record(C#cr{consumers = NewConsumers}), - if NewConsumers == [] -> - ok = rabbit_limiter:unregister(LimiterPid, self()); - true -> - ok - end, + C1 = C#cr{consumers = NewConsumers}, + store_ch_record(C1), ok = maybe_send_reply(ChPid, OkMsg), case check_auto_delete( State#q{exclusive_consumer = cancel_holder(ChPid, @@ -753,33 +730,14 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; -handle_cast({unblock, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); - handle_cast({notify_sent, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - 1} - end)); - -handle_cast({limit, ChPid, LimiterPid}, State) -> - noreply( - possibly_unblock( - State, ChPid, - fun (C = #cr{consumers = Consumers, - limiter_pid = OldLimiterPid, - is_limit_active = Limited}) -> - if Consumers =/= [] andalso OldLimiterPid == undefined -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok - end, - NewLimited = Limited andalso LimiterPid =/= undefined, - C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + case lookup_ch(ChPid) of + not_found -> noreply(State); + T = #cr{unsent_message_count =Count} -> + noreply(possibly_unblock( + T#cr{unsent_message_count = Count - 1}, + State)) + end. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -800,7 +758,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 192ebacd..ca2782c7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,29 +33,23 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --behaviour(gen_server2). - --export([start_link/5, do/2, do/3, shutdown/1]). +-export([start_link/4, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +%% callbacks +-export([init/2, handle_message/2]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, proxy_pid, reader_pid, writer_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping}). --define(HIBERNATE_AFTER, 1000). - --define(MAX_PERMISSION_CACHE_SIZE, 12). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/5 :: - (channel_number(), pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -67,126 +61,112 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> - {ok, Pid} = gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost], []), - Pid. +start_link(ReaderPid, WriterPid, Username, VHost) -> + buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, + Username, VHost]). do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - gen_server2:cast(Pid, {method, Method, Content}). + Pid ! {method, Method, Content}, + ok. shutdown(Pid) -> - gen_server2:cast(Pid, terminate). + Pid ! terminate, + ok. send_command(Pid, Msg) -> - gen_server2:cast(Pid, {command, Msg}). + Pid ! {command, Msg}, + ok. deliver(Pid, ConsumerTag, AckRequired, Msg) -> - gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). + Pid ! {deliver, ConsumerTag, AckRequired, Msg}, + ok. conserve_memory(Pid, Conserve) -> - gen_server2:cast(Pid, {conserve_memory, Conserve}). + Pid ! {conserve_memory, Conserve}, + ok. %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost]) -> +init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), + %% this is bypassing the proxy so alarms can "jump the queue" and + %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {ok, #ch{state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}}. - -handle_call(_Request, _From, State) -> - noreply(State). - -handle_cast({method, Method, Content}, State) -> + #ch{state = starting, + proxy_pid = ProxyPid, + reader_pid = ReaderPid, + writer_pid = WriterPid, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}. + +handle_message({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - noreply(NewState); + NewState; {noreply, NewState} -> - noreply(NewState); + NewState; stop -> - {stop, normal, State#ch{state = terminating}} + exit(normal) catch exit:{amqp, Error, Explanation, none} -> - ok = notify_queues(internal_rollback(State)), - Reason = {amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, normal, State#ch{state = terminating}}; + terminate({amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, + State); exit:normal -> - {stop, normal, State}; + terminate(normal, State); _:Reason -> - {stop, {Reason, erlang:get_stacktrace()}, State} + terminate({Reason, erlang:get_stacktrace()}, State) end; -handle_cast(terminate, State) -> - {stop, normal, State}; +handle_message(terminate, State) -> + terminate(normal, State); -handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(State); + State; -handle_cast({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_message({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{proxy_pid = ProxyPid, + writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), - noreply(State1#ch{next_tag = DeliveryTag + 1}); + ok = internal_deliver(WriterPid, ProxyPid, + true, ConsumerTag, DeliveryTag, Msg), + State1#ch{next_tag = DeliveryTag + 1}; -handle_cast({conserve_memory, Conserve}, State) -> - ok = clear_permission_cache(), +handle_message({conserve_memory, Conserve}, State) -> ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - noreply(State). + State; -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; +handle_message({'EXIT', _Pid, Reason}, State) -> + terminate(Reason, State); -handle_info(timeout, State) -> - ok = clear_permission_cache(), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). +handle_message(Other, State) -> + terminate({unexpected_channel_message, Other}, State). -terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, - state = terminating}) -> - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid); +%%--------------------------------------------------------------------------- -terminate(Reason, State = #ch{writer_pid = WriterPid, - limiter_pid = LimiterPid}) -> +terminate(Reason, State = #ch{writer_pid = WriterPid}) -> Res = notify_queues(internal_rollback(State)), case Reason of normal -> ok = Res; _ -> ok end, rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%--------------------------------------------------------------------------- - -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + exit(Reason). return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -210,35 +190,6 @@ return_queue_declare_ok(State, NoWait, Q) -> {reply, Reply, NewState} end. -check_resource_access(Username, Resource, Perm) -> - V = {Resource, Perm}, - Cache = case get(permission_cache) of - undefined -> []; - Other -> Other - end, - CacheTail = - case lists:member(V, Cache) of - true -> lists:delete(V, Cache); - false -> ok = rabbit_access_control:check_resource_access( - Username, Resource, Perm), - lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1) - end, - put(permission_cache, [V | CacheTail]), - ok. - -clear_permission_cache() -> - erase(permission_cache), - ok. - -check_configure_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.configure). - -check_write_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.write). - -check_read_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.read). - expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -297,6 +248,7 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), + ok = rabbit_writer:shutdown(WriterPid), stop; handle_method(#'access.request'{},_, State) -> @@ -308,7 +260,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. @@ -322,7 +273,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; + rabbit_exchange:route(Exchange, RoutingKey), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -335,10 +286,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(TxnKey, Acked), + Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of - none -> ok = notify_limiter(State#ch.limiter_pid, Acked), - State#ch{unacked_message_q = Remaining}; + none -> State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -349,13 +299,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ writer_pid = WriterPid, + _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_read_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -381,13 +330,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ reader_pid = ReaderPid, - limiter_pid = LimiterPid, + _, State = #ch{ proxy_pid = ProxyPid, + reader_pid = ReaderPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_misc:binstring_guid("amq.ctag"); @@ -401,7 +349,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, self(), LimiterPid, + Q, NoAck, ReaderPid, ProxyPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -432,7 +380,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping }) -> + _, State = #ch{ proxy_pid = ProxyPid, + consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -453,7 +402,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, + Q, ProxyPid, ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -465,34 +414,13 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{global = true}, _, _State) -> - rabbit_misc:protocol_error(not_implemented, "global=true", []); - -handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> - rabbit_misc:protocol_error(not_implemented, - "prefetch_size!=0 (~w)", [Size]); - -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid }) -> - NewLimiterPid = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> - undefined; - {undefined, _} -> - LPid = rabbit_limiter:start_link(self()), - ok = limit_queues(LPid, State), - LPid; - {_, 0} -> - ok = rabbit_limiter:shutdown(LimiterPid), - ok = limit_queues(undefined, State), - undefined; - {_, _} -> - LimiterPid - end, - ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; +handle_method(#'basic.qos'{}, _, State) -> + %% FIXME: Need to implement QOS + {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, + proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -501,13 +429,14 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), self()) + QPid, lists:reverse(MsgIds), ProxyPid) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, + proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -525,7 +454,8 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, false, ConsumerTag, DeliveryTag, + WriterPid, ProxyPid, + false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -546,7 +476,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configure_permitted(ExchangeName, State), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -566,7 +495,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configure_permitted(ExchangeName, State), X = rabbit_exchange:lookup_or_die(ExchangeName), ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), return_ok(State, NoWait, #'exchange.declare_ok'{}); @@ -576,7 +504,6 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch { virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> rabbit_misc:protocol_error( @@ -627,12 +554,9 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args)); - Other = #amqqueue{name = QueueName} -> - check_configure_permitted(QueueName, State), - Other + Other -> Other end, return_queue_declare_ok(State, NoWait, Q); @@ -641,7 +565,6 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), - check_configure_permitted(QueueName, State), Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); @@ -652,7 +575,6 @@ handle_method(#'queue.delete'{queue = QueueNameBin, }, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of @@ -689,7 +611,6 @@ handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:purge(Q) end), @@ -739,11 +660,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_write_permitted(QueueName, State), ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, queue_not_found} -> rabbit_misc:protocol_error( @@ -825,10 +744,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(TxnKey, UAQ) -> +ack(ProxyPid, TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), [QPid | L] end, [], UAQ). @@ -843,9 +762,7 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> ok = notify_limiter(State#ch.limiter_pid, - State#ch.uncommitted_ack_q), - new_tx(State); + ok -> new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -882,37 +799,19 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). - -limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). - -consumer_queues(Consumers) -> - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end]. - -%% tell the limiter about the number of acks that have been received -%% for messages delivered to subscribed consumers, but not acks for -%% messages sent in a response to a basic.get (identified by their -%% 'none' consumer tag) -notify_limiter(undefined, _Acked) -> - ok; -notify_limiter(LimiterPid, Acked) -> - case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, queue:to_list(Acked)) of - 0 -> ok; - Count -> rabbit_limiter:ack(LimiterPid, Count) - end. +notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> + rabbit_amqqueue:notify_down_all( + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end], + ProxyPid). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> @@ -920,8 +819,7 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", + Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n", [Other]), false end. @@ -931,7 +829,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -943,6 +841,6 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, self(), M, Content); + WriterPid, QPid, ChPid, M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e6717d68..cbc11b40 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -114,10 +114,10 @@ Available commands: delete_vhost list_vhosts - set_permissions [-p ] - clear_permissions [-p ] - list_permissions [-p ] - list_user_permissions + map_user_vhost + unmap_user_vhost + list_user_vhosts + list_vhost_users list_queues [-p ] [ ...] list_exchanges [-p ] [ ...] @@ -223,14 +223,25 @@ action(list_vhosts, Node, [], Inform) -> Inform("Listing vhosts", []), display_list(call(Node, {rabbit_access_control, list_vhosts, []})); -action(list_user_permissions, Node, Args = [_Username], Inform) -> - Inform("Listing permissions for user ~p", Args), - display_list(call(Node, {rabbit_access_control, list_user_permissions, - Args})); +action(map_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> + Inform("Mapping user ~p to vhost ~p", Args), + call(Node, {rabbit_access_control, map_user_vhost, Args}); + +action(unmap_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> + Inform("Unmapping user ~p from vhost ~p", Args), + call(Node, {rabbit_access_control, unmap_user_vhost, Args}); + +action(list_user_vhosts, Node, Args = [_Username], Inform) -> + Inform("Listing vhosts for user ~p", Args), + display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args})); + +action(list_vhost_users, Node, Args = [_VHostPath], Inform) -> + Inform("Listing users for vhosts ~p", Args), + display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})); action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), + {VHostArg, RemainingArgs} = parse_vhost_flag(Args), ArgAtoms = list_replace(node, pid, default_if_empty(RemainingArgs, [name, messages])), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, @@ -239,7 +250,7 @@ action(list_queues, Node, Args, Inform) -> action(list_exchanges, Node, Args, Inform) -> Inform("Listing exchanges", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), + {VHostArg, RemainingArgs} = parse_vhost_flag(Args), ArgAtoms = default_if_empty(RemainingArgs, [name, type]), display_info_list(rpc_call(Node, rabbit_exchange, info_all, [VHostArg, ArgAtoms]), @@ -247,7 +258,7 @@ action(list_exchanges, Node, Args, Inform) -> action(list_bindings, Node, Args, Inform) -> Inform("Listing bindings", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), + {VHostArg, _} = parse_vhost_flag(Args), InfoKeys = [exchange_name, routing_key, queue_name, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || @@ -261,37 +272,15 @@ action(list_connections, Node, Args, Inform) -> default_if_empty(Args, [user, peer_address, peer_port])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), - ArgAtoms); - -action(Command, Node, Args, Inform) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - action(Command, Node, VHost, RemainingArgs, Inform). - -action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> - Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, set_permissions, - [Username, VHost, CPerm, WPerm, RPerm]}); - -action(clear_permissions, Node, VHost, [Username], Inform) -> - Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); - -action(list_permissions, Node, VHost, [], Inform) -> - Inform("Listing permissions in vhost ~p", [VHost]), - display_list(call(Node, {rabbit_access_control, list_vhost_permissions, - [VHost]})). + ArgAtoms). parse_vhost_flag(Args) when is_list(Args) -> - case Args of - ["-p", VHost | RemainingArgs] -> - {VHost, RemainingArgs}; - RemainingArgs -> - {"/", RemainingArgs} - end. - -parse_vhost_flag_bin(Args) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - {list_to_binary(VHost), RemainingArgs}. + case Args of + ["-p", VHost | RemainingArgs] -> + {list_to_binary(VHost), RemainingArgs}; + RemainingArgs -> + {<<"/">>, RemainingArgs} + end. default_if_empty(List, Default) when is_list(List) -> if List == [] -> @@ -301,17 +290,21 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || - X <- InfoItemKeys]) - end, Results), + lists:foreach( + fun (Result) -> + io:fwrite( + lists:flatten( + rabbit_misc:intersperse( + "\t", + [format_info_item(Result, X) || X <- InfoItemKeys]))), + io:nl() + end, + Results), ok; + display_info_list(Other, _) -> Other. -display_row(Row) -> - io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), - io:nl(). - format_info_item(Items, Key) -> {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), case Info of @@ -328,10 +321,8 @@ format_info_item(Items, Key) -> end. display_list(L) when is_list(L) -> - lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [url_encode(I)]); - (I) when is_tuple(I) -> - display_row([url_encode(V) || V <- tuple_to_list(I)]) + lists:foreach(fun (I) -> + io:format("~s~n", [binary_to_list(I)]) end, lists:sort(L)), ok; diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 183b6984..9a9220b5 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -46,7 +46,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " + rabbit_log:error("Failed to append contents of " ++ "log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 19efd9fc..925c335c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/3]). + route/2]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). +-export([check_type/1, assert_type/2, topic_matches/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -77,7 +77,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). +-spec(route/2 :: (exchange(), routing_key()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -88,7 +88,6 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -107,18 +106,16 @@ recover() -> fun () -> mnesia:foldl( fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = mnesia:write(Exchange), Acc - end, ok, rabbit_durable_exchange), + end, ok, durable_exchanges), mnesia:foldl( fun (Route, Acc) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute), Acc - end, ok, rabbit_durable_route), + end, ok, durable_routes), ok end). @@ -130,11 +127,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of - [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), + case mnesia:wread({exchange, ExchangeName}) of + [] -> ok = mnesia:write(Exchange), if Durable -> - ok = mnesia:write(rabbit_durable_exchange, - Exchange, write); + ok = mnesia:write( + durable_exchanges, Exchange, write); true -> ok end, Exchange; @@ -148,8 +145,6 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; -check_type(<<"headers">>) -> - headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -163,7 +158,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> [rabbit_misc:rs(Name), ActualType, RequiredType]). lookup(Name) -> - rabbit_misc:dirty_read({rabbit_exchange, Name}). + rabbit_misc:dirty_read({exchange, Name}). lookup_or_die(Name) -> case lookup(Name) of @@ -175,7 +170,6 @@ lookup_or_die(Name) -> list(VHostPath) -> mnesia:dirty_match_object( - rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). map(VHostPath, F) -> @@ -217,80 +211,64 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}) -> + routing_key = RoutingKey}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey, Content), + QPids = route(Exchange, RoutingKey), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -route(X = #exchange{type = topic}, RoutingKey, _Content) -> - match_bindings(X, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end); - -route(X = #exchange{type = headers}, _RoutingKey, Content) -> - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, - match_bindings(X, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end); - -route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - match_routing_key(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey, _Content) -> - match_routing_key(X, RoutingKey). - +%% %% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(#exchange{name = Name}, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), +route(#exchange{name = Name, type = topic}, RoutingKey) -> + Query = qlc:q([QName || + #route{binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName, + key = BindingKey}} <- mnesia:table(route), + ExchangeName == Name, + %% TODO: This causes a full scan for each entry + %% with the same exchange (see bug 19336) + topic_matches(BindingKey, RoutingKey)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- + [QName || #route{binding = #binding{queue_name = QName, + key = BindingKey}} <- mnesia:dirty_match_object( - rabbit_route, #route{binding = #binding{exchange_name = Name, _ = '_'}}), - Match(Binding)] - end). + topic_matches(BindingKey, RoutingKey)] + end); + +route(X = #exchange{type = fanout}, _) -> + route_internal(X, '_'); -match_routing_key(#exchange{name = Name}, RoutingKey) -> +route(X = #exchange{type = direct}, RoutingKey) -> + route_internal(X, RoutingKey). + +route_internal(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> sets:fold( fun(Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of + case mnesia:dirty_read({amqqueue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end @@ -301,37 +279,33 @@ lookup_qpids(Queues) -> %% to be implemented for 0.91 ? delete_bindings_for_exchange(ExchangeName) -> - [begin - ok = mnesia:delete_object(rabbit_reverse_route, - reverse_route(Route), write), - ok = delete_forward_routes(Route) - end || Route <- mnesia:match_object( - rabbit_route, - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - write)], - ok. + indexed_delete( + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + fun delete_forward_routes/1, fun mnesia:delete_object/1). delete_bindings_for_queue(QueueName) -> Exchanges = exchanges_for_queue(QueueName), + indexed_delete( + reverse_route(#route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + fun mnesia:delete_object/1, fun delete_forward_routes/1), [begin - ok = delete_forward_routes(reverse_route(Route)), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write) - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), + [X] = mnesia:read({exchange, ExchangeName}), ok = maybe_auto_delete(X) end || ExchangeName <- Exchanges], ok. +indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> + [begin + ok = ReverseDeleteFun(reverse_route(Route)), + ok = ForwardsDeleteFun(Route) + end || Route <- mnesia:match_object(Match)], + ok. + delete_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write), - ok = mnesia:delete_object(rabbit_durable_route, Route, write). + ok = mnesia:delete_object(Route), + ok = mnesia:delete_object(durable_routes, Route, write). exchanges_for_queue(QueueName) -> MatchHead = reverse_route( @@ -340,18 +314,17 @@ exchanges_for_queue(QueueName) -> _ = '_'}}), sets:to_list( sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). + mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). has_bindings(ExchangeName) -> MatchHead = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, try - continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}], - 1, read)) + continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read)) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - case mnesia:match_object(rabbit_route, MatchHead, read) of + case mnesia:match_object(MatchHead) of [] -> false; [_|_] -> true end @@ -363,7 +336,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case mnesia:read({rabbit_exchange, Exchange}) of + fun() -> case mnesia:read({exchange, Exchange}) of [] -> {error, exchange_not_found}; [X] -> Fun(X) end @@ -372,7 +345,7 @@ call_with_exchange(Exchange, Fun) -> call_with_exchange_and_queue(Exchange, Queue, Fun) -> call_with_exchange( Exchange, - fun(X) -> case mnesia:read({rabbit_queue, Queue}) of + fun(X) -> case mnesia:read({amqqueue, Queue}) of [] -> {error, queue_not_found}; [Q] -> Fun(X, Q) end @@ -404,15 +377,13 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = sort_arguments(Arguments)}, + args = Arguments}, ok = case Durable of - true -> Fun(rabbit_durable_route, - #route{binding = Binding}, write); + true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok end, - {Route, ReverseRoute} = route_with_reverse(Binding), - ok = Fun(rabbit_route, Route, write), - ok = Fun(rabbit_reverse_route, ReverseRoute, write), + [ok, ok] = [Fun(element(1, R), R, write) || + R <- tuple_to_list(route_with_reverse(Binding))], ok. list_bindings(VHostPath) -> @@ -423,7 +394,6 @@ list_bindings(VHostPath) -> queue_name = QueueName, args = Arguments}} <- mnesia:dirty_match_object( - rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), _ = '_'}, @@ -459,67 +429,6 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. -default_headers_match_kind() -> all. - -parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). - -%% Horrendous matching algorithm. Depends for its merge-like -%% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and sync_binding/6 do. -%% -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). - -headers_match([], _Data, AllMatch, _AnyMatch, all) -> - AllMatch; -headers_match([], _Data, _AllMatch, AnyMatch, any) -> - AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, - AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); -headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> - headers_match([], [], false, AnyMatch, MatchKind); -headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK > DK -> - headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], - _AllMatch, AnyMatch, MatchKind) when PK < DK -> - headers_match(PRest, Data, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK == DK -> - {AllMatch1, AnyMatch1} = - if - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} - end, - headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). - split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. @@ -566,8 +475,8 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_bindings_for_exchange(ExchangeName), - ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}). + ok = mnesia:delete({durable_exchanges, ExchangeName}), + ok = mnesia:delete({exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API @@ -583,7 +492,7 @@ list_exchange_bindings(ExchangeName) -> #route{binding = #binding{queue_name = QueueName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + <- mnesia:dirty_match_object(Route)]. % Refactoring is left as an exercise for the reader list_queue_bindings(QueueName) -> @@ -593,4 +502,4 @@ list_queue_bindings(QueueName) -> #route{binding = #binding{exchange_name = ExchangeName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + <- mnesia:dirty_match_object(Route)]. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 5c447792..060bed48 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -95,15 +95,13 @@ collect_content(ChannelPid, MethodName) -> true -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", + "expected content header for class ~w, got one for class ~w instead", [ClassId, HeaderClassId]) end; _ -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, " - "got non content header frame instead", + "expected content header for class ~w, got non content header frame instead", [ClassId]) end. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl deleted file mode 100644 index 532be26d..00000000 --- a/src/rabbit_limiter.erl +++ /dev/null @@ -1,195 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_limiter). - --behaviour(gen_server). - --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2]). --export([start_link/1, shutdown/1]). --export([limit/2, can_send/2, ack/2, register/2, unregister/2]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(maybe_pid() :: pid() | 'undefined'). - --spec(start_link/1 :: (pid()) -> pid()). --spec(shutdown/1 :: (maybe_pid()) -> 'ok'). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). --spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). --spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - --record(lim, {prefetch_count = 0, - ch_pid, - queues = dict:new(), % QPid -> {MonitorRef, Notify} - volume = 0}). -%% 'Notify' is a boolean that indicates whether a queue should be -%% notified of a change in the limit or volume that may allow it to -%% deliver more messages via the limiter's channel. - -%%---------------------------------------------------------------------------- -%% API -%%---------------------------------------------------------------------------- - -start_link(ChPid) -> - {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), - Pid. - -shutdown(undefined) -> - ok; -shutdown(LimiterPid) -> - unlink(LimiterPid), - gen_server2:cast(LimiterPid, shutdown). - -limit(undefined, 0) -> - ok; -limit(LimiterPid, PrefetchCount) -> - gen_server2:cast(LimiterPid, {limit, PrefetchCount}). - -%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit -can_send(undefined, _QPid) -> - true; -can_send(LimiterPid, QPid) -> - rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end). - -%% Let the limiter know that the channel has received some acks from a -%% consumer -ack(undefined, _Count) -> ok; -ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). - -register(undefined, _QPid) -> ok; -register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). - -unregister(undefined, _QPid) -> ok; -unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). - -%%---------------------------------------------------------------------------- -%% gen_server callbacks -%%---------------------------------------------------------------------------- - -init([ChPid]) -> - {ok, #lim{ch_pid = ChPid} }. - -handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of - true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = Volume + 1}} - end. - -handle_cast(shutdown, State) -> - {stop, normal, State}; - -handle_cast({limit, PrefetchCount}, State) -> - {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; - -handle_cast({ack, Count}, State = #lim{volume = Volume}) -> - NewVolume = if Volume == 0 -> 0; - true -> Volume - Count - end, - {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; - -handle_cast({register, QPid}, State) -> - {noreply, remember_queue(QPid, State)}; - -handle_cast({unregister, QPid}, State) -> - {noreply, forget_queue(QPid, State)}. - -handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) -> - {noreply, forget_queue(QPid, State)}. - -terminate(_, _) -> - ok. - -code_change(_, State, _) -> - State. - -%%---------------------------------------------------------------------------- -%% Internal plumbing -%%---------------------------------------------------------------------------- - -maybe_notify(OldState, NewState) -> - case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> notify_queues(NewState); - false -> NewState - end. - -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> - Limit =/= 0 andalso Volume >= Limit. - -remember_queue(QPid, State = #lim{queues = Queues}) -> - case dict:is_key(QPid, Queues) of - false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; - true -> State - end. - -forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> - case dict:find(QPid, Queues) of - {ok, {MRef, _}} -> - true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), - State#lim{queues = dict:erase(QPid, Queues)}; - error -> State - end. - -limit_queue(QPid, State = #lim{queues = Queues}) -> - UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. - -notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> - {QList, NewQueues} = - dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; - (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], dict:store(QPid, {MRef, false}, D)} - end, {[], Queues}, Queues), - case length(QList) of - 0 -> ok; - L -> - %% We randomly vary the position of queues in the list, - %% thus ensuring that each queue has an equal chance of - %% being notified first. - {L1, L2} = lists:split(random:uniform(L), QList), - [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], - ok - end, - State#lim{queues = NewQueues}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 214c9528..973e163b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,7 +50,6 @@ -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). --export([start_applications/1, stop_applications/1]). -import(mnesia). -import(lists). @@ -109,8 +108,6 @@ -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'true'). --spec(start_applications/1 :: ([atom()]) -> 'ok'). --spec(stop_applications/1 :: ([atom()]) -> 'ok'). -endif. @@ -237,7 +234,7 @@ filter_exit_map(F, L) -> with_user(Username, Thunk) -> fun () -> - case mnesia:read({rabbit_user, Username}) of + case mnesia:read({user, Username}) of [] -> mnesia:abort({no_such_user, Username}); [_U] -> @@ -247,7 +244,7 @@ with_user(Username, Thunk) -> with_vhost(VHostPath, Thunk) -> fun () -> - case mnesia:read({rabbit_vhost, VHostPath}) of + case mnesia:read({vhost, VHostPath}) of [] -> mnesia:abort({no_such_vhost, VHostPath}); [_V] -> @@ -401,32 +398,3 @@ format_stderr(Fmt, Args) -> Port = open_port({fd, 0, 2}, [out]), port_command(Port, io_lib:format(Fmt, Args)), port_close(Port). - -manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> - Iterate(fun (App, Acc) -> - case Do(App) of - ok -> [App | Acc]; - {error, {SkipError, _}} -> Acc; - {error, Reason} -> - lists:foreach(Undo, Acc), - throw({error, {ErrorTag, App, Reason}}) - end - end, [], Apps), - ok. - -start_applications(Apps) -> - manage_applications(fun lists:foldl/3, - fun application:start/1, - fun application:stop/1, - already_started, - cannot_start_application, - Apps). - -stop_applications(Apps) -> - manage_applications(fun lists:foldr/3, - fun application:stop/1, - fun application:start/1, - not_started, - cannot_stop_application, - Apps). - diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 15213861..d19c37cb 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -100,50 +100,33 @@ force_reset() -> reset(true). %%-------------------------------------------------------------------- table_definitions() -> - [{rabbit_user, - [{record_name, user}, - {attributes, record_info(fields, user)}, - {disc_copies, [node()]}]}, - {rabbit_user_permission, - [{record_name, user_permission}, - {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}]}, - {rabbit_vhost, - [{record_name, vhost}, - {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}]}, - {rabbit_config, - [{disc_copies, [node()]}]}, - {rabbit_listener, - [{record_name, listener}, - {attributes, record_info(fields, listener)}, - {type, bag}]}, - {rabbit_durable_route, - [{record_name, route}, - {attributes, record_info(fields, route)}, - {disc_copies, [node()]}]}, - {rabbit_route, - [{record_name, route}, - {attributes, record_info(fields, route)}, - {type, ordered_set}]}, - {rabbit_reverse_route, - [{record_name, reverse_route}, - {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}]}, - {rabbit_durable_exchange, - [{record_name, exchange}, - {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}]}, - {rabbit_exchange, - [{record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, - {rabbit_durable_queue, - [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}]}, - {rabbit_queue, - [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + [{user, [{disc_copies, [node()]}, + {attributes, record_info(fields, user)}]}, + {user_vhost, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, user_vhost)}, + {index, [virtual_host]}]}, + {vhost, [{disc_copies, [node()]}, + {attributes, record_info(fields, vhost)}]}, + {rabbit_config, [{disc_copies, [node()]}]}, + {listener, [{type, bag}, + {attributes, record_info(fields, listener)}]}, + {durable_routes, [{disc_copies, [node()]}, + {record_name, route}, + {attributes, record_info(fields, route)}]}, + {route, [{type, ordered_set}, + {attributes, record_info(fields, route)}]}, + {reverse_route, [{type, ordered_set}, + {attributes, record_info(fields, reverse_route)}]}, + {durable_exchanges, [{disc_copies, [node()]}, + {record_name, exchange}, + {attributes, record_info(fields, exchange)}]}, + {exchange, [{attributes, record_info(fields, exchange)}]}, + {durable_queues, [{disc_copies, [node()]}, + {record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}]}, + {amqqueue, [{attributes, record_info(fields, amqqueue)}, + {index, [pid]}]}]. table_names() -> [Tab || {Tab, _} <- table_definitions()]. @@ -260,8 +243,8 @@ init_db(ClusterNodes) -> %% NB: we cannot use rabbit_log here since %% it may not have been started yet error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " + "schema integrity check failed: ~p~n" ++ + "moving database to backup location " ++ "and recreating schema from scratch~n", [Reason]), ok = move_db(), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2dbd5a5a..99ea37d8 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -123,7 +123,6 @@ stop_tcp_listener(Host, Port) -> tcp_listener_started(IPAddress, Port) -> ok = mnesia:dirty_write( - rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), @@ -131,20 +130,19 @@ tcp_listener_started(IPAddress, Port) -> tcp_listener_stopped(IPAddress, Port) -> ok = mnesia:dirty_delete_object( - rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), port = Port}). active_listeners() -> - rabbit_misc:dirty_read_all(rabbit_listener). + rabbit_misc:dirty_read_all(listener). node_listeners(Node) -> - mnesia:dirty_read(rabbit_listener, Node). + mnesia:dirty_read(listener, Node). on_node_down(Node) -> - ok = mnesia:dirty_delete(rabbit_listener, Node). + ok = mnesia:dirty_delete(listener, Node). start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 12ee299e..3f8d7cac 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -173,8 +173,7 @@ setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of once -> - rabbit_log:info("Enabling profiling for this connection, " - "and disabling for subsequent.~n"), + rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"), rabbit_misc:set_config(profiling_enabled, false), fprof:trace(start); true -> @@ -284,8 +283,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit(Reason); {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -353,14 +350,6 @@ terminate_channel(Channel, Ref, State) -> end, State. -handle_channel_exit(Channel, Reason, State) -> - %% We remove the channel from the inbound map only. That allows - %% the channel to be re-opened, but also means the remaining - %% cleanup, including possibly closing the connection, is deferred - %% until we get the (normal) exit signal. - erase({channel, Channel}), - handle_exception(State, Channel, Reason). - handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), maybe_close(State); @@ -415,8 +404,7 @@ wait_for_channel_termination(N, TimerRef) -> normal -> ok; _ -> rabbit_log:error( - "connection ~p, channel ~p - " - "error while terminating:~n~p~n", + "connection ~p, channel ~p - error while terminating:~n~p~n", [self(), Channel, Reason]) end, wait_for_channel_termination(N-1, TimerRef) @@ -721,8 +709,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/4, + [self(), WriterPid, Username, VHost]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ff42ea04..ad653a2f 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -32,7 +32,7 @@ -module(rabbit_router). -include("rabbit.hrl"). --behaviour(gen_server2). +-behaviour(gen_server). -export([start_link/0, deliver/5]). @@ -58,7 +58,7 @@ %%---------------------------------------------------------------------------- start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -ifdef(BUG19758). @@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% than the non-immediate case below. {ok, lists:flatmap( fun ({Node, QPids}) -> - gen_server2:cast( + gen_server:cast( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}), QPids @@ -110,7 +110,7 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server2:call( + try gen_server:call( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}) catch @@ -143,7 +143,7 @@ handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, spawn( fun () -> R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), - gen_server2:reply(From, R) + gen_server:reply(From, R) end), {noreply, State}. diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 2a365ce1..9e4c9c8a 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -47,7 +47,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " + rabbit_log:error("Failed to append contents of " ++ "sasl log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6312e8e3..df2e71d9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -444,16 +444,17 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(delete_vhost, ["/testhost"]), {error, {no_such_user, _}} = - control_action(set_permissions, ["foo", ".*", ".*", ".*"]), + control_action(map_user_vhost, ["foo", "/"]), {error, {no_such_user, _}} = - control_action(clear_permissions, ["foo"]), + control_action(unmap_user_vhost, ["foo", "/"]), {error, {no_such_user, _}} = - control_action(list_user_permissions, ["foo"]), + control_action(list_user_vhosts, ["foo"]), {error, {no_such_vhost, _}} = - control_action(list_permissions, ["-p", "/testhost"]), - {error, {invalid_regexp, _, _}} = - control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), - + control_action(map_user_vhost, ["guest", "/testhost"]), + {error, {no_such_vhost, _}} = + control_action(unmap_user_vhost, ["guest", "/testhost"]), + {error, {no_such_vhost, _}} = + control_action(list_vhost_users, ["/testhost"]), %% user creation ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = @@ -468,16 +469,13 @@ test_user_management() -> ok = control_action(list_vhosts, []), %% user/vhost mapping - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(list_permissions, ["-p", "/testhost"]), - ok = control_action(list_user_permissions, ["foo"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(list_user_vhosts, ["foo"]), %% user/vhost unmapping - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), + ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), + ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), %% vhost deletion ok = control_action(delete_vhost, ["/testhost"]), @@ -486,8 +484,7 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion -- cgit v1.2.1 From ecba118cdfa383f6b98089f9dcbc0bf9c37bb826 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 16 Feb 2009 11:42:59 +0000 Subject: prevent unwanted path expansion this time committed on a branch in the right place --- scripts/rabbitmq-multi | 5 +++++ scripts/rabbitmq-server | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 84985e90..164c5e18 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -54,6 +54,11 @@ export \ RABBITMQ_SCRIPT_HOME \ RABBITMQ_PIDS_FILE +# we need to turn off path expansion because some of the vars, notably +# RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and +# there is no other way of preventing their expansion. +set -f + exec erl \ -pa "`dirname $0`/../ebin" \ -noinput \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 572262c9..9a35c477 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -73,6 +73,11 @@ fi RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' +# we need to turn off path expansion because some of the vars, notably +# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and +# there is no other way of preventing their expansion. +set -f + exec erl \ -pa "`dirname $0`/../ebin" \ ${RABBITMQ_START_RABBIT} \ -- cgit v1.2.1 From 3215619460e8a935dae5d25c00a7df31bf5454e8 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 16 Mar 2009 17:54:36 +0000 Subject: Update some 64-bit documentation --- packaging/windows/rabbitmq-service.pod | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod index 7c4d3ef2..92648076 100644 --- a/packaging/windows/rabbitmq-service.pod +++ b/packaging/windows/rabbitmq-service.pod @@ -92,8 +92,10 @@ Defaults to 5672. =head2 ERLANG_SERVICE_MANAGER_PATH -Defaults to F. This is -the installation location of the Erlang service manager. +Defaults to F +(or F for 64-bit +environments). This is the installation location of the Erlang service +manager. =head2 CLUSTER_CONFIG_FILE -- cgit v1.2.1 From 4bffa78fed834258fb1978ef25e209208f56fba2 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Fri, 8 May 2009 15:37:29 +0100 Subject: Remove duplicated username in useradd command --- packaging/RPMS/Fedora/rabbitmq-server.spec | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 54c7def5..184a9832 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -81,8 +81,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then - useradd -r -g rabbitmq -d %{_localstatedir}/lib/rabbitmq rabbitmq \ - -c "RabbitMQ messaging server" rabbitmq + useradd -r -g rabbitmq -d %{_localstatedir}/lib/rabbitmq rabbitmq \ + -c "RabbitMQ messaging server" fi /sbin/chkconfig --add %{name} -- cgit v1.2.1 From 3290b43b18d0ca3790d8d4bb4c099b63468a3932 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Fri, 8 May 2009 15:44:58 +0100 Subject: Create user and group in %pre rather than %post They are referred to in the %files section, which leads to warnings if they are not created until %post runs. --- packaging/RPMS/Fedora/rabbitmq-server.spec | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 184a9832..6bf3b841 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -28,13 +28,6 @@ scalable implementation of an AMQP broker. %define _maindir %{buildroot}%{_rabbit_erllibdir} -%pre -if [ $1 -gt 1 ]; then - #Upgrade - stop and remove previous instance of rabbitmq-server init.d script - /sbin/service rabbitmq-server stop - /sbin/chkconfig --del rabbitmq-server -fi - %prep %setup -q @@ -73,7 +66,14 @@ echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) -%post +%pre + +if [ $1 -gt 1 ]; then + #Upgrade - stop and remove previous instance of rabbitmq-server init.d script + /sbin/service rabbitmq-server stop + /sbin/chkconfig --del rabbitmq-server +fi + # create rabbitmq group if ! getent group rabbitmq >/dev/null; then groupadd -r rabbitmq @@ -85,6 +85,7 @@ if ! getent passwd rabbitmq >/dev/null; then -c "RabbitMQ messaging server" fi +%post /sbin/chkconfig --add %{name} %preun -- cgit v1.2.1