summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mnesia_queue.erl210
1 files changed, 91 insertions, 119 deletions
diff --git a/src/rabbit_mnesia_queue.erl b/src/rabbit_mnesia_queue.erl
index 72a67946..baaa74f2 100644
--- a/src/rabbit_mnesia_queue.erl
+++ b/src/rabbit_mnesia_queue.erl
@@ -30,7 +30,8 @@
-behaviour(rabbit_backing_queue).
--record(state, { q_table, p_table, next_seq_id, confirmed, txn_dict }).
+-record(state,
+ { q_table, pending_ack_table, next_seq_id, confirmed, txn_dict }).
-record(msg_status, { seq_id, msg, props, is_delivered }).
@@ -43,10 +44,8 @@
-type(seq_id() :: non_neg_integer()).
-type(ack() :: seq_id()).
--type(state() :: #state { q_table :: atom(),
- p_table :: atom(),
- next_seq_id :: seq_id(),
- confirmed :: gb_set(),
+-type(state() :: #state { q_table :: atom(), pending_ack_table :: atom(),
+ next_seq_id :: seq_id(), confirmed :: gb_set(),
txn_dict :: dict() }).
-type(msg_status() :: #msg_status { msg :: rabbit_types:basic_message(),
@@ -54,8 +53,7 @@
props :: rabbit_types:message_properties(),
is_delivered :: boolean() }).
--type(tx() :: #tx { to_pub :: [pub()],
- to_ack :: [seq_id()] }).
+-type(tx() :: #tx { to_pub :: [pub()], to_ack :: [seq_id()] }).
-type(pub() :: { rabbit_types:basic_message(),
rabbit_types:message_properties() }).
@@ -71,16 +69,12 @@
-spec(internal_fetch(true, state()) -> fetch_result(ack());
(false, state()) -> fetch_result(undefined)).
--spec internal_tx_commit([pub()],
- [seq_id()],
- message_properties_transformer(),
+-spec internal_tx_commit([pub()], [seq_id()], message_properties_transformer(),
state()) ->
state().
-spec internal_publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(),
- boolean(),
- state()) ->
+ rabbit_types:message_properties(), boolean(), state()) ->
state().
-spec(internal_ack/2 :: ([seq_id()], state()) -> state()).
@@ -98,8 +92,7 @@
-spec add_pending_ack(msg_status(), state()) -> ok.
--spec del_pending_acks(fun ((msg_status(), state()) -> state()),
- [seq_id()],
+-spec del_pending_acks(fun ((msg_status(), state()) -> state()), [seq_id()],
state()) ->
state().
@@ -120,85 +113,77 @@ start(_DurableQueues) -> ok.
stop() -> ok.
init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) ->
- {QTable, PTable} = tables(QueueName),
+ {QTable, PendingAckTable} = tables(QueueName),
case Recover of
false -> case mnesia:delete_table(QTable) of
{atomic, ok} -> ok;
{aborted, {no_exists, QTable}} -> ok
end,
- case mnesia:delete_table(PTable) of
+ case mnesia:delete_table(PendingAckTable) of
{atomic, ok} -> ok;
- {aborted, {no_exists, PTable}} -> ok
+ {aborted, {no_exists, PendingAckTable}} -> ok
end;
true -> ok
end,
- ok = create_table(
- QTable,
- 'msg_status',
- 'ordered_set',
- record_info(fields, msg_status)),
- ok = create_table(
- PTable, 'msg_status', 'set', record_info(fields, msg_status)),
+ ok = create_table(QTable, msg_status, ordered_set,
+ record_info(fields, msg_status)),
+ ok = create_table(PendingAckTable, msg_status, set,
+ record_info(fields, msg_status)),
{atomic, State} =
- mnesia:transaction(
+ mnesia:sync_transaction(
fun () ->
case IsDurable of
false -> ok = clear_table(QTable),
- ok = clear_table(PTable);
+ ok = clear_table(PendingAckTable);
true -> ok = delete_nonpersistent_msgs(QTable)
end,
- NextSeqId = case mnesia:first(QTable) of
+ NextSeqId = case mnesia:last(QTable) of
'$end_of_table' -> 0;
- SeqId -> SeqId
+ SeqId -> SeqId + 1
end,
#state { q_table = QTable,
- p_table = PTable,
- next_seq_id = NextSeqId,
- confirmed = gb_sets:new(),
+ pending_ack_table = PendingAckTable,
+ next_seq_id = NextSeqId, confirmed = gb_sets:new(),
txn_dict = dict:new() }
end),
State.
-terminate(State = #state { p_table = PTable }) ->
- {atomic, ok} = mnesia:clear_table(PTable),
+terminate(State = #state { pending_ack_table = PendingAckTable }) ->
+ {atomic, ok} = mnesia:clear_table(PendingAckTable),
State.
-delete_and_terminate(State = #state { q_table = QTable, p_table = PTable }) ->
- {atomic, _} = mnesia:transaction(fun () -> ok = clear_table(QTable),
- ok = clear_table(PTable)
- end),
+delete_and_terminate(State = #state { q_table = QTable,
+ pending_ack_table = PendingAckTable }) ->
+ {atomic, _} = mnesia:sync_transaction(
+ fun () -> ok = clear_table(QTable),
+ ok = clear_table(PendingAckTable)
+ end),
State.
purge(State = #state { q_table = QTable }) ->
- {atomic, Result} =
- mnesia:transaction(fun () -> LQ = table_length(QTable),
- ok = clear_table(QTable),
- {LQ, State}
- end),
+ {atomic, Result} = mnesia:sync_transaction(
+ fun () -> LQ = table_length(QTable),
+ ok = clear_table(QTable),
+ {LQ, State}
+ end),
Result.
publish(Msg, Props, State) ->
{atomic, State1} =
- mnesia:transaction(
+ mnesia:sync_transaction(
fun () -> internal_publish(Msg, Props, false, State) end),
confirm([{Msg, Props}], State1).
publish_delivered(false, Msg, Props, State) ->
{undefined, confirm([{Msg, Props}], State)};
-publish_delivered(true,
- Msg,
- Props,
- State = #state { next_seq_id = SeqId }) ->
- MsgStatus = #msg_status { seq_id = SeqId,
- msg = Msg,
- props = Props,
- is_delivered = true },
- {atomic, State1} =
- mnesia:transaction(
- fun () ->
- ok = add_pending_ack(MsgStatus, State),
- State #state { next_seq_id = SeqId + 1 }
- end),
+publish_delivered(true, Msg, Props, State = #state { next_seq_id = SeqId }) ->
+ MsgStatus = #msg_status {
+ seq_id = SeqId, msg = Msg, props = Props, is_delivered = true },
+ {atomic, State1} = mnesia:sync_transaction(
+ fun () ->
+ ok = add_pending_ack(MsgStatus, State),
+ State #state { next_seq_id = SeqId + 1 }
+ end),
{SeqId, confirm([{Msg, Props}], State1)}.
drain_confirmed(State = #state { confirmed = Confirmed }) ->
@@ -206,17 +191,17 @@ drain_confirmed(State = #state { confirmed = Confirmed }) ->
dropwhile(Pred, State) ->
{atomic, Result} =
- mnesia:transaction(fun () -> internal_dropwhile(Pred, State) end),
+ mnesia:sync_transaction(fun () -> internal_dropwhile(Pred, State) end),
Result.
fetch(AckRequired, State) ->
- {atomic, FetchResult} =
- mnesia:transaction(fun () -> internal_fetch(AckRequired, State) end),
+ {atomic, FetchResult} = mnesia:sync_transaction(
+ fun () -> internal_fetch(AckRequired, State) end),
{FetchResult, State}.
ack(SeqIds, State) ->
- {atomic, Result} =
- mnesia:transaction(fun () -> internal_ack(SeqIds, State) end),
+ {atomic, Result} = mnesia:sync_transaction(
+ fun () -> internal_ack(SeqIds, State) end),
Result.
tx_publish(Txn, Msg, Props, State = #state { txn_dict = TxnDict}) ->
@@ -237,37 +222,35 @@ tx_rollback(Txn, State = #state { txn_dict = TxnDict }) ->
tx_commit(Txn, F, PropsF, State = #state { txn_dict = TxnDict }) ->
#tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, TxnDict),
- {atomic, State1} = mnesia:transaction(
- fun () ->
- internal_tx_commit(
- Pubs,
- SeqIds,
- PropsF,
- State #state { txn_dict = erase_tx(Txn, TxnDict) })
- end),
+ {atomic, State1} =
+ mnesia:sync_transaction(
+ fun () ->
+ internal_tx_commit(
+ Pubs, SeqIds, PropsF,
+ State #state { txn_dict = erase_tx(Txn, TxnDict) })
+ end),
F(),
{SeqIds, confirm(Pubs, State1)}.
requeue(SeqIds, PropsF, State) ->
{atomic, Result} =
- mnesia:transaction(
+ mnesia:sync_transaction(
fun () -> del_pending_acks(
fun (#msg_status { msg = Msg, props = Props }, S) ->
- internal_publish(
- Msg, PropsF(Props), true, S)
+ internal_publish(Msg, PropsF(Props), true, S)
end,
- SeqIds,
- State)
+ SeqIds, State)
end),
Result.
len(#state { q_table = QTable }) ->
- {atomic, Result} = mnesia:transaction(fun () -> table_length(QTable) end),
+ {atomic, Result} = mnesia:sync_transaction(
+ fun () -> table_length(QTable) end),
Result.
is_empty(#state { q_table = QTable }) ->
{atomic, Result} =
- mnesia:transaction(fun () -> 0 == table_length(QTable) end),
+ mnesia:sync_transaction(fun () -> 0 == table_length(QTable) end),
Result.
set_ram_duration_target(_, State) -> State.
@@ -280,22 +263,20 @@ idle_timeout(State) -> State.
handle_pre_hibernate(State) -> State.
-status(#state { q_table = QTable,
- p_table = PTable,
+status(#state { q_table = QTable, pending_ack_table = PendingAckTable,
next_seq_id = NextSeqId }) ->
{atomic, Result} =
- mnesia:transaction(
- fun () -> [{len, table_length(QTable)},
- {next_seq_id, NextSeqId},
- {acks, table_length(PTable)}]
+ mnesia:sync_transaction(
+ fun () -> [{len, table_length(QTable)}, {next_seq_id, NextSeqId},
+ {acks, table_length(PendingAckTable)}]
end),
Result.
create_table(Table, RecordName, Type, Attributes) ->
- case mnesia:create_table(Table, [{record_name, RecordName},
- {type, Type},
- {attributes, Attributes},
- {disc_copies, [node()]}]) of
+ case mnesia:create_table(
+ Table,
+ [{record_name, RecordName}, {type, Type}, {attributes, Attributes},
+ {disc_copies, rabbit_mnesia:running_clustered_nodes()}]) of
{atomic, ok} -> ok;
{aborted, {already_exists, Table}} ->
RecordName = mnesia:table_info(Table, record_name),
@@ -306,20 +287,19 @@ create_table(Table, RecordName, Type, Attributes) ->
clear_table(Table) ->
mnesia:foldl(fun (#msg_status { seq_id = SeqId }, ok) ->
- ok = mnesia:delete(Table, SeqId, 'write')
+ ok = mnesia:delete(Table, SeqId, write)
end,
- ok,
- Table).
+ ok, Table).
delete_nonpersistent_msgs(QTable) ->
mnesia:foldl(fun (MsgStatus = #msg_status { seq_id = SeqId }, ok) ->
case MsgStatus of
#msg_status { msg = #basic_message {
is_persistent = true }} -> ok;
- _ -> ok = mnesia:delete(QTable, SeqId, 'write')
+ _ -> ok = mnesia:delete(QTable, SeqId, write)
end
end,
- ok,
+ ok,
QTable).
internal_fetch(AckRequired, State) ->
@@ -330,23 +310,16 @@ internal_fetch(AckRequired, State) ->
internal_tx_commit(Pubs, SeqIds, PropsF, State) ->
State1 = internal_ack(SeqIds, State),
- lists:foldl(
- fun ({Msg, Props}, S) ->
- internal_publish(Msg, PropsF(Props), false, S)
- end,
- State1,
- lists:reverse(Pubs)).
+ lists:foldl(fun ({Msg, Props}, S) ->
+ internal_publish(Msg, PropsF(Props), false, S)
+ end,
+ State1, lists:reverse(Pubs)).
-internal_publish(Msg,
- Props,
- IsDelivered,
+internal_publish(Msg, Props, IsDelivered,
State = #state { q_table = QTable, next_seq_id = SeqId }) ->
MsgStatus = #msg_status {
- seq_id = SeqId,
- msg = Msg,
- props = Props,
- is_delivered = IsDelivered },
- ok = mnesia:write(QTable, MsgStatus, 'write'),
+ seq_id = SeqId, msg = Msg, props = Props, is_delivered = IsDelivered },
+ ok = mnesia:write(QTable, MsgStatus, write),
State #state { next_seq_id = SeqId + 1 }.
internal_ack(SeqIds, State) ->
@@ -367,15 +340,15 @@ internal_dropwhile(Pred, State) ->
q_pop(#state { q_table = QTable }) ->
case mnesia:first(QTable) of
'$end_of_table' -> nothing;
- SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, 'read'),
- ok = mnesia:delete(QTable, SeqId, 'write'),
+ SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, read),
+ ok = mnesia:delete(QTable, SeqId, write),
{just, MsgStatus}
end.
q_peek(#state { q_table = QTable }) ->
case mnesia:first(QTable) of
'$end_of_table' -> nothing;
- SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, 'read'),
+ SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, read),
{just, MsgStatus}
end.
@@ -386,23 +359,22 @@ post_pop(true,
LQ = table_length(QTable),
ok = add_pending_ack(MsgStatus #msg_status { is_delivered = true }, State),
{Msg, IsDelivered, SeqId, LQ};
-post_pop(false,
- #msg_status { msg = Msg, is_delivered = IsDelivered },
+post_pop(false, #msg_status { msg = Msg, is_delivered = IsDelivered },
#state { q_table = QTable }) ->
{Msg, IsDelivered, undefined, table_length(QTable)}.
-add_pending_ack(MsgStatus, #state { p_table = PTable }) ->
- ok = mnesia:write(PTable, MsgStatus, 'write').
+add_pending_ack(MsgStatus, #state { pending_ack_table = PendingAckTable }) ->
+ ok = mnesia:write(PendingAckTable, MsgStatus, write).
-del_pending_acks(F, SeqIds, State = #state { p_table = PTable }) ->
+del_pending_acks(
+ F, SeqIds, State = #state { pending_ack_table = PendingAckTable }) ->
lists:foldl(
fun (SeqId, S) ->
- [MsgStatus] = mnesia:read(PTable, SeqId, 'read'),
- ok = mnesia:delete(PTable, SeqId, 'write'),
+ [MsgStatus] = mnesia:read(PendingAckTable, SeqId, read),
+ ok = mnesia:delete(PendingAckTable, SeqId, write),
F(MsgStatus, S)
end,
- State,
- SeqIds).
+ State, SeqIds).
tables({resource, VHost, queue, Name}) ->
VHost2 = re:split(binary_to_list(VHost), "[/]", [{return, list}]),