diff options
author | John DeTreville <jdetreville@vmware.com> | 2011-03-28 19:09:21 -0700 |
---|---|---|
committer | John DeTreville <jdetreville@vmware.com> | 2011-03-28 19:09:21 -0700 |
commit | b525f3a469bb9c2345bf483b471da976178ee0c9 (patch) | |
tree | 397d25a022be349d277c4c8b384ececf07081211 | |
parent | bdf7a46bc9e7fa388e87878834971472e4616092 (diff) | |
download | rabbitmq-server-bug23576.tar.gz |
Stylistic changes to rabbit_mnesia_queue.bug23576
-rw-r--r-- | src/rabbit_mnesia_queue.erl | 210 |
1 files changed, 91 insertions, 119 deletions
diff --git a/src/rabbit_mnesia_queue.erl b/src/rabbit_mnesia_queue.erl index 72a67946..baaa74f2 100644 --- a/src/rabbit_mnesia_queue.erl +++ b/src/rabbit_mnesia_queue.erl @@ -30,7 +30,8 @@ -behaviour(rabbit_backing_queue). --record(state, { q_table, p_table, next_seq_id, confirmed, txn_dict }). +-record(state, + { q_table, pending_ack_table, next_seq_id, confirmed, txn_dict }). -record(msg_status, { seq_id, msg, props, is_delivered }). @@ -43,10 +44,8 @@ -type(seq_id() :: non_neg_integer()). -type(ack() :: seq_id()). --type(state() :: #state { q_table :: atom(), - p_table :: atom(), - next_seq_id :: seq_id(), - confirmed :: gb_set(), +-type(state() :: #state { q_table :: atom(), pending_ack_table :: atom(), + next_seq_id :: seq_id(), confirmed :: gb_set(), txn_dict :: dict() }). -type(msg_status() :: #msg_status { msg :: rabbit_types:basic_message(), @@ -54,8 +53,7 @@ props :: rabbit_types:message_properties(), is_delivered :: boolean() }). --type(tx() :: #tx { to_pub :: [pub()], - to_ack :: [seq_id()] }). +-type(tx() :: #tx { to_pub :: [pub()], to_ack :: [seq_id()] }). -type(pub() :: { rabbit_types:basic_message(), rabbit_types:message_properties() }). @@ -71,16 +69,12 @@ -spec(internal_fetch(true, state()) -> fetch_result(ack()); (false, state()) -> fetch_result(undefined)). --spec internal_tx_commit([pub()], - [seq_id()], - message_properties_transformer(), +-spec internal_tx_commit([pub()], [seq_id()], message_properties_transformer(), state()) -> state(). -spec internal_publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), - boolean(), - state()) -> + rabbit_types:message_properties(), boolean(), state()) -> state(). -spec(internal_ack/2 :: ([seq_id()], state()) -> state()). @@ -98,8 +92,7 @@ -spec add_pending_ack(msg_status(), state()) -> ok. --spec del_pending_acks(fun ((msg_status(), state()) -> state()), - [seq_id()], +-spec del_pending_acks(fun ((msg_status(), state()) -> state()), [seq_id()], state()) -> state(). @@ -120,85 +113,77 @@ start(_DurableQueues) -> ok. stop() -> ok. init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) -> - {QTable, PTable} = tables(QueueName), + {QTable, PendingAckTable} = tables(QueueName), case Recover of false -> case mnesia:delete_table(QTable) of {atomic, ok} -> ok; {aborted, {no_exists, QTable}} -> ok end, - case mnesia:delete_table(PTable) of + case mnesia:delete_table(PendingAckTable) of {atomic, ok} -> ok; - {aborted, {no_exists, PTable}} -> ok + {aborted, {no_exists, PendingAckTable}} -> ok end; true -> ok end, - ok = create_table( - QTable, - 'msg_status', - 'ordered_set', - record_info(fields, msg_status)), - ok = create_table( - PTable, 'msg_status', 'set', record_info(fields, msg_status)), + ok = create_table(QTable, msg_status, ordered_set, + record_info(fields, msg_status)), + ok = create_table(PendingAckTable, msg_status, set, + record_info(fields, msg_status)), {atomic, State} = - mnesia:transaction( + mnesia:sync_transaction( fun () -> case IsDurable of false -> ok = clear_table(QTable), - ok = clear_table(PTable); + ok = clear_table(PendingAckTable); true -> ok = delete_nonpersistent_msgs(QTable) end, - NextSeqId = case mnesia:first(QTable) of + NextSeqId = case mnesia:last(QTable) of '$end_of_table' -> 0; - SeqId -> SeqId + SeqId -> SeqId + 1 end, #state { q_table = QTable, - p_table = PTable, - next_seq_id = NextSeqId, - confirmed = gb_sets:new(), + pending_ack_table = PendingAckTable, + next_seq_id = NextSeqId, confirmed = gb_sets:new(), txn_dict = dict:new() } end), State. -terminate(State = #state { p_table = PTable }) -> - {atomic, ok} = mnesia:clear_table(PTable), +terminate(State = #state { pending_ack_table = PendingAckTable }) -> + {atomic, ok} = mnesia:clear_table(PendingAckTable), State. -delete_and_terminate(State = #state { q_table = QTable, p_table = PTable }) -> - {atomic, _} = mnesia:transaction(fun () -> ok = clear_table(QTable), - ok = clear_table(PTable) - end), +delete_and_terminate(State = #state { q_table = QTable, + pending_ack_table = PendingAckTable }) -> + {atomic, _} = mnesia:sync_transaction( + fun () -> ok = clear_table(QTable), + ok = clear_table(PendingAckTable) + end), State. purge(State = #state { q_table = QTable }) -> - {atomic, Result} = - mnesia:transaction(fun () -> LQ = table_length(QTable), - ok = clear_table(QTable), - {LQ, State} - end), + {atomic, Result} = mnesia:sync_transaction( + fun () -> LQ = table_length(QTable), + ok = clear_table(QTable), + {LQ, State} + end), Result. publish(Msg, Props, State) -> {atomic, State1} = - mnesia:transaction( + mnesia:sync_transaction( fun () -> internal_publish(Msg, Props, false, State) end), confirm([{Msg, Props}], State1). publish_delivered(false, Msg, Props, State) -> {undefined, confirm([{Msg, Props}], State)}; -publish_delivered(true, - Msg, - Props, - State = #state { next_seq_id = SeqId }) -> - MsgStatus = #msg_status { seq_id = SeqId, - msg = Msg, - props = Props, - is_delivered = true }, - {atomic, State1} = - mnesia:transaction( - fun () -> - ok = add_pending_ack(MsgStatus, State), - State #state { next_seq_id = SeqId + 1 } - end), +publish_delivered(true, Msg, Props, State = #state { next_seq_id = SeqId }) -> + MsgStatus = #msg_status { + seq_id = SeqId, msg = Msg, props = Props, is_delivered = true }, + {atomic, State1} = mnesia:sync_transaction( + fun () -> + ok = add_pending_ack(MsgStatus, State), + State #state { next_seq_id = SeqId + 1 } + end), {SeqId, confirm([{Msg, Props}], State1)}. drain_confirmed(State = #state { confirmed = Confirmed }) -> @@ -206,17 +191,17 @@ drain_confirmed(State = #state { confirmed = Confirmed }) -> dropwhile(Pred, State) -> {atomic, Result} = - mnesia:transaction(fun () -> internal_dropwhile(Pred, State) end), + mnesia:sync_transaction(fun () -> internal_dropwhile(Pred, State) end), Result. fetch(AckRequired, State) -> - {atomic, FetchResult} = - mnesia:transaction(fun () -> internal_fetch(AckRequired, State) end), + {atomic, FetchResult} = mnesia:sync_transaction( + fun () -> internal_fetch(AckRequired, State) end), {FetchResult, State}. ack(SeqIds, State) -> - {atomic, Result} = - mnesia:transaction(fun () -> internal_ack(SeqIds, State) end), + {atomic, Result} = mnesia:sync_transaction( + fun () -> internal_ack(SeqIds, State) end), Result. tx_publish(Txn, Msg, Props, State = #state { txn_dict = TxnDict}) -> @@ -237,37 +222,35 @@ tx_rollback(Txn, State = #state { txn_dict = TxnDict }) -> tx_commit(Txn, F, PropsF, State = #state { txn_dict = TxnDict }) -> #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, TxnDict), - {atomic, State1} = mnesia:transaction( - fun () -> - internal_tx_commit( - Pubs, - SeqIds, - PropsF, - State #state { txn_dict = erase_tx(Txn, TxnDict) }) - end), + {atomic, State1} = + mnesia:sync_transaction( + fun () -> + internal_tx_commit( + Pubs, SeqIds, PropsF, + State #state { txn_dict = erase_tx(Txn, TxnDict) }) + end), F(), {SeqIds, confirm(Pubs, State1)}. requeue(SeqIds, PropsF, State) -> {atomic, Result} = - mnesia:transaction( + mnesia:sync_transaction( fun () -> del_pending_acks( fun (#msg_status { msg = Msg, props = Props }, S) -> - internal_publish( - Msg, PropsF(Props), true, S) + internal_publish(Msg, PropsF(Props), true, S) end, - SeqIds, - State) + SeqIds, State) end), Result. len(#state { q_table = QTable }) -> - {atomic, Result} = mnesia:transaction(fun () -> table_length(QTable) end), + {atomic, Result} = mnesia:sync_transaction( + fun () -> table_length(QTable) end), Result. is_empty(#state { q_table = QTable }) -> {atomic, Result} = - mnesia:transaction(fun () -> 0 == table_length(QTable) end), + mnesia:sync_transaction(fun () -> 0 == table_length(QTable) end), Result. set_ram_duration_target(_, State) -> State. @@ -280,22 +263,20 @@ idle_timeout(State) -> State. handle_pre_hibernate(State) -> State. -status(#state { q_table = QTable, - p_table = PTable, +status(#state { q_table = QTable, pending_ack_table = PendingAckTable, next_seq_id = NextSeqId }) -> {atomic, Result} = - mnesia:transaction( - fun () -> [{len, table_length(QTable)}, - {next_seq_id, NextSeqId}, - {acks, table_length(PTable)}] + mnesia:sync_transaction( + fun () -> [{len, table_length(QTable)}, {next_seq_id, NextSeqId}, + {acks, table_length(PendingAckTable)}] end), Result. create_table(Table, RecordName, Type, Attributes) -> - case mnesia:create_table(Table, [{record_name, RecordName}, - {type, Type}, - {attributes, Attributes}, - {disc_copies, [node()]}]) of + case mnesia:create_table( + Table, + [{record_name, RecordName}, {type, Type}, {attributes, Attributes}, + {disc_copies, rabbit_mnesia:running_clustered_nodes()}]) of {atomic, ok} -> ok; {aborted, {already_exists, Table}} -> RecordName = mnesia:table_info(Table, record_name), @@ -306,20 +287,19 @@ create_table(Table, RecordName, Type, Attributes) -> clear_table(Table) -> mnesia:foldl(fun (#msg_status { seq_id = SeqId }, ok) -> - ok = mnesia:delete(Table, SeqId, 'write') + ok = mnesia:delete(Table, SeqId, write) end, - ok, - Table). + ok, Table). delete_nonpersistent_msgs(QTable) -> mnesia:foldl(fun (MsgStatus = #msg_status { seq_id = SeqId }, ok) -> case MsgStatus of #msg_status { msg = #basic_message { is_persistent = true }} -> ok; - _ -> ok = mnesia:delete(QTable, SeqId, 'write') + _ -> ok = mnesia:delete(QTable, SeqId, write) end end, - ok, + ok, QTable). internal_fetch(AckRequired, State) -> @@ -330,23 +310,16 @@ internal_fetch(AckRequired, State) -> internal_tx_commit(Pubs, SeqIds, PropsF, State) -> State1 = internal_ack(SeqIds, State), - lists:foldl( - fun ({Msg, Props}, S) -> - internal_publish(Msg, PropsF(Props), false, S) - end, - State1, - lists:reverse(Pubs)). + lists:foldl(fun ({Msg, Props}, S) -> + internal_publish(Msg, PropsF(Props), false, S) + end, + State1, lists:reverse(Pubs)). -internal_publish(Msg, - Props, - IsDelivered, +internal_publish(Msg, Props, IsDelivered, State = #state { q_table = QTable, next_seq_id = SeqId }) -> MsgStatus = #msg_status { - seq_id = SeqId, - msg = Msg, - props = Props, - is_delivered = IsDelivered }, - ok = mnesia:write(QTable, MsgStatus, 'write'), + seq_id = SeqId, msg = Msg, props = Props, is_delivered = IsDelivered }, + ok = mnesia:write(QTable, MsgStatus, write), State #state { next_seq_id = SeqId + 1 }. internal_ack(SeqIds, State) -> @@ -367,15 +340,15 @@ internal_dropwhile(Pred, State) -> q_pop(#state { q_table = QTable }) -> case mnesia:first(QTable) of '$end_of_table' -> nothing; - SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, 'read'), - ok = mnesia:delete(QTable, SeqId, 'write'), + SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, read), + ok = mnesia:delete(QTable, SeqId, write), {just, MsgStatus} end. q_peek(#state { q_table = QTable }) -> case mnesia:first(QTable) of '$end_of_table' -> nothing; - SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, 'read'), + SeqId -> [MsgStatus] = mnesia:read(QTable, SeqId, read), {just, MsgStatus} end. @@ -386,23 +359,22 @@ post_pop(true, LQ = table_length(QTable), ok = add_pending_ack(MsgStatus #msg_status { is_delivered = true }, State), {Msg, IsDelivered, SeqId, LQ}; -post_pop(false, - #msg_status { msg = Msg, is_delivered = IsDelivered }, +post_pop(false, #msg_status { msg = Msg, is_delivered = IsDelivered }, #state { q_table = QTable }) -> {Msg, IsDelivered, undefined, table_length(QTable)}. -add_pending_ack(MsgStatus, #state { p_table = PTable }) -> - ok = mnesia:write(PTable, MsgStatus, 'write'). +add_pending_ack(MsgStatus, #state { pending_ack_table = PendingAckTable }) -> + ok = mnesia:write(PendingAckTable, MsgStatus, write). -del_pending_acks(F, SeqIds, State = #state { p_table = PTable }) -> +del_pending_acks( + F, SeqIds, State = #state { pending_ack_table = PendingAckTable }) -> lists:foldl( fun (SeqId, S) -> - [MsgStatus] = mnesia:read(PTable, SeqId, 'read'), - ok = mnesia:delete(PTable, SeqId, 'write'), + [MsgStatus] = mnesia:read(PendingAckTable, SeqId, read), + ok = mnesia:delete(PendingAckTable, SeqId, write), F(MsgStatus, S) end, - State, - SeqIds). + State, SeqIds). tables({resource, VHost, queue, Name}) -> VHost2 = re:split(binary_to_list(VHost), "[/]", [{return, list}]), |