diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 16 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 10 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 20 | ||||
-rw-r--r-- | src/rabbit_router.erl | 59 |
6 files changed, 59 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d5f19026..64f078bd 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). @@ -85,7 +85,7 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()). +-spec(deliver/2 :: (pid(), delivery()) -> bool()). -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). @@ -241,12 +241,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). -deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); -deliver(true, _IsImmediate, Txn, Message, QPid) -> +deliver(QPid, #delivery{immediate = true, txn = Txn, message = Message}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, + infinity); +deliver(QPid, #delivery{mandatory = true, txn = Txn, message = Message}) -> gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; -deliver(false, _IsImmediate, Txn, Message, QPid) -> +deliver(QPid, #delivery{txn = Txn, message = Message}) -> gen_server2:cast(QPid, {deliver, Txn, Message}), true. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b2e85820..ce4f818d 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,14 +33,15 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/4, message/4]). +-export([publish/1, message/4, delivery/4]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) -> +-spec(publish/1 :: (delivery()) -> {ok, routing_result(), [pid()]} | not_found()). +-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> message()). @@ -48,17 +49,20 @@ %%---------------------------------------------------------------------------- -publish(Mandatory, Immediate, Txn, - Message = #basic_message{exchange_name = ExchangeName}) -> +publish(Delivery = #delivery{ + message = #basic_message{exchange_name = ExchangeName}}) -> case rabbit_exchange:lookup(ExchangeName) of {ok, X} -> - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message), + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), {ok, RoutingRes, DeliveredQPids}; Other -> Other end. +delivery(Mandatory, Immediate, Txn, Message) -> + #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, + message = Message}. + message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 84b414fd..3089bb62 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -324,8 +324,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, persistent_key = PersistentKey}, {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey, - Message), + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), case RoutingRes of routed -> ok; diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index d73edb73..76016a8c 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -75,8 +75,10 @@ publish(_Other, _Format, _Data, _State) -> publish1(RoutingKey, Format, Data, LogExch) -> {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(false, false, none, - rabbit_basic:message( - LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data)))), + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, none, + rabbit_basic:message( + LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data))))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index ca0e337b..7d9948f0 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - publish/5]). + publish/2]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -72,8 +72,7 @@ -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) -> - {routing_result(), [pid()]}). +-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -188,13 +187,12 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Mandatory, Immediate, Txn, Message) -> - publish(X, [], Mandatory, Immediate, Txn, Message). +publish(X, Delivery) -> + publish(X, [], Delivery). -publish(X, Seen, Mandatory, Immediate, Txn, - Message = #basic_message{routing_key = RK, content = C}) -> - case rabbit_router:deliver(route(X, RK, C), - Mandatory, Immediate, Txn, Message) of +publish(X, Seen, Delivery = #delivery{ + message = #basic_message{routing_key = RK, content = C}}) -> + case rabbit_router:deliver(route(X, RK, C), Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -209,9 +207,7 @@ publish(X, Seen, Mandatory, Immediate, Txn, false -> case lookup(AName) of {ok, AX} -> - publish(AX, NewSeen, - Mandatory, Immediate, Txn, - Message); + publish(AX, NewSeen, Delivery); {error, not_found} -> rabbit_log:warning( "alternate exchange for ~s " diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 57166428..10f80cc3 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). -export([start_link/0, - deliver/5]). + deliver/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -50,8 +50,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> - {routing_result(), [pid()]}). +-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. @@ -62,13 +61,13 @@ start_link() -> -ifdef(BUG19758). -deliver(QPids, Mandatory, Immediate, Txn, Message) -> - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)). +deliver(QPids, Delivery) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)). -else. -deliver(QPids, Mandatory, Immediate, Txn, Message) -> +deliver(QPids, Delivery) -> %% we reduce inter-node traffic by grouping the qpids by node and %% only delivering one copy of the message to each node involved, %% which then in turn delivers it to its queues. @@ -81,16 +80,14 @@ deliver(QPids, Mandatory, Immediate, Txn, Message) -> [QPid], D) end, dict:new(), QPids)), - Mandatory, Immediate, Txn, Message). + Delivery). -deliver_per_node([{Node, QPids}], Mandatory, Immediate, - Txn, Message) - when Node == node() -> +deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> %% optimisation - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)); -deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, - Txn, Message) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)); +deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver in run_bindings below will deliver the %% message to the queue process asynchronously, and return true, @@ -101,20 +98,16 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, {routed, lists:flatmap( fun ({Node, QPids}) -> - gen_server2:cast( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}), + gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), QPids end, NodeQPids)}; -deliver_per_node(NodeQPids, Mandatory, Immediate, - Txn, Message) -> +deliver_per_node(NodeQPids, Delivery) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server2:call( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}, - infinity) + try gen_server2:call({?SERVER, Node}, + {deliver, QPids, Delivery}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here @@ -131,7 +124,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, end, {false, []}, R), - check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}). + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + {Routed, lists:append(Handled)}). -endif. @@ -140,19 +134,17 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, init([]) -> {ok, no_state}. -handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, - From, State) -> +handle_call({deliver, QPids, Delivery}, From, State) -> spawn( fun () -> - R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), + R = run_bindings(QPids, Delivery), gen_server2:reply(From, R) end), {noreply, State}. -handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message}, - State) -> +handle_cast({deliver, QPids, Delivery}, State) -> %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Mandatory, Immediate, Txn, Message), + run_bindings(QPids, Delivery), {noreply, State}. handle_info(_Info, State) -> @@ -166,11 +158,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> +run_bindings(QPids, Delivery) -> lists:foldl( fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, - Txn, Message, QPid) of + case catch rabbit_amqqueue:deliver(QPid, Delivery) of true -> {true, [QPid | Handled]}; false -> {true, Handled}; {'EXIT', _Reason} -> {Routed, Handled} |