summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-07-14 11:48:37 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-07-14 11:48:37 +0100
commit57b78e60c441714d48f30a1adfaea7011a756eb1 (patch)
tree61d93a2aa8a87e5cf7d07777135b9e023a63c34f /src/rabbit_amqqueue_process.erl
parent6d6fb807e4a6b57dee8a72e650fa550d3ed23a4c (diff)
parentcbcc208629db5bac9d26862a481aa079d8e89478 (diff)
downloadrabbitmq-server-57b78e60c441714d48f30a1adfaea7011a756eb1.tar.gz
Merging default to bug24130
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl127
1 files changed, 20 insertions, 107 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cb8a485e..9fe13d0d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -62,7 +62,6 @@
monitor_ref,
acktags,
is_limit_active,
- txn,
unsent_message_count}).
-define(STATISTICS_KEYS,
@@ -193,14 +192,7 @@ bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
fun (Mod, Fun) ->
- rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
- end,
- fun (Mod, Fun) ->
- rabbit_misc:with_exit_handler(
- fun () -> error end,
- fun () ->
- rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
- end)
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
@@ -217,22 +209,14 @@ init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
terminate_shutdown(Fun, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- BQS1 = lists:foldl(
- fun (#cr{txn = none}, BQSN) ->
- BQSN;
- (#cr{txn = Txn}, BQSN) ->
- {_AckTags, BQSN1} =
- BQ:tx_rollback(Txn, BQSN),
- BQSN1
- end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag)
|| {Ch, CTag, _} <- consumers(State1)],
- State1#q{backing_queue_state = Fun(BQS1)}
+ State1#q{backing_queue_state = Fun(BQS)}
end.
reply(Reply, NewState) ->
@@ -343,7 +327,6 @@ ch_record(ChPid) ->
monitor_ref = MonitorRef,
acktags = sets:new(),
is_limit_active = false,
- txn = none,
unsent_message_count = 0},
put(Key, C),
C;
@@ -355,13 +338,12 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
- txn = Txn,
unsent_message_count = UnsentMessageCount}) ->
- case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of
- {0, 0, 0, none} -> ok = erase_ch_record(C),
- false;
- _ -> store_ch_record(C),
- true
+ case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {0, 0, 0} -> ok = erase_ch_record(C),
+ false;
+ _ -> store_ch_record(C),
+ true
end.
erase_ch_record(#cr{ch_pid = ChPid,
@@ -513,8 +495,7 @@ run_message_queue(State) ->
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
State2.
-attempt_delivery(Delivery = #delivery{txn = none,
- sender = ChPid,
+attempt_delivery(Delivery = #delivery{sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -523,7 +504,7 @@ attempt_delivery(Delivery = #delivery{txn = none,
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
- case BQ:is_duplicate(none, Message, BQS) of
+ case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -555,24 +536,6 @@ attempt_delivery(Delivery = #delivery{txn = none,
discarded -> false
end,
{Delivered, Confirm, State#q{backing_queue_state = BQS1}}
- end;
-attempt_delivery(Delivery = #delivery{txn = Txn,
- sender = ChPid,
- message = Message},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- Confirm = should_confirm_message(Delivery, State),
- case BQ:is_duplicate(Txn, Message, BQS) of
- {false, BQS1} ->
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
- BQS1),
- {true, Confirm, State#q{backing_queue_state = BQS2}};
- {Duplicate, BQS1} ->
- Delivered = case Duplicate of
- published -> true;
- discarded -> false
- end,
- {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
@@ -652,7 +615,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} ->
+ C = #cr{ch_pid = ChPid, acktags = ChAckTags} ->
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -665,13 +628,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
ChPid, State#q.blocked_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> State2 = case Txn of
- none -> State1;
- _ -> rollback_transaction(Txn, C,
- State1)
- end,
- {ok, requeue_and_run(sets:to_list(ChAckTags),
- ensure_expiry_timer(State2))}
+ false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
+ ensure_expiry_timer(State1))}
end
end.
@@ -705,25 +663,6 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
-commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL}) ->
- {AckTags, BQS1} = BQ:tx_commit(
- Txn, fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL), BQS),
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
- State#q{backing_queue_state = BQS1}.
-
-rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
- %% Iff we removed acktags from the channel record on ack+txn then
- %% we would add them back in here.
- maybe_store_ch_record(C#cr{txn = none}),
- State#q{backing_queue_state = BQS1}.
-
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
@@ -860,7 +799,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- {run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
end.
@@ -873,7 +811,7 @@ prioritise_cast(Msg, _State) ->
maybe_expire -> 8;
drop_expired -> 8;
emit_stats -> 7;
- {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
@@ -945,13 +883,6 @@ handle_call({deliver, Delivery}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
-handle_call({commit, Txn, ChPid}, From, State) ->
- case lookup_ch(ChPid) of
- not_found -> reply(ok, State);
- C -> noreply(run_message_queue(
- commit_transaction(Txn, From, C, State)))
- end;
-
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
@@ -1091,11 +1022,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(requeue_and_run(AckTags, State))
- end;
-
-handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
- reply(ok, run_backing_queue(Mod, Fun, State)).
-
+ end.
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -1107,24 +1034,16 @@ handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
noreply(deliver_or_enqueue(Delivery, State));
-handle_cast({ack, Txn, AckTags, ChPid},
+handle_cast({ack, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- {C1, State1} =
- case Txn of
- none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- NewC = C#cr{acktags = ChAckTags1},
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- {NewC, State#q{backing_queue_state = BQS1}};
- _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
- {C#cr{txn = Txn},
- State#q{backing_queue_state = BQS1}}
- end,
- maybe_store_ch_record(C1),
- noreply(State1)
+ maybe_store_ch_record(C#cr{acktags = subtract_acks(
+ ChAckTags, AckTags)}),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ noreply(State#q{backing_queue_state = BQS1})
end;
handle_cast({reject, AckTags, Requeue, ChPid},
@@ -1143,12 +1062,6 @@ handle_cast({reject, AckTags, Requeue, ChPid},
end)
end;
-handle_cast({rollback, Txn, ChPid}, State) ->
- noreply(case lookup_ch(ChPid) of
- not_found -> State;
- C -> rollback_transaction(Txn, C, State)
- end);
-
handle_cast(delete_immediately, State) ->
{stop, normal, State};