summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-17 13:33:38 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-17 13:33:38 +0100
commit67af217a39218ee8cf5e344786e8166c821adcdb (patch)
treec00803400337f72c2d798682de6e160e94847019
parentbf5ba32265dea67d4191937edf1ed6cc524089ab (diff)
downloadrabbitmq-server-67af217a39218ee8cf5e344786e8166c821adcdb.tar.gz
More tidying
-rw-r--r--src/rabbit_amqqueue_process.erl26
-rw-r--r--src/rabbit_disk_queue.erl2
-rw-r--r--src/rabbit_mixed_queue.erl8
-rw-r--r--src/rabbit_tests.erl6
4 files changed, 19 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e2a99d19..620b497b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -96,7 +96,7 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
{ok, Mode} = rabbit_queue_mode_manager:register(self()),
- {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, Mode),
+ {ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
@@ -461,20 +461,17 @@ commit_transaction(Txn, State) ->
} = lookup_tx(Txn),
PendingMessagesOrdered = lists:reverse(PendingMessages),
PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)),
- {ok, MS} =
+ Acks =
case lookup_ch(ChPid) of
- not_found ->
- rabbit_mixed_queue:tx_commit(
- PendingMessagesOrdered, [], State #q.mixed_state);
+ not_found -> [];
C = #cr { unacked_messages = UAM } ->
{MsgWithAcks, Remaining} =
collect_messages(PendingAcksOrdered, UAM),
store_ch_record(C#cr{unacked_messages = Remaining}),
- rabbit_mixed_queue:tx_commit(
- PendingMessagesOrdered,
- lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
- State #q.mixed_state)
+ [ AckTag || {_Msg, AckTag} <- MsgWithAcks ]
end,
+ {ok, MS} = rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered, Acks, State #q.mixed_state),
State #q { mixed_state = MS }.
rollback_transaction(Txn, State) ->
@@ -736,8 +733,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
case Txn of
none ->
- Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end,
- MsgWithAcks),
+ Acks = [ AckTag || {_Msg, AckTag} <- MsgWithAcks ],
{ok, MS} =
rabbit_mixed_queue:ack(Acks, State #q.mixed_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
@@ -792,10 +788,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end));
handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) ->
- {ok, MS2} = case Constrain of
- true -> rabbit_mixed_queue:to_disk_only_mode(MS);
- false -> rabbit_mixed_queue:to_mixed_mode(MS)
- end,
+ {ok, MS2} = (case Constrain of
+ true -> fun rabbit_mixed_queue:to_disk_only_mode/1;
+ false -> fun rabbit_mixed_queue:to_mixed_mode/1
+ end)(MS),
noreply(State #q { mixed_state = MS2 }).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index f3e63127..e82feb99 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1413,6 +1413,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
{Q, SeqId, NextWrite, -1});
[Orig = {Q, Read, Write, Length}] ->
Repl = {Q, lists:min([Read, SeqId]),
+ lists:max([Write, NextWrite]),
%% Length is wrong here,
%% but it doesn't matter
%% because we'll pull out
@@ -1421,7 +1422,6 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
%% in then do a straight
%% subtraction to get the
%% right length
- lists:max([Write, NextWrite]),
Length},
if Orig =:= Repl -> true;
true -> ets:insert(Sequences, Repl)
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 5933357c..a2e01bda 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
--export([start_link/3]).
+-export([init/3]).
-export([publish/2, publish_delivered/2, deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
@@ -49,12 +49,12 @@
}
).
-start_link(Queue, IsDurable, disk) ->
+init(Queue, IsDurable, disk) ->
purge_non_persistent_messages(
#mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
is_durable = IsDurable, length = 0 });
-start_link(Queue, IsDurable, mixed) ->
- {ok, State} = start_link(Queue, IsDurable, disk),
+init(Queue, IsDurable, mixed) ->
+ {ok, State} = init(Queue, IsDurable, disk),
to_mixed_mode(State).
to_disk_only_mode(State = #mqstate { mode = disk }) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 62d5c03a..f45a36bb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -954,7 +954,7 @@ rdq_test_mixed_queue_modes() ->
rdq_virgin(),
rdq_start(),
Payload = <<0:(8*256)>>,
- {ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed),
+ {ok, MS} = rabbit_mixed_queue:init(q, true, mixed),
MS2 = lists:foldl(
fun (_N, MS1) ->
Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
@@ -998,7 +998,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed),
+ {ok, MS12} = rabbit_mixed_queue:init(q, true, mixed),
10 = rabbit_mixed_queue:length(MS12),
io:format("Recovered queue~n"),
{MS14, AckTags} =
@@ -1018,7 +1018,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed),
+ {ok, MS17} = rabbit_mixed_queue:init(q, true, mixed),
0 = rabbit_mixed_queue:length(MS17),
io:format("Recovered queue~n"),
rdq_stop(),