diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-08 11:16:02 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-08 11:16:02 +0100 |
commit | 6a7d99ad7a86fbdd80090dc43916586f93d1ec32 (patch) | |
tree | 8c4e12ad24bfa610bbe096d3ae09a838972aee67 | |
parent | dc682aeed5f1c63b583635e34b582b52f95d97e1 (diff) | |
parent | 351e31e7af73877f683ebcf13dc711d29d10254b (diff) | |
download | rabbitmq-server-6a7d99ad7a86fbdd80090dc43916586f93d1ec32.tar.gz |
merging in bug20782
-rw-r--r-- | include/rabbit.hrl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 16 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 10 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 20 | ||||
-rw-r--r-- | src/rabbit_router.erl | 59 |
8 files changed, 103 insertions, 81 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index ffda0698..784c21b3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,6 +64,8 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(delivery, {mandatory, immediate, txn, sender, message}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -134,6 +136,12 @@ content :: content(), persistent_key :: maybe(pkey())}). -type(message() :: basic_message()). +-type(delivery() :: + #delivery{mandatory :: bool(), + immediate :: bool(), + txn :: maybe(txn()), + sender :: pid(), + message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). -type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d5f19026..198e2782 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). @@ -85,7 +85,7 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()). +-spec(deliver/2 :: (pid(), delivery()) -> bool()). -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). @@ -241,13 +241,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). -deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); -deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver, Txn, Message}, infinity), +deliver(QPid, #delivery{immediate = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, + infinity); +deliver(QPid, #delivery{mandatory = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), true; -deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server2:cast(QPid, {deliver, Txn, Message}), +deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. redeliver(QPid, Messages) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c390b2b7..6027c9c0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -66,6 +66,7 @@ monitor_ref, unacked_messages, is_limit_active, + txn, unsent_message_count}). -define(INFO_KEYS, @@ -133,6 +134,7 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, unacked_messages = dict:new(), is_limit_active = false, + txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -156,6 +158,11 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. +record_current_channel_tx(ChPid, Txn) -> + %% as a side effect this also starts monitoring the channel (if + %% that wasn't happening already) + store_ch_record((ch_record(ChPid))#cr{txn = Txn}). + deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, @@ -198,7 +205,7 @@ deliver_immediately(Message, Delivered, {not_offered, State} end. -attempt_delivery(none, Message, State) -> +attempt_delivery(none, _ChPid, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> {true, State1}; @@ -209,13 +216,13 @@ attempt_delivery(none, Message, State) -> {not_offered, State1} -> {false, State1} end; -attempt_delivery(Txn, Message, State) -> +attempt_delivery(Txn, ChPid, Message, State) -> persist_message(Txn, qname(State), Message), - record_pending_message(Txn, Message), + record_pending_message(Txn, ChPid, Message), {true, State}. -deliver_or_enqueue(Txn, Message, State) -> - case attempt_delivery(Txn, Message, State) of +deliver_or_enqueue(Txn, ChPid, Message, State) -> + case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> @@ -295,10 +302,16 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of not_found -> noreply(State); - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> + #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, + unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), erase({ch, ChPid}), + case Txn of + none -> ok; + _ -> ok = rollback_work(Txn, qname(State)), + erase_tx(Txn) + end, case check_auto_delete( deliver_or_enqueue_n( [{Message, true} || @@ -456,13 +469,17 @@ is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. -record_pending_message(Txn, Message) -> +record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], + ch_pid = ChPid}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], + ch_pid = ChPid}). process_pending(Txn, State) -> #tx{ch_pid = ChPid, @@ -541,7 +558,7 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call({deliver_immediately, Txn, Message}, _From, State) -> +handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -555,12 +572,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, Message, State), + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({deliver, Txn, Message}, _From, State) -> +handle_call({deliver, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> @@ -711,9 +728,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(locked, State) end. -handle_cast({deliver, Txn, Message}, State) -> +handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b2e85820..761b3863 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,14 +33,15 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/4, message/4]). +-export([publish/1, message/4, delivery/4]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) -> +-spec(publish/1 :: (delivery()) -> {ok, routing_result(), [pid()]} | not_found()). +-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> message()). @@ -48,17 +49,20 @@ %%---------------------------------------------------------------------------- -publish(Mandatory, Immediate, Txn, - Message = #basic_message{exchange_name = ExchangeName}) -> +publish(Delivery = #delivery{ + message = #basic_message{exchange_name = ExchangeName}}) -> case rabbit_exchange:lookup(ExchangeName) of {ok, X} -> - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message), + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), {ok, RoutingRes, DeliveredQPids}; Other -> Other end. +delivery(Mandatory, Immediate, Txn, Message) -> + #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, + sender = self(), message = Message}. + message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 84b414fd..3089bb62 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -324,8 +324,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, persistent_key = PersistentKey}, {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey, - Message), + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), case RoutingRes of routed -> ok; diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index d73edb73..76016a8c 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -75,8 +75,10 @@ publish(_Other, _Format, _Data, _State) -> publish1(RoutingKey, Format, Data, LogExch) -> {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(false, false, none, - rabbit_basic:message( - LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data)))), + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, none, + rabbit_basic:message( + LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data))))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index ca0e337b..7d9948f0 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - publish/5]). + publish/2]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -72,8 +72,7 @@ -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) -> - {routing_result(), [pid()]}). +-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -188,13 +187,12 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Mandatory, Immediate, Txn, Message) -> - publish(X, [], Mandatory, Immediate, Txn, Message). +publish(X, Delivery) -> + publish(X, [], Delivery). -publish(X, Seen, Mandatory, Immediate, Txn, - Message = #basic_message{routing_key = RK, content = C}) -> - case rabbit_router:deliver(route(X, RK, C), - Mandatory, Immediate, Txn, Message) of +publish(X, Seen, Delivery = #delivery{ + message = #basic_message{routing_key = RK, content = C}}) -> + case rabbit_router:deliver(route(X, RK, C), Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -209,9 +207,7 @@ publish(X, Seen, Mandatory, Immediate, Txn, false -> case lookup(AName) of {ok, AX} -> - publish(AX, NewSeen, - Mandatory, Immediate, Txn, - Message); + publish(AX, NewSeen, Delivery); {error, not_found} -> rabbit_log:warning( "alternate exchange for ~s " diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 57166428..10f80cc3 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). -export([start_link/0, - deliver/5]). + deliver/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -50,8 +50,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> - {routing_result(), [pid()]}). +-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. @@ -62,13 +61,13 @@ start_link() -> -ifdef(BUG19758). -deliver(QPids, Mandatory, Immediate, Txn, Message) -> - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)). +deliver(QPids, Delivery) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)). -else. -deliver(QPids, Mandatory, Immediate, Txn, Message) -> +deliver(QPids, Delivery) -> %% we reduce inter-node traffic by grouping the qpids by node and %% only delivering one copy of the message to each node involved, %% which then in turn delivers it to its queues. @@ -81,16 +80,14 @@ deliver(QPids, Mandatory, Immediate, Txn, Message) -> [QPid], D) end, dict:new(), QPids)), - Mandatory, Immediate, Txn, Message). + Delivery). -deliver_per_node([{Node, QPids}], Mandatory, Immediate, - Txn, Message) - when Node == node() -> +deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> %% optimisation - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)); -deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, - Txn, Message) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)); +deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver in run_bindings below will deliver the %% message to the queue process asynchronously, and return true, @@ -101,20 +98,16 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, {routed, lists:flatmap( fun ({Node, QPids}) -> - gen_server2:cast( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}), + gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), QPids end, NodeQPids)}; -deliver_per_node(NodeQPids, Mandatory, Immediate, - Txn, Message) -> +deliver_per_node(NodeQPids, Delivery) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server2:call( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}, - infinity) + try gen_server2:call({?SERVER, Node}, + {deliver, QPids, Delivery}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here @@ -131,7 +124,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, end, {false, []}, R), - check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}). + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + {Routed, lists:append(Handled)}). -endif. @@ -140,19 +134,17 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, init([]) -> {ok, no_state}. -handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, - From, State) -> +handle_call({deliver, QPids, Delivery}, From, State) -> spawn( fun () -> - R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), + R = run_bindings(QPids, Delivery), gen_server2:reply(From, R) end), {noreply, State}. -handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message}, - State) -> +handle_cast({deliver, QPids, Delivery}, State) -> %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Mandatory, Immediate, Txn, Message), + run_bindings(QPids, Delivery), {noreply, State}. handle_info(_Info, State) -> @@ -166,11 +158,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> +run_bindings(QPids, Delivery) -> lists:foldl( fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, - Txn, Message, QPid) of + case catch rabbit_amqqueue:deliver(QPid, Delivery) of true -> {true, [QPid | Handled]}; false -> {true, Handled}; {'EXIT', _Reason} -> {Routed, Handled} |