diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-07-04 10:38:30 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-07-04 10:38:30 +0100 |
commit | 1d167ee737a77aab2ebe9b0edd983a9220ea5a69 (patch) | |
tree | 2b96616c3b8a71a5c04d41e7856087de1eaa2b00 /src | |
parent | bdd633b93d2b1741b28632c88078b2d80a4df247 (diff) | |
download | rabbitmq-server-1d167ee737a77aab2ebe9b0edd983a9220ea5a69.tar.gz |
change tx semantics to 'batching'
We keep track of uncommitted messages and acks in the channel. All
routing decisions are made instantly, which means errors are detected
straight away.
We increment pub/ack stats on commit only.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_basic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 100 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 20 |
3 files changed, 88 insertions, 35 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index ec8ed351..9cc406e7 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -170,7 +170,8 @@ publish(XName, RKey, Mandatory, Immediate, Props, Body) -> end. publish(X, Delivery) -> - {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {RoutingRes, DeliveredQPids} = + rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), {ok, RoutingRes, DeliveredQPids}. is_message_persistent(#content{properties = #'P_basic'{ diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f0f8c4dd..df337aef 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -30,7 +30,8 @@ prioritise_cast/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, next_tag, unacked_message_q, + limiter_pid, start_limiter_fun, tx_enabled, next_tag, + unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -40,10 +41,13 @@ -define(STATISTICS_KEYS, [pid, + transactional, confirm, consumer_count, messages_unacknowledged, messages_unconfirmed, + messages_uncommitted, + acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -170,8 +174,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, conn_pid = ConnPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, + tx_enabled = false, next_tag = 1, unacked_message_q = queue:new(), + uncommitted_message_q = queue:new(), + uncommitted_ack_q = queue:new(), user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -595,6 +602,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, + tx_enabled = TxEnabled, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -614,16 +622,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_trace_in(Message, TraceState), - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, rabbit_basic:delivery(Mandatory, Immediate, Message, - MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, - ExchangeName, MsgSeqNo, Message, - State1), - maybe_incr_stats([{ExchangeName, 1} | - [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State2), + Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, + MsgSeqNo), + QNames = rabbit_exchange:route(Exchange, Delivery), + State2 = case TxEnabled of + true -> TMQ = State1#ch.uncommitted_message_q, + NewTMQ = queue:in({Delivery, QNames}, TMQ), + State1#ch{uncommitted_message_q = NewTMQ}; + false -> deliver_to_queues({Delivery, QNames}, State1) + end, {noreply, State2}; {error, Reason} -> rabbit_misc:protocol_error(precondition_failed, @@ -638,12 +645,16 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ, + tx_enabled = TxEnabled}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - QIncs = ack(Acked), - maybe_incr_stats(QIncs, ack, State), - ok = notify_limiter(State#ch.limiter_pid, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}; + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, case TxEnabled of + true -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, + Acked), + State1#ch{uncommitted_ack_q = NewTAQ}; + false -> ack(Acked, State1) + end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, @@ -1024,6 +1035,26 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); +handle_method(#'tx.select'{}, _, State) -> + {reply, #'tx.select_ok'{}, State#ch{tx_enabled = true}}; + +handle_method(#'tx.commit'{}, _, #ch{tx_enabled = false}) -> + rabbit_misc:protocol_error( + precondition_failed, "channel is not transactional", []); + +handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, + uncommitted_ack_q = TAQ}) -> + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), + {reply, #'tx.commit_ok'{}, new_tx(ack(TAQ, State1))}; + +handle_method(#'tx.rollback'{}, _, #ch{tx_enabled = false}) -> + rabbit_misc:protocol_error( + precondition_failed, "channel is not transactional", []); + +handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, + uncommitted_ack_q = TAQ}) -> + {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = + queue:join(TAQ, UAMQ)})}; handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, @@ -1200,11 +1231,18 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed, "unknown delivery tag ~w", [DeliveryTag]) end. -ack(UAQ) -> - fold_per_queue(fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{QPid, length(MsgIds)} | L] - end, [], UAQ). +ack(Acked, State) -> + QIncs = fold_per_queue( + fun (QPid, MsgIds, L) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + [{QPid, length(MsgIds)} | L] + end, [], Acked), + maybe_incr_stats(QIncs, ack, State), + ok = notify_limiter(State#ch.limiter_pid, Acked), + State. + +new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), + uncommitted_ack_q = queue:new()}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1255,6 +1293,18 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. +deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ + exchange_name = XName}, + msg_seq_no = MsgSeqNo}, + QNames}, State) -> + {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, State), + maybe_incr_stats([{XName, 1} | + [{{QPid, XName}, 1} || + QPid <- DeliveredQPids]], publish, State1), + State1. + process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], @@ -1262,8 +1312,7 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_not_delivered, State), + maybe_incr_stats([{XName, 1}], return_not_delivered, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1343,6 +1392,7 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{tx_enabled = TE}) -> TE; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); @@ -1350,6 +1400,10 @@ i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> gb_trees:size(UMQ); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); +i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> + queue:len(TMQ); +i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> + queue:len(TAQ); i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index cab1b99f..cecd879a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -22,7 +22,7 @@ assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, - publish/2, delete/2]). + route/2, delete/2]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/1, serial/1, peek_serial/1]). @@ -66,8 +66,8 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). --spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]}). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> [rabbit_amqqueue:name()]). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | @@ -224,21 +224,19 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X = #exchange{name = XName}, Delivery) -> - rabbit_router:deliver( - route(Delivery, {queue:from_list([X]), XName, []}), - Delivery). +route(X = #exchange{name = XName}, Delivery) -> + route1(Delivery, {queue:from_list([X]), XName, []}). -route(Delivery, {WorkList, SeenXs, QNames}) -> +route1(Delivery, {WorkList, SeenXs, QNames}) -> case queue:out(WorkList) of {empty, _WorkList} -> lists:usort(QNames); {{value, X = #exchange{type = Type}}, WorkList1} -> DstNames = process_alternate( X, ((type_to_module(Type)):route(X, Delivery))), - route(Delivery, - lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, - DstNames)) + route1(Delivery, + lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, + DstNames)) end. process_alternate(#exchange{name = XName, arguments = Args}, []) -> |