diff options
author | John DeTreville <jdetreville@vmware.com> | 2011-03-23 20:26:07 -0700 |
---|---|---|
committer | John DeTreville <jdetreville@vmware.com> | 2011-03-23 20:26:07 -0700 |
commit | 4a1d115af72ef59c6beee94565a3e0f75859c289 (patch) | |
tree | 634e79880fff84c0360cd22009874f945a6e2666 | |
parent | ca6a8a457402e1278811427f4af9a2fca75d180f (diff) | |
download | rabbitmq-server-4a1d115af72ef59c6beee94565a3e0f75859c289.tar.gz |
Most changes; most tests pass.
-rw-r--r-- | src/rabbit_disk_queue.erl | 39 | ||||
-rw-r--r-- | src/rabbit_mnesia_queue.erl | 68 | ||||
-rw-r--r-- | src/rabbit_ram_queue.erl | 4 |
3 files changed, 59 insertions, 52 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 2593d9e9..8a7881e9 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -212,7 +212,9 @@ terminate(State) -> State #state { pending_acks = dict:new() }. %% -spec(delete_and_terminate/1 :: (state()) -> state()). delete_and_terminate(State = #state { q_file_names = QFileNames }) -> - lists:foreach(fun file:delete/1, queue:to_list(QFileNames)), + lists:foreach( + fun (filename) -> ok = file:delete(filename) end, + queue:to_list(QFileNames)), State #state { q0 = queue:new(), q0_len = 0, q_file_names = queue:new(), @@ -228,7 +230,9 @@ delete_and_terminate(State = #state { q_file_names = QFileNames }) -> %% -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). purge(State = #state { q_file_names = QFileNames }) -> - lists:foreach(fun file:delete/1, queue:to_list(QFileNames)), + lists:foreach( + fun (filename) -> ok = file:delete(filename) end, + queue:to_list(QFileNames)), {internal_len(State), State #state { q0 = queue:new(), q0_len = 0, @@ -551,7 +555,7 @@ internal_dropwhile(Pred, State) -> post_pop(true, MsgStatus = #msg_status { - seq_id = SeqId, msg = Msg, is_delivered = IsDelivered }, + seq_id = SeqId, msg = Msg, is_delivered = IsDelivered }, State = #state { pending_acks = PendingAcks }) -> MsgStatus1 = MsgStatus #msg_status { is_delivered = true }, {{Msg, IsDelivered, SeqId, internal_len(State)}, @@ -599,10 +603,11 @@ push_q0(State = #state { dir = Dir, if Q0Len < ?FILE_BATCH_SIZE -> State; true -> FileName = Dir ++ "/" ++ integer_to_list(FileId), - Worker ! {write_behind, FileName, term_to_binary(Q0)}, + _ = (Worker ! {write_behind, FileName, term_to_binary(Q0)}), case queue:is_empty(QFileNames) of true -> - Worker ! {read_ahead, FileName }; + _ = (Worker ! {read_ahead, FileName }), + ok; false -> ok end, State #state { next_file_id = FileId + 1, @@ -626,14 +631,12 @@ pull_q1(State = #state { q0 = Q0, if Q1Len > 0 -> State; QFileNamesLen > 0 -> {{value, FileName}, QFileNames1} = queue:out(QFileNames), - Worker ! {read, FileName}, - receive - {binary, Binary} -> - ok - end, + _ = (Worker ! {read, FileName}), + receive {binary, Binary} -> ok end, case queue:out(QFileNames1) of {{value, FileName1}, _} -> - Worker ! {read_ahead, FileName1}; + _ = (Worker ! {read_ahead, FileName1}), + ok; _ -> ok end, State #state { q_file_names = QFileNames1, @@ -707,8 +710,8 @@ confirm(Pubs, State = #state { confirmed = Confirmed }) -> end. %% ---------------------------------------------------------------------------- -%% Background worker process for speeding up demo, currently with no -%% mechanisms for shutdown +%% Background worker process (non-OTP) for speeding up demo by about +%% 10%, currently with no mechanism for shutdown %% ---------------------------------------------------------------------------- -spec spawn_worker() -> pid(). @@ -716,19 +719,19 @@ confirm(Pubs, State = #state { confirmed = Confirmed }) -> spawn_worker() -> Parent = self(), spawn(fun() -> worker(Parent, nothing) end). --spec worker(pid(), maybe({string(), binary()})) -> none(). +-spec worker(pid(), maybe({string(), binary()})) -> no_return(). -worker(Parent, State) -> +worker(Parent, Contents) -> receive {write_behind, FileName, Binary} -> ok = file:write_file(FileName, Binary), - worker(Parent, State); + worker(Parent, Contents); {read_ahead, FileName} -> {ok, Binary} = file:read_file(FileName), ok = file:delete(FileName), worker(Parent, {just, {FileName, Binary}}); {read, FileName} -> - {just, {FileName, Binary}} = State, - Parent ! {binary, Binary}, + {just, {FileName, Binary}} = Contents, + (Parent ! {binary, Binary}), worker(Parent, nothing) end. diff --git a/src/rabbit_mnesia_queue.erl b/src/rabbit_mnesia_queue.erl index 4a4040fa..9583f0e3 100644 --- a/src/rabbit_mnesia_queue.erl +++ b/src/rabbit_mnesia_queue.erl @@ -189,8 +189,8 @@ stop() -> ok. init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) -> {QTable, PTable} = tables(QueueName), case Recover of - false -> _ = mnesia:delete_table(QTable), - _ = mnesia:delete_table(PTable); + false -> {atomic, ok} = mnesia:delete_table(QTable), + {atomic, ok} = mnesia:delete_table(PTable); true -> ok end, create_table(QTable, 'q_record', 'ordered_set', record_info(fields, @@ -211,7 +211,7 @@ init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) -> #state { q_table = QTable, p_table = PTable, next_seq_id = NextSeqId, - confirmed = gb_sets:new(), + confirmed = gb_sets:new(), txn_dict = dict:new() } end), State. @@ -226,8 +226,8 @@ init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) -> %% -spec(terminate/1 :: (state()) -> state()). terminate(State = #state { q_table = QTable, p_table = PTable }) -> - {atomic, _} = mnesia:clear_table(PTable), - mnesia:dump_tables([QTable, PTable]), + {atomic, ok} = mnesia:clear_table(PTable), + {atomic, ok} = mnesia:dump_tables([QTable, PTable]), State. %% ---------------------------------------------------------------------------- @@ -245,7 +245,7 @@ delete_and_terminate(State = #state { q_table = QTable, p_table = PTable }) -> mnesia:transaction(fun () -> clear_table(QTable), clear_table(PTable) end), - mnesia:dump_tables([QTable, PTable]), + {atomic, ok} = mnesia:dump_tables([QTable, PTable]), State. %% ---------------------------------------------------------------------------- @@ -301,9 +301,9 @@ publish(Msg, Props, State) -> publish_delivered(false, Msg, Props, State) -> {undefined, confirm([{Msg, Props}], State)}; publish_delivered(true, - Msg, - Props, - State = #state { next_seq_id = SeqId }) -> + Msg, + Props, + State = #state { next_seq_id = SeqId }) -> MsgStatus = #msg_status { seq_id = SeqId, msg = Msg, props = Props, @@ -354,9 +354,9 @@ dropwhile(Pred, State) -> %% (false, state()) -> {fetch_result(undefined), state()}). fetch(AckRequired, State) -> - {atomic, Result} = + {atomic, FetchResult} = mnesia:transaction(fun () -> internal_fetch(AckRequired, State) end), - Result. + {FetchResult, State}. %% ---------------------------------------------------------------------------- %% ack/2 acknowledges msgs named by SeqIds. @@ -424,12 +424,15 @@ tx_rollback(Txn, State = #state { txn_dict = TxnDict }) -> tx_commit(Txn, F, PropsF, State = #state { txn_dict = TxnDict }) -> #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, TxnDict), + {atomic, State1} = mnesia:transaction( + fun () -> + internal_tx_commit( + Pubs, + SeqIds, + PropsF, + State #state { txn_dict = erase_tx(Txn, TxnDict) }) + end), F(), - State1 = internal_tx_commit( - Pubs, - SeqIds, - PropsF, - State #state { txn_dict = erase_tx(Txn, TxnDict) }), {SeqIds, confirm(Pubs, State1)}. %% ---------------------------------------------------------------------------- @@ -578,7 +581,7 @@ create_table(Table, RecordName, Type, Attributes) -> clear_table(Table) -> case mnesia:first(Table) of '$end_of_table' -> ok; - Key -> mnesia:delete(Table, Key, 'write'), + Key -> ok = mnesia:delete(Table, Key, 'write'), clear_table(Table) end. @@ -594,7 +597,7 @@ delete_nonpersistent_msgs(QTable) -> case MsgStatus of #msg_status { msg = #basic_message { is_persistent = true }} -> ok; - _ -> mnesia:delete(QTable, Key, 'write') + _ -> ok = mnesia:delete(QTable, Key, 'write') end end, mnesia:all_keys(QTable)). @@ -641,8 +644,10 @@ internal_publish(Msg, msg = Msg, props = Props, is_delivered = IsDelivered }, - mnesia:write( - QTable, #q_record { seq_id = SeqId, msg_status = MsgStatus }, 'write'), + ok = mnesia:write( + QTable, + #q_record { seq_id = SeqId, msg_status = MsgStatus }, + 'write'), State #state { next_seq_id = SeqId + 1 }. -spec(internal_ack/2 :: ([seq_id()], state()) -> state()). @@ -675,7 +680,7 @@ q_pop(#state { q_table = QTable }) -> '$end_of_table' -> nothing; SeqId -> [#q_record { seq_id = SeqId, msg_status = MsgStatus }] = mnesia:read(QTable, SeqId, 'read'), - mnesia:delete(QTable, SeqId, 'write'), + ok = mnesia:delete(QTable, SeqId, 'write'), {just, MsgStatus} end. @@ -694,9 +699,8 @@ q_peek(#state { q_table = QTable }) -> %% post_pop operates after q_pop, calling add_pending_ack if necessary. --spec(post_pop(true, msg_status(), state()) -> {fetch_result(ack()), state()}; - (false, msg_status(), state()) -> - {fetch_result(undefined), state()}). +-spec(post_pop(true, msg_status(), state()) -> fetch_result(ack()); + (false, msg_status(), state()) -> fetch_result(undefined)). post_pop(true, MsgStatus = #msg_status { @@ -716,10 +720,10 @@ post_pop(false, -spec add_pending_ack(msg_status(), state()) -> ok. add_pending_ack(MsgStatus = #msg_status { seq_id = SeqId }, - #state { p_table = PTable }) -> - mnesia:write(PTable, - #p_record { seq_id = SeqId, msg_status = MsgStatus }, - 'write'), + #state { p_table = PTable }) -> + ok = mnesia:write(PTable, + #p_record { seq_id = SeqId, msg_status = MsgStatus }, + 'write'), ok. %% del_pending_acks deletes some set of pending acks from the P table @@ -727,16 +731,16 @@ add_pending_ack(MsgStatus = #msg_status { seq_id = SeqId }, %% msg is deleted. -spec del_pending_acks(fun ((msg_status(), state()) -> state()), - [seq_id()], - state()) -> - state(). + [seq_id()], + state()) -> + state(). del_pending_acks(F, SeqIds, State = #state { p_table = PTable }) -> lists:foldl( fun (SeqId, S) -> [#p_record { msg_status = MsgStatus }] = mnesia:read(PTable, SeqId, 'read'), - mnesia:delete(PTable, SeqId, 'write'), + ok = mnesia:delete(PTable, SeqId, 'write'), F(MsgStatus, S) end, State, diff --git a/src/rabbit_ram_queue.erl b/src/rabbit_ram_queue.erl index 6f8cc9c2..4483318b 100644 --- a/src/rabbit_ram_queue.erl +++ b/src/rabbit_ram_queue.erl @@ -155,7 +155,7 @@ init(_QueueName, _IsDurable, _Recover, _asyncCallback, _SyncCallback) -> q_len = 0, pending_acks = dict:new(), next_seq_id = 0, - confirmed = gb_sets:new(), + confirmed = gb_sets:new(), txn_dict = dict:new() }. %% ---------------------------------------------------------------------------- @@ -495,7 +495,7 @@ internal_dropwhile(Pred, State = #state { q = Q, q_len = QLen }) -> post_pop(true, MsgStatus = #msg_status { - seq_id = SeqId, msg = Msg, is_delivered = IsDelivered }, + seq_id = SeqId, msg = Msg, is_delivered = IsDelivered }, State = #state { q_len = QLen, pending_acks = PendingAcks }) -> MsgStatus1 = MsgStatus #msg_status { is_delivered = true }, {{Msg, IsDelivered, SeqId, QLen}, |