summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-06-26 18:21:18 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-06-26 18:21:18 +0100
commited7c2e337d8ce533ad256a841fb90c3e045c5d61 (patch)
treec60c02ba75134c0b8b5639c56e12c93488f3f508
parent398ae196b2376f17d161a1e8fc5775ddaedb5493 (diff)
parent907bdbfb4bd9a099882379cfd09e31f2672d772b (diff)
downloadrabbitmq-server-ed7c2e337d8ce533ad256a841fb90c3e045c5d61.tar.gz
merge default into bug24216
-rw-r--r--docs/rabbitmqctl.1.xml12
-rw-r--r--include/rabbit.hrl3
-rw-r--r--include/rabbit_backing_queue_spec.hrl17
-rw-r--r--src/rabbit_amqqueue.erl32
-rw-r--r--src/rabbit_amqqueue_process.erl127
-rw-r--r--src/rabbit_backing_queue.erl21
-rw-r--r--src/rabbit_basic.erl32
-rw-r--r--src/rabbit_channel.erl141
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl41
-rw-r--r--src/rabbit_mirror_queue_master.erl34
-rw-r--r--src/rabbit_mirror_queue_slave.erl37
-rw-r--r--src/rabbit_queue_index.erl18
-rw-r--r--src/rabbit_tests.erl49
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_variable_queue.erl237
17 files changed, 153 insertions, 658 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a0f03192..fdb49912 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1196,10 +1196,6 @@
<listitem><para>Virtual host in which the channel operates.</para></listitem>
</varlistentry>
<varlistentry>
- <term>transactional</term>
- <listitem><para>True if the channel is in transactional mode, false otherwise.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>consumer_count</term>
<listitem><para>Number of logical AMQP consumers retrieving messages via
the channel.</para></listitem>
@@ -1210,11 +1206,6 @@
yet acknowledged.</para></listitem>
</varlistentry>
<varlistentry>
- <term>acks_uncommitted</term>
- <listitem><para>Number of acknowledgements received in an as yet
- uncommitted transaction.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>prefetch_count</term>
<listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
</varlistentry>
@@ -1239,8 +1230,7 @@
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
- user, transactional, consumer_count, and
- messages_unacknowledged are assumed.
+ user, consumer_count, and messages_unacknowledged are assumed.
</para>
<para role="example-prefix">
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 00b7e6e9..3861df2a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -67,8 +67,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, txn, sender, message,
- msg_seq_no}).
+-record(delivery, {mandatory, immediate, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 295d9039..ee102f5e 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -26,12 +26,11 @@
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
--type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
- async_callback(), sync_callback()) -> state()).
+-spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(),
+ async_callback()) -> state()).
-spec(terminate/2 :: (any(), state()) -> state()).
-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
@@ -51,14 +50,6 @@
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
--spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state()).
--spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
--spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
--spec(tx_commit/4 ::
- (rabbit_types:txn(), fun (() -> any()),
- message_properties_transformer(), state()) -> {[ack()], state()}).
-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
@@ -71,7 +62,7 @@
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
--spec(is_duplicate/3 ::
- (rabbit_types:txn(), rabbit_types:basic_message(), state()) ->
+-spec(is_duplicate/2 ::
+ (rabbit_types:basic_message(), state()) ->
{'false'|'published'|'discarded', state()}).
-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index bacb1d21..e9d01d12 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,19 +20,18 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/4, reject/4]).
+ stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
--export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
+-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
%% internal
--export([internal_declare/2, internal_delete/1,
- run_backing_queue/3, run_backing_queue_async/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
emit_stats/1]).
@@ -117,12 +116,8 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(ack/4 ::
- (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
- -> 'ok').
+-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
--spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
--spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
@@ -147,9 +142,6 @@
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(run_backing_queue_async/3 ::
- (pid(), atom(),
- (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -436,21 +428,12 @@ deliver(QPid, Delivery) ->
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, Txn, MsgIds, ChPid) ->
- delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) ->
+ delegate_cast(QPid, {ack, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
-commit_all(QPids, Txn, ChPid) ->
- safe_delegate_call_ok(
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
- QPids).
-
-rollback_all(QPids, Txn, ChPid) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end).
-
notify_down_all(QPids, ChPid) ->
safe_delegate_call_ok(
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
@@ -501,9 +484,6 @@ internal_delete(QueueName) ->
end).
run_backing_queue(QPid, Mod, Fun) ->
- gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
-
-run_backing_queue_async(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c1fa048d..3e2bbf8d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -62,7 +62,6 @@
monitor_ref,
acktags,
is_limit_active,
- txn,
unsent_message_count}).
-define(STATISTICS_KEYS,
@@ -193,14 +192,7 @@ bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
fun (Mod, Fun) ->
- rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
- end,
- fun (Mod, Fun) ->
- rabbit_misc:with_exit_handler(
- fun () -> error end,
- fun () ->
- rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
- end)
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
@@ -217,22 +209,14 @@ init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
terminate_shutdown(Fun, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
case BQS of
undefined -> State;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- BQS1 = lists:foldl(
- fun (#cr{txn = none}, BQSN) ->
- BQSN;
- (#cr{txn = Txn}, BQSN) ->
- {_AckTags, BQSN1} =
- BQ:tx_rollback(Txn, BQSN),
- BQSN1
- end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag)
|| {Ch, CTag, _} <- consumers(State1)],
- State1#q{backing_queue_state = Fun(BQS1)}
+ State1#q{backing_queue_state = Fun(BQS)}
end.
reply(Reply, NewState) ->
@@ -343,7 +327,6 @@ ch_record(ChPid) ->
monitor_ref = MonitorRef,
acktags = sets:new(),
is_limit_active = false,
- txn = none,
unsent_message_count = 0},
put(Key, C),
C;
@@ -355,13 +338,12 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
- txn = Txn,
unsent_message_count = UnsentMessageCount}) ->
- case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of
- {0, 0, 0, none} -> ok = erase_ch_record(C),
- false;
- _ -> store_ch_record(C),
- true
+ case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {0, 0, 0} -> ok = erase_ch_record(C),
+ false;
+ _ -> store_ch_record(C),
+ true
end.
erase_ch_record(#cr{ch_pid = ChPid,
@@ -513,8 +495,7 @@ run_message_queue(State) ->
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
State2.
-attempt_delivery(Delivery = #delivery{txn = none,
- sender = ChPid,
+attempt_delivery(Delivery = #delivery{sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -523,7 +504,7 @@ attempt_delivery(Delivery = #delivery{txn = none,
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
- case BQ:is_duplicate(none, Message, BQS) of
+ case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -555,24 +536,6 @@ attempt_delivery(Delivery = #delivery{txn = none,
discarded -> false
end,
{Delivered, Confirm, State#q{backing_queue_state = BQS1}}
- end;
-attempt_delivery(Delivery = #delivery{txn = Txn,
- sender = ChPid,
- message = Message},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- Confirm = should_confirm_message(Delivery, State),
- case BQ:is_duplicate(Txn, Message, BQS) of
- {false, BQS1} ->
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
- BQS1),
- {true, Confirm, State#q{backing_queue_state = BQS2}};
- {Duplicate, BQS1} ->
- Delivered = case Duplicate of
- published -> true;
- discarded -> false
- end,
- {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
@@ -652,7 +615,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} ->
+ C = #cr{ch_pid = ChPid, acktags = ChAckTags} ->
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -665,13 +628,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
ChPid, State#q.blocked_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> State2 = case Txn of
- none -> State1;
- _ -> rollback_transaction(Txn, C,
- State1)
- end,
- {ok, requeue_and_run(sets:to_list(ChAckTags),
- ensure_expiry_timer(State2))}
+ false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
+ ensure_expiry_timer(State1))}
end
end.
@@ -705,25 +663,6 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
-commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL}) ->
- {AckTags, BQS1} = BQ:tx_commit(
- Txn, fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL), BQS),
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
- State#q{backing_queue_state = BQS1}.
-
-rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
- %% Iff we removed acktags from the channel record on ack+txn then
- %% we would add them back in here.
- maybe_store_ch_record(C#cr{txn = none}),
- State#q{backing_queue_state = BQS1}.
-
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
@@ -848,7 +787,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- {run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
end.
@@ -861,7 +799,7 @@ prioritise_cast(Msg, _State) ->
maybe_expire -> 8;
drop_expired -> 8;
emit_stats -> 7;
- {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
@@ -933,13 +871,6 @@ handle_call({deliver, Delivery}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
-handle_call({commit, Txn, ChPid}, From, State) ->
- case lookup_ch(ChPid) of
- not_found -> reply(ok, State);
- C -> noreply(run_message_queue(
- commit_transaction(Txn, From, C, State)))
- end;
-
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
@@ -1079,11 +1010,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(requeue_and_run(AckTags, State))
- end;
-
-handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
- reply(ok, run_backing_queue(Mod, Fun, State)).
-
+ end.
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -1095,24 +1022,16 @@ handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
noreply(deliver_or_enqueue(Delivery, State));
-handle_cast({ack, Txn, AckTags, ChPid},
+handle_cast({ack, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- {C1, State1} =
- case Txn of
- none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- NewC = C#cr{acktags = ChAckTags1},
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- {NewC, State#q{backing_queue_state = BQS1}};
- _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
- {C#cr{txn = Txn},
- State#q{backing_queue_state = BQS1}}
- end,
- maybe_store_ch_record(C1),
- noreply(State1)
+ maybe_store_ch_record(C#cr{acktags = subtract_acks(
+ ChAckTags, AckTags)}),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ noreply(State#q{backing_queue_state = BQS1})
end;
handle_cast({reject, AckTags, Requeue, ChPid},
@@ -1131,12 +1050,6 @@ handle_cast({reject, AckTags, Requeue, ChPid},
end)
end;
-handle_cast({rollback, Txn, ChPid}, State) ->
- noreply(case lookup_ch(ChPid) of
- not_found -> State;
- C -> rollback_transaction(Txn, C, State)
- end);
-
handle_cast(delete_immediately, State) ->
{stop, normal, State};
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 217ad3eb..77278416 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -44,9 +44,7 @@ behaviour_info(callbacks) ->
%% makes it useful for passing messages back into the backing
%% queue, especially as the backing queue does not have
%% control of its own mailbox.
- %% 4. a synchronous callback. Same as the asynchronous callback
- %% but waits for completion and returns 'error' on error.
- {init, 4},
+ {init, 3},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 2},
@@ -107,21 +105,6 @@ behaviour_info(callbacks) ->
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
{ack, 2},
- %% A publish, but in the context of a transaction.
- {tx_publish, 5},
-
- %% Acks, but in the context of a transaction.
- {tx_ack, 3},
-
- %% Undo anything which has been done in the context of the
- %% specified transaction.
- {tx_rollback, 2},
-
- %% Commit a transaction. The Fun passed in must be called once
- %% the messages have really been commited. This CPS permits the
- %% possibility of commit coalescing.
- {tx_commit, 4},
-
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
{requeue, 3},
@@ -175,7 +158,7 @@ behaviour_info(callbacks) ->
%% the BQ to signal that it's already seen this message (and in
%% what capacity - i.e. was it published previously or discarded
%% previously) and thus the message should be dropped.
- {is_duplicate, 3},
+ {is_duplicate, 2},
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ for some
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index fa7e3a5a..ec8ed351 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/3, message/4, properties/1, delivery/5]).
--export([publish/4, publish/7]).
+-export([publish/1, message/3, message/4, properties/1, delivery/4]).
+-export([publish/4, publish/6]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -37,9 +37,8 @@
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/5 ::
- (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -53,10 +52,9 @@
-spec(publish/4 ::
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) -> publish_result()).
--spec(publish/7 ::
+-spec(publish/6 ::
(exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
- rabbit_types:maybe(rabbit_types:txn()), properties_input(),
- body_input()) -> publish_result()).
+ properties_input(), body_input()) -> publish_result()).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -73,9 +71,9 @@ publish(Delivery = #delivery{
Other -> Other
end.
-delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
- sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
+delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
+ message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
@@ -157,19 +155,17 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, none, Properties,
- Body).
+ publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Txn,
- Props, Body) ->
- publish(X, delivery(Mandatory, Immediate, Txn,
+publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(X, delivery(Mandatory, Immediate,
message(XName, RKey, properties(Props), Body),
undefined));
-publish(XName, RKey, Mandatory, Immediate, Txn, Props, Body) ->
+publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
case rabbit_exchange:lookup(XName) of
- {ok, X} -> publish(X, RKey, Mandatory, Immediate, Txn, Props, Body);
+ {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body);
Err -> Err
end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 991b0b06..36471bf5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -30,8 +30,7 @@
prioritise_cast/2]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_pid, start_limiter_fun, transaction_id, tx_participants,
- next_tag, uncommitted_ack_q, unacked_message_q,
+ limiter_pid, start_limiter_fun, next_tag, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -41,12 +40,10 @@
-define(STATISTICS_KEYS,
[pid,
- transactional,
confirm,
consumer_count,
messages_unacknowledged,
messages_unconfirmed,
- acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -173,10 +170,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
- transaction_id = none,
- tx_participants = sets:new(),
next_tag = 1,
- uncommitted_ack_q = queue:new(),
unacked_message_q = queue:new(),
user = User,
virtual_host = VHost,
@@ -331,7 +325,7 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
{hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(Reason, State) ->
- {Res, _State1} = rollback_and_notify(State),
+ {Res, _State1} = notify_queues(State),
case Reason of
normal -> ok = Res;
shutdown -> ok = Res;
@@ -386,8 +380,8 @@ send_exception(Reason, State = #ch{protocol = Protocol,
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
[ConnPid, Channel, Reason]),
- %% something bad's happened: rollback_and_notify may not be 'ok'
- {_Result, State1} = rollback_and_notify(State),
+ %% something bad's happened: notify_queues may not be 'ok'
+ {_Result, State1} = notify_queues(State),
case CloseChannel of
Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
{noreply, State1};
@@ -589,7 +583,7 @@ handle_method(_Method, _, State = #ch{state = closing}) ->
{noreply, State};
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
- {ok, State1} = rollback_and_notify(State),
+ {ok, State1} = notify_queues(State),
ReaderPid ! {channel_closing, self()},
{noreply, State1};
@@ -601,7 +595,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
- transaction_id = TxnKey,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -623,19 +616,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_trace:tap_trace_in(Message, TraceState),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- MsgSeqNo)),
+ Exchange, rabbit_basic:delivery(Mandatory, Immediate, Message,
+ MsgSeqNo)),
State2 = process_routing_result(RoutingRes, DeliveredQPids,
ExchangeName, MsgSeqNo, Message,
State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State2),
- {noreply, case TxnKey of
- none -> State2;
- _ -> add_tx_participants(DeliveredQPids, State2)
- end};
+ {noreply, State2};
{error, Reason} ->
rabbit_misc:protocol_error(precondition_failed,
"invalid message: ~p", [Reason])
@@ -649,22 +638,12 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{transaction_id = TxnKey,
- unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- QIncs = ack(TxnKey, Acked),
- Participants = [QPid || {QPid, _} <- QIncs],
+ QIncs = ack(Acked),
maybe_incr_stats(QIncs, ack, State),
- {noreply, case TxnKey of
- none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
- State#ch{unacked_message_q = Remaining};
- _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
- Acked),
- add_tx_participants(
- Participants,
- State#ch{unacked_message_q = Remaining,
- uncommitted_ack_q = NewUAQ})
- end};
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -1048,35 +1027,6 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from confirm to tx mode", []);
-
-handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
- {reply, #'tx.select_ok'{}, new_tx(State)};
-
-handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State};
-
-handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
-
-handle_method(#'tx.commit'{}, _, State) ->
- {reply, #'tx.commit_ok'{}, internal_commit(State)};
-
-handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
-
-handle_method(#'tx.rollback'{}, _, State) ->
- {reply, #'tx.rollback_ok'{}, internal_rollback(State)};
-
-handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
- when TxId =/= none ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from tx to confirm mode", []);
-
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
@@ -1252,55 +1202,17 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
end.
-add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
- State#ch{tx_participants = sets:union(Participants,
- sets:from_list(MoreP))}.
-
-ack(TxnKey, UAQ) ->
- fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
- [{QPid, length(MsgIds)} | L]
- end, [], UAQ).
-
-make_tx_id() -> rabbit_guid:guid().
-
-new_tx(State) ->
- State#ch{transaction_id = make_tx_id(),
- tx_participants = sets:new(),
- uncommitted_ack_q = queue:new()}.
-
-internal_commit(State = #ch{transaction_id = TxnKey,
- tx_participants = Participants}) ->
- case rabbit_amqqueue:commit_all(sets:to_list(Participants),
- TxnKey, self()) of
- ok -> ok = notify_limiter(State#ch.limiter_pid,
- State#ch.uncommitted_ack_q),
- new_tx(State);
- {error, Errors} -> rabbit_misc:protocol_error(
- internal_error, "commit failed: ~w", [Errors])
- end.
+ack(UAQ) ->
+ fold_per_queue(fun (QPid, MsgIds, L) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ [{QPid, length(MsgIds)} | L]
+ end, [], UAQ).
-internal_rollback(State = #ch{transaction_id = TxnKey,
- tx_participants = Participants,
- uncommitted_ack_q = UAQ,
- unacked_message_q = UAMQ}) ->
- ?LOGDEBUG("rollback ~p~n - ~p acks uncommitted, ~p messages unacked~n",
- [self(),
- queue:len(UAQ),
- queue:len(UAMQ)]),
- ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey, self()),
- NewUAMQ = queue:join(UAQ, UAMQ),
- new_tx(State#ch{unacked_message_q = NewUAMQ}).
-
-rollback_and_notify(State = #ch{state = closing}) ->
+notify_queues(State = #ch{state = closing}) ->
{ok, State};
-rollback_and_notify(State = #ch{transaction_id = none}) ->
- {notify_queues(State), State#ch{state = closing}};
-rollback_and_notify(State) ->
- State1 = internal_rollback(State),
- {notify_queues(State1), State1#ch{state = closing}}.
+notify_queues(State = #ch{consumer_mapping = Consumers}) ->
+ {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
+ State#ch{state = closing}}.
fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
@@ -1319,9 +1231,6 @@ start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
ok = limit_queues(LPid, State),
LPid.
-notify_queues(#ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
-
unlimit_queues(State) ->
ok = limit_queues(undefined, State),
undefined.
@@ -1436,17 +1345,13 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
gb_trees:size(UMQ);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
- uncommitted_ack_q = UAQ}) ->
- queue:len(UAMQ) + queue:len(UAQ);
-i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
- queue:len(UAQ);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
+ queue:len(UAMQ);
i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 9eef384a..6eb1aaba 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -301,7 +301,7 @@ action(list_connections, Node, Args, _Opts, Inform) ->
action(list_channels, Node, Args, _Opts, Inform) ->
Inform("Listing channels", []),
- ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
+ ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
messages_unacknowledged]),
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 3fb0817a..93aad9e3 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -71,7 +71,7 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
{ok, _RoutingRes, _DeliveredQPids} =
- rabbit_basic:publish(LogExch, RoutingKey, false, false, none,
+ rabbit_basic:publish(LogExch, RoutingKey, false, false,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
list_to_binary(io_lib:format(Format, Data))),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2727c1d0..4906937b 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -70,7 +70,7 @@
%% group. Because the master is the bq of amqqueue_process, it doesn't
%% have sole control over its mailbox, and as a result, the master
%% itself cannot be passed messages directly (well, it could by via
-%% the amqqueue:run_backing_queue_async callback but that would induce
+%% the amqqueue:run_backing_queue callback but that would induce
%% additional unnecessary loading on the master queue process), yet it
%% needs to react to gm events, such as the death of slaves. Thus the
%% master creates the coordinator, and it is the coordinator that is
@@ -254,45 +254,6 @@
%% sender_death message. The slave will then be able to tidy up its
%% state as normal.
%%
-%% We don't support transactions on mirror queues. To do so is
-%% challenging. The underlying bq is free to add the contents of the
-%% txn to the queue proper at any point after the tx.commit comes in
-%% but before the tx.commit-ok goes out. This means that it is not
-%% safe for all mirrors to simply issue the bq:tx_commit at the same
-%% time, as the addition of the txn's contents to the queue may
-%% subsequently be inconsistently interwoven with other actions on the
-%% bq. The solution to this is, in the master, wrap the PostCommitFun
-%% and do the gm:broadcast in there: at that point, you're in the bq
-%% (well, there's actually nothing to stop that function being invoked
-%% by some other process, but let's pretend for now: you could always
-%% use run_backing_queue to ensure you really are in the queue process
-%% (the _async variant would be unsafe from an ordering pov)), the
-%% gm:broadcast is safe because you don't have to worry about races
-%% with other gm:broadcast calls (same process). Thus this signal
-%% would indicate sufficiently to all the slaves that they must insert
-%% the complete contents of the txn at precisely this point in the
-%% stream of events.
-%%
-%% However, it's quite difficult for the slaves to make that happen:
-%% they would be forced to issue the bq:tx_commit at that point, but
-%% then stall processing any further instructions from gm until they
-%% receive the notification from their bq that the tx_commit has fully
-%% completed (i.e. they need to treat what is an async system as being
-%% fully synchronous). This is not too bad (apart from the
-%% vomit-inducing notion of it all): just need a queue of instructions
-%% from the GM; but then it gets rather worse when you consider what
-%% needs to happen if the master dies at this point and the slave in
-%% the middle of this tx_commit needs to be promoted.
-%%
-%% Finally, we can't possibly hope to make transactions atomic across
-%% mirror queues, and it's not even clear that that's desirable: if a
-%% slave fails whilst there's an open transaction in progress then
-%% when the channel comes to commit the txn, it will detect the
-%% failure and destroy the channel. However, the txn will have
-%% actually committed successfully in all the other mirrors (including
-%% master). To do this bit properly would require 2PC and all the
-%% baggage that goes with that.
-%%
%% Recovery of mirrored queues is straightforward: as nodes die, the
%% remaining nodes record this, and eventually a situation is reached
%% in which only one node is alive, which is the master. This is the
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 463b8cfb..9e0ffb13 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -16,13 +16,12 @@
-module(rabbit_mirror_queue_master).
--export([init/4, terminate/2, delete_and_terminate/2,
+-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/3, discard/3]).
+ status/1, invoke/3, is_duplicate/2, discard/3]).
-export([start/1, stop/0]).
@@ -62,7 +61,7 @@ stop() ->
sender_death_fun() ->
Self = self(),
fun (DeadPid) ->
- rabbit_amqqueue:run_backing_queue_async(
+ rabbit_amqqueue:run_backing_queue(
Self, ?MODULE,
fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
ok = gm:broadcast(GM, {sender_death, DeadPid}),
@@ -72,7 +71,7 @@ sender_death_fun() ->
end.
init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
- AsyncCallback, SyncCallback) ->
+ AsyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
@@ -84,7 +83,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
end) -- [node()],
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
+ BQS = BQ:init(Q, Recover, AsyncCallback),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -243,21 +242,6 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-tx_publish(_Txn, _Msg, _MsgProps, _ChPid, State) ->
- %% We don't support txns in mirror queues
- State.
-
-tx_ack(_Txn, _AckTags, State) ->
- %% We don't support txns in mirror queues
- State.
-
-tx_rollback(_Txn, State) ->
- {[], State}.
-
-tx_commit(_Txn, PostCommitFun, _MsgPropsFun, State) ->
- PostCommitFun(), %% Probably must run it to avoid deadlocks
- {[], State}.
-
requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -299,7 +283,7 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-is_duplicate(none, Message = #basic_message { id = MsgId },
+is_duplicate(Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -341,11 +325,7 @@ is_duplicate(none, Message = #basic_message { id = MsgId },
%% Don't erase from SS here because discard/2 is about to
%% be called and we need to be able to detect this case
{discarded, State}
- end;
-is_duplicate(_Txn, _Msg, State) ->
- %% In a transaction. We don't support txns in mirror queues. But
- %% it's probably not a duplicate...
- {false, State}.
+ end.
discard(Msg = #basic_message { id = MsgId }, ChPid,
State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 55d61d41..93340ba8 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -167,14 +167,7 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State}
- end;
-
-handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
- reply(ok, run_backing_queue(Mod, Fun, State));
-
-handle_call({commit, _Txn, _ChPid}, _From, State) ->
- %% We don't support transactions in mirror queues
- reply(ok, State).
+ end.
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -208,11 +201,7 @@ handle_cast(update_ram_duration,
handle_cast(sync_timeout, State) ->
noreply(backing_queue_timeout(
- State #state { sync_timer_ref = undefined }));
-
-handle_cast({rollback, _Txn, _ChPid}, State) ->
- %% We don't support transactions in mirror queues
- noreply(State).
+ State #state { sync_timer_ref = undefined })).
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
@@ -271,7 +260,6 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _State) ->
case Msg of
- {run_backing_queue, _Mod, _Fun} -> 6;
{gm_deaths, _Deaths} -> 5;
_ -> 0
end.
@@ -331,14 +319,7 @@ bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
fun (Mod, Fun) ->
- rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
- end,
- fun (Mod, Fun) ->
- rabbit_misc:with_exit_handler(
- fun () -> error end,
- fun () ->
- rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
- end)
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
@@ -488,7 +469,7 @@ promote_me(From, #state { q = Q,
%%
%% Everything that's in MA gets requeued. Consequently the new
%% master should start with a fresh AM as there are no messages
- %% pending acks (txns will have been rolled back).
+ %% pending acks.
MSList = dict:to_list(MS),
SS = dict:from_list(
@@ -605,15 +586,14 @@ confirm_sender_death(Pid) ->
%% Note that we do not remove our knowledge of this ChPid until we
%% get the sender_death from GM.
{ok, _TRef} = timer:apply_after(
- ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue_async,
+ ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
[self(), rabbit_mirror_queue_master, Fun]),
ok.
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
- sender = ChPid,
- txn = none },
+ sender = ChPid },
EnqueueOnPromotion,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
@@ -655,10 +635,7 @@ maybe_enqueue_message(
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
- end;
-maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) ->
- %% We don't support txns in mirror queues.
- State.
+ end.
get_sender_queue(ChPid, SQ) ->
case dict:find(ChPid, SQ) of
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index aaf3df78..bf89cdb2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -76,11 +76,10 @@
%% the segment file combined with the journal, no writing needs to be
%% done to the segment file either (in fact it is deleted if it exists
%% at all). This is safe given that the set of acks is a subset of the
-%% set of publishes. When it's necessary to sync messages because of
-%% transactions, it's only necessary to fsync on the journal: when
-%% entries are distributed from the journal to segment files, those
-%% segments appended to are fsync'd prior to the journal being
-%% truncated.
+%% set of publishes. When it is necessary to sync messages, it is
+%% sufficient to fsync on the journal: when entries are distributed
+%% from the journal to segment files, those segments appended to are
+%% fsync'd prior to the journal being truncated.
%%
%% This module is also responsible for scanning the queue index files
%% and seeding the message store on start up.
@@ -289,14 +288,13 @@ sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
sync_if([] =/= MsgIds, State).
sync(SeqIds, State) ->
- %% The SeqIds here contains the SeqId of every publish and ack in
- %% the transaction. Ideally we should go through these seqids and
- %% only sync the journal if the pubs or acks appear in the
+ %% The SeqIds here contains the SeqId of every publish and ack to
+ %% be sync'ed. Ideally we should go through these seqids and only
+ %% sync the journal if the pubs or acks appear in the
%% journal. However, this would be complex to do, and given that
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
- %% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled by sync_if anyway).
+ %% seqids not being in the journal.
sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3ee71a6d..63676fef 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -705,7 +705,6 @@ test_topic_expect_match(X, List) ->
Res = rabbit_exchange_type_topic:route(
X, #delivery{mandatory = false,
immediate = false,
- txn = none,
sender = self(),
message = Message}),
ExpectedRes = lists:map(
@@ -2084,7 +2083,7 @@ test_queue_index() ->
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
- Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1).
+ Q, Recover, fun nop/2, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
@@ -2132,6 +2131,29 @@ with_fresh_variable_queue(Fun) ->
_ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
+publish_and_confirm(QPid, Payload, Count) ->
+ Seqs = lists:seq(1, Count),
+ [begin
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2},
+ Payload),
+ Delivery = #delivery{mandatory = false, immediate = false,
+ sender = self(), message = Msg, msg_seq_no = Seq},
+ true = rabbit_amqqueue:deliver(QPid, Delivery)
+ end || Seq <- Seqs],
+ wait_for_confirms(gb_sets:from_list(Seqs)).
+
+wait_for_confirms(Unconfirmed) ->
+ case gb_sets:is_empty(Unconfirmed) of
+ true -> ok;
+ false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
+ wait_for_confirms(
+ gb_sets:difference(Unconfirmed,
+ gb_sets:from_list(Confirmed)))
+ after 1000 -> exit(timeout_waiting_for_confirm)
+ end
+ end.
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
@@ -2325,17 +2347,10 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
- TxID = rabbit_guid:guid(),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- [begin
- Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, <<>>),
- Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
- sender = self(), message = Msg},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
- end || _ <- lists:seq(1, Count)],
- rabbit_amqqueue:commit_all([QPid], TxID, self()),
+ publish_and_confirm(QPid, <<>>, Count),
+
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
receive {'DOWN', MRef, process, QPid, _Info} -> ok
@@ -2362,18 +2377,10 @@ test_variable_queue_delete_msg_store_files_callback() ->
ok = restart_msg_store_empty(),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- TxID = rabbit_guid:guid(),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- [begin
- Msg = rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, Payload),
- Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
- sender = self(), message = Msg},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
- end || _ <- lists:seq(1, Count)],
- rabbit_amqqueue:commit_all([QPid], TxID, self()),
+ publish_and_confirm(QPid, Payload, Count),
+
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
CountMinusOne = Count - 1,
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 03b2c9e8..2db960ac 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -20,7 +20,7 @@
-ifdef(use_specs).
--export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0,
+-export_type([maybe/1, info/0, infos/0, info_key/0, info_keys/0,
message/0, msg_id/0, basic_message/0,
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, message_properties/0,
@@ -73,16 +73,12 @@
-type(delivery() ::
#delivery{mandatory :: boolean(),
immediate :: boolean(),
- txn :: maybe(txn()),
sender :: pid(),
message :: message()}).
-type(message_properties() ::
#message_properties{expiry :: pos_integer() | 'undefined',
needs_confirming :: boolean()}).
-%% this is really an abstract type, but dialyzer does not support them
--type(txn() :: rabbit_guid:guid()).
-
-type(info_key() :: atom()).
-type(info_keys() :: [info_key()]).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c6d99deb..ea72de66 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,20 +16,18 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/2, delete_and_terminate/2,
+-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/2, fetch/2, ack/2,
- tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1,
+ dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/3, discard/3,
+ status/1, invoke/3, is_duplicate/2, discard/3,
multiple_routing_keys/0]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([start_msg_store/2, stop_msg_store/0, init/5]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -239,12 +237,10 @@
ram_ack_index,
index_state,
msg_store_clients,
- on_sync,
durable,
transient_threshold,
async_callback,
- sync_callback,
len,
persistent_count,
@@ -285,10 +281,6 @@
end_seq_id %% end_seq_id is exclusive
}).
--record(tx, { pending_messages, pending_acks }).
-
--record(sync, { acks_persistent, acks_all, pubs, funs }).
-
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -321,12 +313,6 @@
count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
--type(sync() :: #sync { acks_persistent :: [[seq_id()]],
- acks_all :: [[seq_id()]],
- pubs :: [{message_properties_transformer(),
- [rabbit_types:basic_message()]}],
- funs :: [fun (() -> any())] }).
-
-type(state() :: #vqstate {
q1 :: queue(),
q2 :: bpqueue:bpqueue(),
@@ -339,12 +325,10 @@
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
- on_sync :: sync(),
durable :: boolean(),
transient_threshold :: non_neg_integer(),
async_callback :: async_callback(),
- sync_callback :: sync_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -377,11 +361,6 @@
count = 0,
end_seq_id = Z }).
--define(BLANK_SYNC, #sync { acks_persistent = [],
- acks_all = [],
- pubs = [],
- funs = [] }).
-
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -410,17 +389,17 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(Queue, Recover, AsyncCallback, SyncCallback) ->
- init(Queue, Recover, AsyncCallback, SyncCallback,
+init(Queue, Recover, AsyncCallback) ->
+ init(Queue, Recover, AsyncCallback,
fun (MsgIds, ActionTaken) ->
msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
- AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
+ init(IsDurable, IndexState, 0, [], AsyncCallback,
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -429,7 +408,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
init(#amqqueue { name = QueueName, durable = true }, true,
- AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -450,14 +429,14 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(true, tx_commit_index(State)),
+ remove_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
_ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
@@ -590,59 +569,6 @@ ack(AckTags, State) ->
AckTags, State),
{MsgIds, a(State1)}.
-tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- _ChPid, State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
- Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
- case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps),
- #msg_status { msg_on_disk = true } =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState);
- false -> ok
- end,
- a(State).
-
-tx_ack(Txn, AckTags, State) ->
- Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- State.
-
-tx_rollback(Txn, State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
- #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
- erase_tx(Txn),
- ok = case IsDurable of
- true -> msg_store_remove(MSCState, true,
- persistent_msg_ids(Pubs));
- false -> ok
- end,
- {lists:append(AckTags), a(State)}.
-
-tx_commit(Txn, Fun, MsgPropsFun,
- State = #vqstate { durable = IsDurable,
- async_callback = AsyncCallback,
- sync_callback = SyncCallback,
- msg_store_clients = MSCState }) ->
- #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
- erase_tx(Txn),
- AckTags1 = lists:append(AckTags),
- PersistentMsgIds = persistent_msg_ids(Pubs),
- HasPersistentPubs = PersistentMsgIds =/= [],
- {AckTags1,
- a(case IsDurable andalso HasPersistentPubs of
- true -> MsgStoreCallback =
- fun () -> msg_store_callback(
- PersistentMsgIds, Pubs, AckTags1, Fun,
- MsgPropsFun, AsyncCallback, SyncCallback)
- end,
- ok = msg_store_sync(MSCState, true, PersistentMsgIds,
- fun () -> spawn(MsgStoreCallback) end),
- State;
- false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
- Fun, MsgPropsFun, State)
- end)}.
-
requeue(AckTags, MsgPropsFun, State) ->
MsgPropsFun1 = fun (MsgProps) ->
(MsgPropsFun(MsgProps)) #message_properties {
@@ -748,23 +674,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State = #vqstate { on_sync = OnSync }) ->
- case {OnSync, needs_index_sync(State)} of
- {?BLANK_SYNC, false} ->
- case reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end;
- _ ->
- timed
+needs_timeout(State) ->
+ case needs_index_sync(State) of
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end;
+ true -> timed
end.
timeout(State) ->
- a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
+ a(reduce_memory_use(confirm_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -774,7 +699,6 @@ status(#vqstate {
len = Len,
pending_ack = PA,
ram_ack_index = RAI,
- on_sync = #sync { funs = From },
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
@@ -791,7 +715,6 @@ status(#vqstate {
{q4 , queue:len(Q4)},
{len , Len},
{pending_acks , dict:size(PA)},
- {outstanding_txns , length(From)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
@@ -803,10 +726,9 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
-invoke(?MODULE, Fun, State) ->
- Fun(?MODULE, State).
+invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
-is_duplicate(_Txn, _Msg, State) -> {false, State}.
+is_duplicate(_Msg, State) -> {false, State}.
discard(_Msg, _ChPid, State) -> State.
@@ -902,11 +824,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) ->
MSCState, IsPersistent,
fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end).
-
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
MSCState, IsPersistent,
@@ -923,20 +840,6 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-lookup_tx(Txn) -> case get({txn, Txn}) of
- undefined -> #tx { pending_messages = [],
- pending_acks = [] };
- V -> V
- end.
-
-store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
-
-erase_tx(Txn) -> erase({txn, Txn}).
-
-persistent_msg_ids(Pubs) ->
- [MsgId || {#basic_message { id = MsgId,
- is_persistent = true }, _MsgProps} <- Pubs].
-
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
@@ -1000,8 +903,8 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms,
- AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
+init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+ PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1023,12 +926,10 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
- on_sync = ?BLANK_SYNC,
durable = IsDurable,
transient_threshold = NextSeqId,
async_callback = AsyncCallback,
- sync_callback = SyncCallback,
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1146,88 +1047,6 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
len = Len1,
persistent_count = PCount1 }}.
-msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
- AsyncCallback, SyncCallback) ->
- case SyncCallback(?MODULE,
- fun (?MODULE, StateN) ->
- tx_commit_post_msg_store(true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)
- end) of
- ok -> ok;
- error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback)
- end.
-
-remove_persistent_messages(MsgIds, AsyncCallback) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE,
- undefined, AsyncCallback),
- ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
- rabbit_msg_store:client_delete_and_terminate(PersistentClient).
-
-tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
- State = #vqstate {
- on_sync = OnSync = #sync {
- acks_persistent = SPAcks,
- acks_all = SAcks,
- pubs = SPubs,
- funs = SFuns },
- pending_ack = PA,
- durable = IsDurable }) ->
- PersistentAcks =
- case IsDurable of
- true -> [AckTag || AckTag <- AckTags,
- case dict:fetch(AckTag, PA) of
- #msg_status {} ->
- false;
- {IsPersistent, _MsgId, _MsgProps} ->
- IsPersistent
- end];
- false -> []
- end,
- case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
- true -> State #vqstate {
- on_sync = #sync {
- acks_persistent = [PersistentAcks | SPAcks],
- acks_all = [AckTags | SAcks],
- pubs = [{MsgPropsFun, Pubs} | SPubs],
- funs = [Fun | SFuns] }};
- false -> State1 = tx_commit_index(
- State #vqstate {
- on_sync = #sync {
- acks_persistent = [],
- acks_all = [AckTags],
- pubs = [{MsgPropsFun, Pubs}],
- funs = [Fun] } }),
- State1 #vqstate { on_sync = OnSync }
- end.
-
-tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- State;
-tx_commit_index(State = #vqstate { on_sync = #sync {
- acks_persistent = SPAcks,
- acks_all = SAcks,
- pubs = SPubs,
- funs = SFuns },
- durable = IsDurable }) ->
- PAcks = lists:append(SPAcks),
- Acks = lists:append(SAcks),
- Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
- {Msg, MsgProps} <- lists:reverse(PubsN)],
- {_MsgIds, State1} = ack(Acks, State),
- {SeqIds, State2 = #vqstate { index_state = IndexState }} =
- lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent },
- MsgProps},
- {SeqIdsAcc, State3}) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State4} =
- publish(Msg, MsgProps, false, IsPersistent1, State3),
- {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4}
- end, {PAcks, State1}, Pubs),
- IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
- [ Fun() || Fun <- lists:reverse(SFuns) ],
- reduce_memory_use(
- State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
-
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
index_state = IndexState,