summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn DeTreville <jdetreville@vmware.com>2011-03-23 20:26:07 -0700
committerJohn DeTreville <jdetreville@vmware.com>2011-03-23 20:26:07 -0700
commit4a1d115af72ef59c6beee94565a3e0f75859c289 (patch)
tree634e79880fff84c0360cd22009874f945a6e2666
parentca6a8a457402e1278811427f4af9a2fca75d180f (diff)
downloadrabbitmq-server-4a1d115af72ef59c6beee94565a3e0f75859c289.tar.gz
Most changes; most tests pass.
-rw-r--r--src/rabbit_disk_queue.erl39
-rw-r--r--src/rabbit_mnesia_queue.erl68
-rw-r--r--src/rabbit_ram_queue.erl4
3 files changed, 59 insertions, 52 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 2593d9e9..8a7881e9 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -212,7 +212,9 @@ terminate(State) -> State #state { pending_acks = dict:new() }.
%% -spec(delete_and_terminate/1 :: (state()) -> state()).
delete_and_terminate(State = #state { q_file_names = QFileNames }) ->
- lists:foreach(fun file:delete/1, queue:to_list(QFileNames)),
+ lists:foreach(
+ fun (filename) -> ok = file:delete(filename) end,
+ queue:to_list(QFileNames)),
State #state { q0 = queue:new(),
q0_len = 0,
q_file_names = queue:new(),
@@ -228,7 +230,9 @@ delete_and_terminate(State = #state { q_file_names = QFileNames }) ->
%% -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
purge(State = #state { q_file_names = QFileNames }) ->
- lists:foreach(fun file:delete/1, queue:to_list(QFileNames)),
+ lists:foreach(
+ fun (filename) -> ok = file:delete(filename) end,
+ queue:to_list(QFileNames)),
{internal_len(State),
State #state { q0 = queue:new(),
q0_len = 0,
@@ -551,7 +555,7 @@ internal_dropwhile(Pred, State) ->
post_pop(true,
MsgStatus = #msg_status {
- seq_id = SeqId, msg = Msg, is_delivered = IsDelivered },
+ seq_id = SeqId, msg = Msg, is_delivered = IsDelivered },
State = #state { pending_acks = PendingAcks }) ->
MsgStatus1 = MsgStatus #msg_status { is_delivered = true },
{{Msg, IsDelivered, SeqId, internal_len(State)},
@@ -599,10 +603,11 @@ push_q0(State = #state { dir = Dir,
if Q0Len < ?FILE_BATCH_SIZE -> State;
true ->
FileName = Dir ++ "/" ++ integer_to_list(FileId),
- Worker ! {write_behind, FileName, term_to_binary(Q0)},
+ _ = (Worker ! {write_behind, FileName, term_to_binary(Q0)}),
case queue:is_empty(QFileNames) of
true ->
- Worker ! {read_ahead, FileName };
+ _ = (Worker ! {read_ahead, FileName }),
+ ok;
false -> ok
end,
State #state { next_file_id = FileId + 1,
@@ -626,14 +631,12 @@ pull_q1(State = #state { q0 = Q0,
if Q1Len > 0 -> State;
QFileNamesLen > 0 ->
{{value, FileName}, QFileNames1} = queue:out(QFileNames),
- Worker ! {read, FileName},
- receive
- {binary, Binary} ->
- ok
- end,
+ _ = (Worker ! {read, FileName}),
+ receive {binary, Binary} -> ok end,
case queue:out(QFileNames1) of
{{value, FileName1}, _} ->
- Worker ! {read_ahead, FileName1};
+ _ = (Worker ! {read_ahead, FileName1}),
+ ok;
_ -> ok
end,
State #state { q_file_names = QFileNames1,
@@ -707,8 +710,8 @@ confirm(Pubs, State = #state { confirmed = Confirmed }) ->
end.
%% ----------------------------------------------------------------------------
-%% Background worker process for speeding up demo, currently with no
-%% mechanisms for shutdown
+%% Background worker process (non-OTP) for speeding up demo by about
+%% 10%, currently with no mechanism for shutdown
%% ----------------------------------------------------------------------------
-spec spawn_worker() -> pid().
@@ -716,19 +719,19 @@ confirm(Pubs, State = #state { confirmed = Confirmed }) ->
spawn_worker() -> Parent = self(),
spawn(fun() -> worker(Parent, nothing) end).
--spec worker(pid(), maybe({string(), binary()})) -> none().
+-spec worker(pid(), maybe({string(), binary()})) -> no_return().
-worker(Parent, State) ->
+worker(Parent, Contents) ->
receive
{write_behind, FileName, Binary} ->
ok = file:write_file(FileName, Binary),
- worker(Parent, State);
+ worker(Parent, Contents);
{read_ahead, FileName} ->
{ok, Binary} = file:read_file(FileName),
ok = file:delete(FileName),
worker(Parent, {just, {FileName, Binary}});
{read, FileName} ->
- {just, {FileName, Binary}} = State,
- Parent ! {binary, Binary},
+ {just, {FileName, Binary}} = Contents,
+ (Parent ! {binary, Binary}),
worker(Parent, nothing)
end.
diff --git a/src/rabbit_mnesia_queue.erl b/src/rabbit_mnesia_queue.erl
index 4a4040fa..9583f0e3 100644
--- a/src/rabbit_mnesia_queue.erl
+++ b/src/rabbit_mnesia_queue.erl
@@ -189,8 +189,8 @@ stop() -> ok.
init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) ->
{QTable, PTable} = tables(QueueName),
case Recover of
- false -> _ = mnesia:delete_table(QTable),
- _ = mnesia:delete_table(PTable);
+ false -> {atomic, ok} = mnesia:delete_table(QTable),
+ {atomic, ok} = mnesia:delete_table(PTable);
true -> ok
end,
create_table(QTable, 'q_record', 'ordered_set', record_info(fields,
@@ -211,7 +211,7 @@ init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) ->
#state { q_table = QTable,
p_table = PTable,
next_seq_id = NextSeqId,
- confirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
txn_dict = dict:new() }
end),
State.
@@ -226,8 +226,8 @@ init(QueueName, IsDurable, Recover, _AsyncCallback, _SyncCallback) ->
%% -spec(terminate/1 :: (state()) -> state()).
terminate(State = #state { q_table = QTable, p_table = PTable }) ->
- {atomic, _} = mnesia:clear_table(PTable),
- mnesia:dump_tables([QTable, PTable]),
+ {atomic, ok} = mnesia:clear_table(PTable),
+ {atomic, ok} = mnesia:dump_tables([QTable, PTable]),
State.
%% ----------------------------------------------------------------------------
@@ -245,7 +245,7 @@ delete_and_terminate(State = #state { q_table = QTable, p_table = PTable }) ->
mnesia:transaction(fun () -> clear_table(QTable),
clear_table(PTable)
end),
- mnesia:dump_tables([QTable, PTable]),
+ {atomic, ok} = mnesia:dump_tables([QTable, PTable]),
State.
%% ----------------------------------------------------------------------------
@@ -301,9 +301,9 @@ publish(Msg, Props, State) ->
publish_delivered(false, Msg, Props, State) ->
{undefined, confirm([{Msg, Props}], State)};
publish_delivered(true,
- Msg,
- Props,
- State = #state { next_seq_id = SeqId }) ->
+ Msg,
+ Props,
+ State = #state { next_seq_id = SeqId }) ->
MsgStatus = #msg_status { seq_id = SeqId,
msg = Msg,
props = Props,
@@ -354,9 +354,9 @@ dropwhile(Pred, State) ->
%% (false, state()) -> {fetch_result(undefined), state()}).
fetch(AckRequired, State) ->
- {atomic, Result} =
+ {atomic, FetchResult} =
mnesia:transaction(fun () -> internal_fetch(AckRequired, State) end),
- Result.
+ {FetchResult, State}.
%% ----------------------------------------------------------------------------
%% ack/2 acknowledges msgs named by SeqIds.
@@ -424,12 +424,15 @@ tx_rollback(Txn, State = #state { txn_dict = TxnDict }) ->
tx_commit(Txn, F, PropsF, State = #state { txn_dict = TxnDict }) ->
#tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, TxnDict),
+ {atomic, State1} = mnesia:transaction(
+ fun () ->
+ internal_tx_commit(
+ Pubs,
+ SeqIds,
+ PropsF,
+ State #state { txn_dict = erase_tx(Txn, TxnDict) })
+ end),
F(),
- State1 = internal_tx_commit(
- Pubs,
- SeqIds,
- PropsF,
- State #state { txn_dict = erase_tx(Txn, TxnDict) }),
{SeqIds, confirm(Pubs, State1)}.
%% ----------------------------------------------------------------------------
@@ -578,7 +581,7 @@ create_table(Table, RecordName, Type, Attributes) ->
clear_table(Table) ->
case mnesia:first(Table) of
'$end_of_table' -> ok;
- Key -> mnesia:delete(Table, Key, 'write'),
+ Key -> ok = mnesia:delete(Table, Key, 'write'),
clear_table(Table)
end.
@@ -594,7 +597,7 @@ delete_nonpersistent_msgs(QTable) ->
case MsgStatus of
#msg_status { msg = #basic_message {
is_persistent = true }} -> ok;
- _ -> mnesia:delete(QTable, Key, 'write')
+ _ -> ok = mnesia:delete(QTable, Key, 'write')
end
end,
mnesia:all_keys(QTable)).
@@ -641,8 +644,10 @@ internal_publish(Msg,
msg = Msg,
props = Props,
is_delivered = IsDelivered },
- mnesia:write(
- QTable, #q_record { seq_id = SeqId, msg_status = MsgStatus }, 'write'),
+ ok = mnesia:write(
+ QTable,
+ #q_record { seq_id = SeqId, msg_status = MsgStatus },
+ 'write'),
State #state { next_seq_id = SeqId + 1 }.
-spec(internal_ack/2 :: ([seq_id()], state()) -> state()).
@@ -675,7 +680,7 @@ q_pop(#state { q_table = QTable }) ->
'$end_of_table' -> nothing;
SeqId -> [#q_record { seq_id = SeqId, msg_status = MsgStatus }] =
mnesia:read(QTable, SeqId, 'read'),
- mnesia:delete(QTable, SeqId, 'write'),
+ ok = mnesia:delete(QTable, SeqId, 'write'),
{just, MsgStatus}
end.
@@ -694,9 +699,8 @@ q_peek(#state { q_table = QTable }) ->
%% post_pop operates after q_pop, calling add_pending_ack if necessary.
--spec(post_pop(true, msg_status(), state()) -> {fetch_result(ack()), state()};
- (false, msg_status(), state()) ->
- {fetch_result(undefined), state()}).
+-spec(post_pop(true, msg_status(), state()) -> fetch_result(ack());
+ (false, msg_status(), state()) -> fetch_result(undefined)).
post_pop(true,
MsgStatus = #msg_status {
@@ -716,10 +720,10 @@ post_pop(false,
-spec add_pending_ack(msg_status(), state()) -> ok.
add_pending_ack(MsgStatus = #msg_status { seq_id = SeqId },
- #state { p_table = PTable }) ->
- mnesia:write(PTable,
- #p_record { seq_id = SeqId, msg_status = MsgStatus },
- 'write'),
+ #state { p_table = PTable }) ->
+ ok = mnesia:write(PTable,
+ #p_record { seq_id = SeqId, msg_status = MsgStatus },
+ 'write'),
ok.
%% del_pending_acks deletes some set of pending acks from the P table
@@ -727,16 +731,16 @@ add_pending_ack(MsgStatus = #msg_status { seq_id = SeqId },
%% msg is deleted.
-spec del_pending_acks(fun ((msg_status(), state()) -> state()),
- [seq_id()],
- state()) ->
- state().
+ [seq_id()],
+ state()) ->
+ state().
del_pending_acks(F, SeqIds, State = #state { p_table = PTable }) ->
lists:foldl(
fun (SeqId, S) ->
[#p_record { msg_status = MsgStatus }] =
mnesia:read(PTable, SeqId, 'read'),
- mnesia:delete(PTable, SeqId, 'write'),
+ ok = mnesia:delete(PTable, SeqId, 'write'),
F(MsgStatus, S)
end,
State,
diff --git a/src/rabbit_ram_queue.erl b/src/rabbit_ram_queue.erl
index 6f8cc9c2..4483318b 100644
--- a/src/rabbit_ram_queue.erl
+++ b/src/rabbit_ram_queue.erl
@@ -155,7 +155,7 @@ init(_QueueName, _IsDurable, _Recover, _asyncCallback, _SyncCallback) ->
q_len = 0,
pending_acks = dict:new(),
next_seq_id = 0,
- confirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
txn_dict = dict:new() }.
%% ----------------------------------------------------------------------------
@@ -495,7 +495,7 @@ internal_dropwhile(Pred, State = #state { q = Q, q_len = QLen }) ->
post_pop(true,
MsgStatus = #msg_status {
- seq_id = SeqId, msg = Msg, is_delivered = IsDelivered },
+ seq_id = SeqId, msg = Msg, is_delivered = IsDelivered },
State = #state { q_len = QLen, pending_acks = PendingAcks }) ->
MsgStatus1 = MsgStatus #msg_status { is_delivered = true },
{{Msg, IsDelivered, SeqId, QLen},