summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-07-04 10:38:30 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-07-04 10:38:30 +0100
commit1d167ee737a77aab2ebe9b0edd983a9220ea5a69 (patch)
tree2b96616c3b8a71a5c04d41e7856087de1eaa2b00 /src
parentbdd633b93d2b1741b28632c88078b2d80a4df247 (diff)
downloadrabbitmq-server-1d167ee737a77aab2ebe9b0edd983a9220ea5a69.tar.gz
change tx semantics to 'batching'
We keep track of uncommitted messages and acks in the channel. All routing decisions are made instantly, which means errors are detected straight away. We increment pub/ack stats on commit only.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl3
-rw-r--r--src/rabbit_channel.erl100
-rw-r--r--src/rabbit_exchange.erl20
3 files changed, 88 insertions, 35 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index ec8ed351..9cc406e7 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -170,7 +170,8 @@ publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
end.
publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {RoutingRes, DeliveredQPids} =
+ rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
{ok, RoutingRes, DeliveredQPids}.
is_message_persistent(#content{properties = #'P_basic'{
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f0f8c4dd..df337aef 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -30,7 +30,8 @@
prioritise_cast/2]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_pid, start_limiter_fun, next_tag, unacked_message_q,
+ limiter_pid, start_limiter_fun, tx_enabled, next_tag,
+ unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -40,10 +41,13 @@
-define(STATISTICS_KEYS,
[pid,
+ transactional,
confirm,
consumer_count,
messages_unacknowledged,
messages_unconfirmed,
+ messages_uncommitted,
+ acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -170,8 +174,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
+ tx_enabled = false,
next_tag = 1,
unacked_message_q = queue:new(),
+ uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -595,6 +602,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
+ tx_enabled = TxEnabled,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -614,16 +622,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_trace_in(Message, TraceState),
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange, rabbit_basic:delivery(Mandatory, Immediate, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- ExchangeName, MsgSeqNo, Message,
- State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
+ Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
+ MsgSeqNo),
+ QNames = rabbit_exchange:route(Exchange, Delivery),
+ State2 = case TxEnabled of
+ true -> TMQ = State1#ch.uncommitted_message_q,
+ NewTMQ = queue:in({Delivery, QNames}, TMQ),
+ State1#ch{uncommitted_message_q = NewTMQ};
+ false -> deliver_to_queues({Delivery, QNames}, State1)
+ end,
{noreply, State2};
{error, Reason} ->
rabbit_misc:protocol_error(precondition_failed,
@@ -638,12 +645,16 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ,
+ tx_enabled = TxEnabled}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- QIncs = ack(Acked),
- maybe_incr_stats(QIncs, ack, State),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply, case TxEnabled of
+ true -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q,
+ Acked),
+ State1#ch{uncommitted_ack_q = NewTAQ};
+ false -> ack(Acked, State1)
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -1024,6 +1035,26 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
+handle_method(#'tx.select'{}, _, State) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx_enabled = true}};
+
+handle_method(#'tx.commit'{}, _, #ch{tx_enabled = false}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "channel is not transactional", []);
+
+handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_ack_q = TAQ}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ {reply, #'tx.commit_ok'{}, new_tx(ack(TAQ, State1))};
+
+handle_method(#'tx.rollback'{}, _, #ch{tx_enabled = false}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "channel is not transactional", []);
+
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_ack_q = TAQ}) ->
+ {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
+ queue:join(TAQ, UAMQ)})};
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1200,11 +1231,18 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
end.
-ack(UAQ) ->
- fold_per_queue(fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{QPid, length(MsgIds)} | L]
- end, [], UAQ).
+ack(Acked, State) ->
+ QIncs = fold_per_queue(
+ fun (QPid, MsgIds, L) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ [{QPid, length(MsgIds)} | L]
+ end, [], Acked),
+ maybe_incr_stats(QIncs, ack, State),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State.
+
+new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new()}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1255,6 +1293,18 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
+deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
+ exchange_name = XName},
+ msg_seq_no = MsgSeqNo},
+ QNames}, State) ->
+ {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State),
+ maybe_incr_stats([{XName, 1} |
+ [{{QPid, XName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State1),
+ State1.
+
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
@@ -1262,8 +1312,7 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_not_delivered, State),
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1343,6 +1392,7 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{tx_enabled = TE}) -> TE;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
@@ -1350,6 +1400,10 @@ i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
gb_trees:size(UMQ);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
+i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
+ queue:len(TMQ);
+i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
+ queue:len(TAQ);
i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index cab1b99f..cecd879a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -22,7 +22,7 @@
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
- publish/2, delete/2]).
+ route/2, delete/2]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/1, serial/1, peek_serial/1]).
@@ -66,8 +66,8 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
--spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]}).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> [rabbit_amqqueue:name()]).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
@@ -224,21 +224,19 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X = #exchange{name = XName}, Delivery) ->
- rabbit_router:deliver(
- route(Delivery, {queue:from_list([X]), XName, []}),
- Delivery).
+route(X = #exchange{name = XName}, Delivery) ->
+ route1(Delivery, {queue:from_list([X]), XName, []}).
-route(Delivery, {WorkList, SeenXs, QNames}) ->
+route1(Delivery, {WorkList, SeenXs, QNames}) ->
case queue:out(WorkList) of
{empty, _WorkList} ->
lists:usort(QNames);
{{value, X = #exchange{type = Type}}, WorkList1} ->
DstNames = process_alternate(
X, ((type_to_module(Type)):route(X, Delivery))),
- route(Delivery,
- lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
- DstNames))
+ route1(Delivery,
+ lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
+ DstNames))
end.
process_alternate(#exchange{name = XName, arguments = Args}, []) ->