diff options
author | Michael Bridgen <mikeb@lshift.net> | 2010-04-26 18:05:20 +0100 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2010-04-26 18:05:20 +0100 |
commit | 9df5e95fe9ed434a91eaf48eefacc5d7c2165309 (patch) | |
tree | 53c569cba68aaa20c3eaf2ffb12f56ce6544ae68 | |
parent | b4e95e1afe768aace579a3d2f17e652e2d9655ed (diff) | |
parent | d4098eb629aebb2866f1f2299098d45b82be1e21 (diff) | |
download | rabbitmq-server-9df5e95fe9ed434a91eaf48eefacc5d7c2165309.tar.gz |
Merge default into amqp_0_9_1. There was a conflict in
rabbit_amqqueue.erl, due in the main to recover/0 being removed and
refactoring of durable queue recovery in general.
-rw-r--r-- | Makefile | 14 | ||||
-rw-r--r-- | include/rabbit.hrl | 13 | ||||
-rw-r--r-- | src/rabbit.erl | 20 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 65 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 5 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 29 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 49 | ||||
-rw-r--r-- | src/rabbit_dialyzer.erl | 6 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 5 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 65 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 7 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 28 | ||||
-rw-r--r-- | src/rabbit_router.erl | 10 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 7 | ||||
-rw-r--r-- | src/worker_pool.erl | 28 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 12 |
17 files changed, 250 insertions, 174 deletions
@@ -5,6 +5,7 @@ RABBITMQ_NODENAME ?= rabbit RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia RABBITMQ_LOG_BASE ?= $(TMPDIR) +RABBITMQ_CONFIG_FILE ?= $(CURDIR)/rabbitmq DEPS_FILE=deps.mk SOURCE_DIR=src @@ -130,6 +131,7 @@ run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_ALLOW_INPUT=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ + RABBITMQ_CONFIG_FILE="$(RABBITMQ_CONFIG_FILE)" \ ./scripts/rabbitmq-server run-node: all @@ -137,6 +139,7 @@ run-node: all RABBITMQ_NODE_ONLY=true \ RABBITMQ_ALLOW_INPUT=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ + RABBITMQ_CONFIG_FILE="$(RABBITMQ_CONFIG_FILE)" \ ./scripts/rabbitmq-server run-tests: all @@ -146,6 +149,7 @@ start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ + RABBITMQ_CONFIG_FILE="$(RABBITMQ_CONFIG_FILE)" \ ./scripts/rabbitmq-server ; sleep 1 start-rabbit-on-node: all @@ -211,11 +215,11 @@ distclean: clean # generated but empty if we fail $(SOURCE_DIR)/%_usage.erl: xsltproc --stringparam modulename "`basename $@ .erl`" \ - $(DOCS_DIR)/usage.xsl $< > $@.tmp && \ - sed -e s/\\\"/\\\\\\\"/g -e s/%QUOTE%/\\\"/g $@.tmp > $@.tmp2 && \ - fold -s $@.tmp2 > $@.tmp3 && \ - cp $@.tmp3 $@ && \ - rm $@.tmp $@.tmp2 $@.tmp3 + $(DOCS_DIR)/usage.xsl $< > $@.tmp + sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2 + fold -s $@.tmp2 > $@.tmp3 + mv $@.tmp3 $@ + rm $@.tmp $@.tmp2 # We rename the file before xmlto sees it since xmlto will use the name of # the file to make internal links. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b453bbde..852fdd8b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,8 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, + is_persistent}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -83,6 +84,7 @@ -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). -type(regexp() :: binary()). +-type(file_path() :: string()). %% this is really an abstract type, but dialyzer does not support them -type(guid() :: any()). @@ -144,7 +146,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), @@ -154,7 +157,7 @@ message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). +-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -type(listener() :: #listener{node :: erlang_node(), protocol :: atom(), @@ -166,6 +169,7 @@ #amqp_error{name :: atom(), explanation :: string(), method :: atom()}). + -endif. %%---------------------------------------------------------------------------- @@ -175,6 +179,9 @@ -define(MAX_WAIT, 16#ffffffff). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). -define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). diff --git a/src/rabbit.erl b/src/rabbit.erl index 2adac0d6..b9eb45ce 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -91,12 +91,6 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_amqqueue_sup, - [{description, "queue supervisor"}, - {mfa, {rabbit_amqqueue, start, []}}, - {requires, kernel_ready}, - {enables, core_initialized}]}). - -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, {mfa, {rabbit_sup, start_restartable_child, @@ -109,7 +103,6 @@ {mfa, {rabbit_sup, start_restartable_child, [rabbit_node_monitor]}}, {requires, kernel_ready}, - {requires, rabbit_amqqueue_sup}, {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, @@ -125,14 +118,15 @@ {mfa, {rabbit_exchange, recover, []}}, {requires, empty_db_check}]}). --rabbit_boot_step({queue_recovery, - [{description, "queue recovery"}, - {mfa, {rabbit_amqqueue, recover, []}}, - {requires, exchange_recovery}]}). +-rabbit_boot_step({queue_sup_queue_recovery, + [{description, "queue supervisor and queue recovery"}, + {mfa, {rabbit_amqqueue, start, []}}, + {requires, empty_db_check}]}). -rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {requires, queue_recovery}]}). + [{mfa, {rabbit_sup, start_child, + [rabbit_persister]}}, + {requires, queue_sup_queue_recovery}]}). -rabbit_boot_step({guid_generator, [{description, "guid generator"}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 919a04db..da12d515 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/5, delete/3, purge/1]). +-export([start/0, declare/5, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, @@ -40,7 +40,7 @@ -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). +-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -62,7 +62,6 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). @@ -91,12 +90,12 @@ -spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). --spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), msg()} | 'empty'). + {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> @@ -116,26 +115,34 @@ %%---------------------------------------------------------------------------- start() -> + DurableQueues = find_durable_queues(), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + _RealDurableQueues = recover_durable_queues(DurableQueues), ok. -recover() -> - ok = recover_durable_queues(), - ok. +find_durable_queues() -> + Node = node(), + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end). shared_or_live_owner(none) -> true; shared_or_live_owner(Owner) when is_pid(Owner) -> rpc:call(node(Owner), erlang, is_process_alive, [Owner]). -recover_durable_queues() -> - Node = node(), - lists:foreach( - fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner }) -> +recover_durable_queues(DurableQueues) -> + lists:foldl( + fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner }, + Acc) -> %% We need to catch the case where a client connected to %% another node has deleted the queue (and possibly %% re-created it). @@ -151,29 +158,23 @@ recover_durable_queues() -> end, case shared_or_live_owner(Owner) of true -> - Q = start_queue_process(RecoveredQ), + Q = start_queue_process(RecoveredQ), case DoIfSameQueue(fun () -> store_queue(Q) end) of - {true, ok} -> ok; - false -> exit(Q#amqqueue.pid, shutdown) + {true, ok} -> [Q | Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc end; false -> case DoIfSameQueue( - fun () -> + fun () -> internal_delete2(RecoveredQ#amqqueue.name) end) of {true, Hook} -> Hook(); false -> ok - end + end, + Acc end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. + end, [], DurableQueues). declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -219,7 +220,7 @@ store_queue(Q = #amqqueue{durable = false}) -> ok. start_queue_process(Q) -> - {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), + {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -305,16 +306,16 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). -commit_all(QPids, Txn) -> +commit_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). -rollback_all(QPids, Txn) -> +rollback_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end, QPids). notify_down_all(QPids, ChPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3a9a6881..5e1231b9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,8 +36,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -export([start_link/1, info_keys/0]). @@ -58,7 +56,7 @@ -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(tx, {is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary -record(cr, {consumer_count, @@ -374,7 +372,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> +persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> ok; persist_message(Txn, QName, Message) -> M = Message#basic_message{ @@ -382,29 +380,28 @@ persist_message(Txn, QName, Message) -> content = rabbit_binary_parser:clear_decoded_content( Message#basic_message.content)}, persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). + [{publish, M, {QName, M#basic_message.guid}}]). persist_delivery(_QName, _Message, true) -> ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, +persist_delivery(_QName, #basic_message{is_persistent = false}, _IsDelivered) -> ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, +persist_delivery(QName, #basic_message{guid = Guid}, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). + persist_work(none, QName, [{deliver, {QName, Guid}}]). persist_acks(Txn, QName, Messages) -> persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). + [{ack, {QName, Guid}} || #basic_message{ + guid = Guid, is_persistent = true} <- Messages]). -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> +persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> +persist_auto_ack(QName, #basic_message{guid = Guid}) -> %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + rabbit_persister:dirty_work([{ack, {QName, Guid}}]). persist_work(_Txn,_QName, []) -> ok; @@ -432,8 +429,7 @@ do_if_persistent(F, Txn, QName) -> lookup_tx(Txn) -> case get({txn, Txn}) of - undefined -> #tx{ch_pid = none, - is_persistent = false, + undefined -> #tx{is_persistent = false, pending_messages = [], pending_acks = []}; V -> V @@ -462,26 +458,19 @@ is_tx_persistent(Txn) -> record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], - ch_pid = ChPid}). - -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}). + +process_pending(Txn, ChPid, State) -> + #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} = + lookup_tx(Txn), + C = #cr{unacked_messages = UAM} = lookup_ch(ChPid), + {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), deliver_or_enqueue_n(lists:reverse(PendingMessages), State). collect_messages(MsgIds, UAM) -> @@ -590,12 +579,13 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({commit, Txn}, From, State) -> +handle_call({commit, Txn, ChPid}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), + NewState = process_pending(Txn, ChPid, State), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(NewState); handle_call({notify_down, ChPid}, _From, State) -> @@ -762,9 +752,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> noreply(State) end; -handle_cast({rollback, Txn}, State) -> +handle_cast({rollback, Txn, ChPid}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(State); handle_cast({redeliver, Messages}, State) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0f3a8664..dbd65780 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/0, start_child/1]). -export([init/1]). @@ -42,6 +42,9 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_child(Args) -> + supervisor:start_child(?SERVER, Args). + init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9ebb6e72..4ab7a2a0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -36,6 +36,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). +-export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -48,7 +49,7 @@ -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> message()). + binary()) -> (message() | {'error', any()})). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -57,6 +58,8 @@ publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). -spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-spec(is_message_persistent/1 :: + (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). -endif. @@ -93,10 +96,17 @@ from_content(Content) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = build_content(Properties, BodyBin), - persistent_key = none}. + Content = build_content(Properties, BodyBin), + case is_message_persistent(Content) of + {invalid, Other} -> + {error, {invalid_delivery_mode, Other}}; + IsPersistent when is_boolean(IsPersistent) -> + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent} + end. properties(P = #'P_basic'{}) -> P; @@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, properties(Properties), BodyBin))). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> {invalid, Other} + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c1d744f4..a0a357fe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -48,9 +48,6 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking}). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(INFO_KEYS, @@ -75,7 +72,7 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). @@ -392,14 +389,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, + IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -963,7 +958,7 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); @@ -980,7 +975,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey, queue:len(UAQ), queue:len(UAMQ)]), case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> rabbit_misc:protocol_error( @@ -996,14 +991,11 @@ fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> - %% dict:append would be simpler and avoid the - %% lists:reverse in handle_message({recover, true}, - %% ...). However, it is significantly slower when - %% going beyond a few thousand elements. - dict:update(QPid, - fun (MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - D) + %% dict:append would avoid the lists:reverse in + %% handle_message({recover, true}, ...). However, it + %% is significantly slower when going beyond a few + %% thousand elements. + rabbit_misc:dict_cons(QPid, MsgId, D) end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -1049,16 +1041,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(#content{properties = #'P_basic'{ - delivery_mode = Mode}}) -> - case Mode of - 1 -> false; - 2 -> true; - undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false +is_message_persistent(Content) -> + case rabbit_basic:is_message_persistent(Content) of + {invalid, Other} -> + rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false; + IsPersistent when is_boolean(IsPersistent) -> + IsPersistent end. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 078cf620..f19e8d02 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -38,9 +38,9 @@ -ifdef(use_specs). --spec(create_basic_plt/1 :: (string()) -> 'ok'). --spec(add_to_plt/2 :: (string(), string()) -> 'ok'). --spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(create_basic_plt/1 :: (file_path()) -> 'ok'). +-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok'). -spec(halt_with_code/1 :: (atom()) -> no_return()). -endif. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 7d840861..878af029 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -249,10 +249,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> State#lim{queues = NewQueues}. unlink_on_stopped(LimiterPid, stopped) -> - true = unlink(LimiterPid), - ok = receive {'EXIT', LimiterPid, _Reason} -> ok - after 0 -> ok - end, + ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), stopped; unlink_on_stopped(_LimiterPid, Result) -> Result. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 81cecb38..028b0d73 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -59,6 +59,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]). -import(mnesia). -import(lists). @@ -97,8 +98,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> ok_or_error()). --spec(report_cover/1 :: (string()) -> 'ok'). +-spec(enable_cover/1 :: (file_path()) -> ok_or_error()). +-spec(report_cover/1 :: (file_path()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -119,20 +120,27 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). --spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). --spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (string(), string()) -> ok_or_error()). +-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()). +-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). --spec(ceil/1 :: (number()) -> number()). +-spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). +-spec(version_compare/3 :: (string(), string(), + ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(recursive_delete/1 :: ([file_path()]) -> + 'ok' | {'error', {file_path(), any()}}). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -endif. @@ -601,3 +609,46 @@ version_compare(A, B) -> ANum < BNum -> lt; ANum > BNum -> gt end. + +recursive_delete(Files) -> + lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). + +recursive_delete1(Path) -> + case filelib:is_dir(Path) of + false -> case file:delete(Path) of + ok -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Path, Err}} + end; + true -> case file:list_dir(Path) of + {ok, FileNames} -> + case lists:foldl( + fun (FileName, ok) -> + recursive_delete1( + filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames) of + ok -> + case file:del_dir(Path) of + ok -> ok; + {error, Err} -> {error, {Path, Err}} + end; + {error, _Err} = Error -> + Error + end; + {error, Err} -> + {error, {Path, Err}} + end + end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +unlink_and_capture_exit(Pid) -> + unlink(Pid), + receive {'EXIT', Pid, _} -> ok + after 0 -> ok + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6ec3cf74..55a6761d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -48,7 +48,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). --spec(dir/0 :: () -> string()). +-spec(dir/0 :: () -> file_path()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). @@ -424,9 +424,8 @@ reset(Force) -> cannot_delete_schema) end, ok = delete_cluster_nodes_config(), - %% remove persistet messages and any other garbage we find - lists:foreach(fun file:delete/1, - filelib:wildcard(dir() ++ "/*")), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 019d2a26..8aa5ad8d 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -70,11 +70,11 @@ -ifdef(use_specs). --type(qmsg() :: {amqqueue(), pkey()}). +-type(pmsg() :: {queue_name(), pkey()}). -type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). + {publish, message(), pmsg()} | + {deliver, pmsg()} | + {ack, pmsg()}). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). @@ -406,7 +406,10 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), + Work = ets:foldl( + fun ({{QName, PKey}, Delivered}, Acc) -> + rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc) + end, dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( rabbit_misc:upmap( @@ -425,13 +428,6 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, %% contains the mutated messages and queues tables Snapshot. -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> @@ -474,12 +470,8 @@ internal_integrate_messages(Items, Snapshot) -> internal_integrate1({extend_transaction, Key, MessageList}, Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; + Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, + Transactions)}; internal_integrate1({rollback_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions}) -> Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 884ea4ab..a449e19e 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -76,13 +76,9 @@ deliver(QPids, Delivery) -> %% which then in turn delivers it to its queues. deliver_per_node( dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), + lists:foldl(fun (QPid, D) -> + rabbit_misc:dict_cons(node(QPid), QPid, D) + end, dict:new(), QPids)), Delivery). deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 25715e6e..2c5e5112 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, +-export([start_link/0, start_child/1, start_child/2, start_child/3, start_restartable_child/1, start_restartable_child/2]). -export([init/1]). @@ -49,8 +49,11 @@ start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> + start_child(Mod, Mod, Args). + +start_child(ChildId, Mod, Args) -> {ok, _} = supervisor:start_child(?SERVER, - {Mod, {Mod, start_link, Args}, + {ChildId, {Mod, start_link, Args}, transient, ?MAX_WAIT, worker, [Mod]}), ok. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 1ee958af..97e07545 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -40,12 +40,10 @@ %% %% 1. Allow priorities (basically, change the pending queue to a %% priority_queue). -%% -%% 2. Allow the submission to the pool_worker to be async. -behaviour(gen_server2). --export([start_link/0, submit/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,6 +54,8 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/1 :: + (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -endif. @@ -80,6 +80,9 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. +submit_async(Fun) -> + gen_server2:cast(?SERVER, {run_async, Fun}). + idle(WId) -> gen_server2:cast(?SERVER, {idle, WId}). @@ -93,7 +96,8 @@ handle_call(next_free, From, State = #state { available = Avail, pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> - {noreply, State #state { pending = queue:in(From, Pending) }, + {noreply, + State #state { pending = queue:in({next_free, From}, Pending) }, hibernate}; {{value, WId}, Avail1} -> {reply, get_worker_pid(WId), State #state { available = Avail1 }, @@ -108,11 +112,25 @@ handle_cast({idle, WId}, State = #state { available = Avail, {noreply, case queue:out(Pending) of {empty, _Pending} -> State #state { available = queue:in(WId, Avail) }; - {{value, From}, Pending1} -> + {{value, {next_free, From}}, Pending1} -> gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), State #state { pending = Pending1 } end, hibernate}; +handle_cast({run_async, Fun}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, + case queue:out(Avail) of + {empty, _Avail} -> + State #state { pending = queue:in({run_async, Fun}, Pending)}; + {{value, WId}, Avail1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { available = Avail1 } + end, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 3bfcc2d9..d3a48119 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/1, submit/2, run/1]). +-export([start_link/1, submit/2, submit_async/2, run/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,6 +44,8 @@ -spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/2 :: + (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -endif. @@ -60,6 +62,9 @@ start_link(WId) -> submit(Pid, Fun) -> gen_server2:call(Pid, {submit, Fun}, infinity). +submit_async(Pid, Fun) -> + gen_server2:cast(Pid, {submit_async, Fun}). + init([WId]) -> ok = worker_pool:idle(WId), put(worker_pool_worker, true), @@ -74,6 +79,11 @@ handle_call({submit, Fun}, From, WId) -> handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. +handle_cast({submit_async, Fun}, WId) -> + run(Fun), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. |