summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-07 11:53:54 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-07 11:53:54 +0000
commitcebfaf63a18858987bd19b0d2dbfa11642392c69 (patch)
tree5d474874917121bfdd5540a764e72e3acbd9e1d6
parent0860b908377d89725fc366095e6c273eae2d691a (diff)
downloadrabbitmq-server-cebfaf63a18858987bd19b0d2dbfa11642392c69.tar.gz
get rid of #tx{}
and fix uncommitted_acks info item
-rw-r--r--src/rabbit_channel.erl77
1 files changed, 38 insertions, 39 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cccd09dd..df056a6e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -40,17 +40,6 @@
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
-
--record(tx, {msgs, acks}). %% (1)
-%% (1) acks looks s.t. like this:
-%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
-%%
-%% Each element is a pair consisting of a tag and a list of
-%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
-%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
-%% well as the list overall, are in "most-recent (generally youngest)
-%% ack first" order.
-
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
@@ -631,12 +620,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
DQ = {Delivery, QNames},
- {noreply,
- case Tx of
- none -> deliver_to_queues(DQ, State1);
- #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs),
- State1#ch{tx = Tx#tx{msgs = Msgs1}}
- end};
+ {noreply, case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = {Msgs1, Acks}}
+ end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
end;
@@ -652,13 +640,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case Tx of
- none -> ack(Acked, State1),
- State1;
- #tx{acks = Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
- State1#ch{tx = Tx#tx{acks = Acks1}}
- end};
+ {noreply, case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -1042,8 +1029,7 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
- acks = Acks},
+handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
lists:foreach(
@@ -1056,13 +1042,13 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- tx = #tx{acks = Acks}}) ->
+ tx = {_Msgs, Acks}}) ->
AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
-handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) ->
+handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1213,13 +1199,12 @@ reject(DeliveryTag, Requeue, Multiple,
State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case Tx of
- none -> reject(Requeue, Acked, State1#ch.limiter),
- State1;
- #tx{acks = Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
- State1#ch{tx = Tx#tx{acks = Acks1}}
- end}.
+ {noreply, case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end}.
%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
@@ -1294,7 +1279,19 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
end, Acked),
ok = notify_limiter(State#ch.limiter, Acked).
-new_tx() -> #tx{msgs = queue:new(), acks = []}.
+%% {Msgs, Acks}
+%%
+%% Msgs is a queue.
+%%
+%% Acks looks s.t. like this:
+%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
+%%
+%% Each element is a pair consisting of a tag and a list of
+%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
+%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
+%% well as the list overall, are in "most-recent (generally youngest)
+%% ack first" order.
+new_tx() -> {queue:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1457,7 +1454,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
-maybe_complete_tx(State = #ch{tx = #tx{}}) ->
+ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
+
+maybe_complete_tx(State = #ch{tx = {_, _}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1489,9 +1488,9 @@ i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
-i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
-i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks);
+i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);