diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-17 13:33:38 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-17 13:33:38 +0100 |
commit | 67af217a39218ee8cf5e344786e8166c821adcdb (patch) | |
tree | c00803400337f72c2d798682de6e160e94847019 | |
parent | bf5ba32265dea67d4191937edf1ed6cc524089ab (diff) | |
download | rabbitmq-server-67af217a39218ee8cf5e344786e8166c821adcdb.tar.gz |
More tidying
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 26 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 |
4 files changed, 19 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e2a99d19..620b497b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -96,7 +96,7 @@ start_link(Q) -> init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), {ok, Mode} = rabbit_queue_mode_manager:register(self()), - {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, Mode), + {ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode), {ok, #q{q = Q, owner = none, exclusive_consumer = none, @@ -461,20 +461,17 @@ commit_transaction(Txn, State) -> } = lookup_tx(Txn), PendingMessagesOrdered = lists:reverse(PendingMessages), PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)), - {ok, MS} = + Acks = case lookup_ch(ChPid) of - not_found -> - rabbit_mixed_queue:tx_commit( - PendingMessagesOrdered, [], State #q.mixed_state); + not_found -> []; C = #cr { unacked_messages = UAM } -> {MsgWithAcks, Remaining} = collect_messages(PendingAcksOrdered, UAM), store_ch_record(C#cr{unacked_messages = Remaining}), - rabbit_mixed_queue:tx_commit( - PendingMessagesOrdered, - lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), - State #q.mixed_state) + [ AckTag || {_Msg, AckTag} <- MsgWithAcks ] end, + {ok, MS} = rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, Acks, State #q.mixed_state), State #q { mixed_state = MS }. rollback_transaction(Txn, State) -> @@ -736,8 +733,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), case Txn of none -> - Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, - MsgWithAcks), + Acks = [ AckTag || {_Msg, AckTag} <- MsgWithAcks ], {ok, MS} = rabbit_mixed_queue:ack(Acks, State #q.mixed_state), store_ch_record(C#cr{unacked_messages = Remaining}), @@ -792,10 +788,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end)); handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) -> - {ok, MS2} = case Constrain of - true -> rabbit_mixed_queue:to_disk_only_mode(MS); - false -> rabbit_mixed_queue:to_mixed_mode(MS) - end, + {ok, MS2} = (case Constrain of + true -> fun rabbit_mixed_queue:to_disk_only_mode/1; + false -> fun rabbit_mixed_queue:to_mixed_mode/1 + end)(MS), noreply(State #q { mixed_state = MS2 }). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index f3e63127..e82feb99 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1413,6 +1413,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> {Q, SeqId, NextWrite, -1}); [Orig = {Q, Read, Write, Length}] -> Repl = {Q, lists:min([Read, SeqId]), + lists:max([Write, NextWrite]), %% Length is wrong here, %% but it doesn't matter %% because we'll pull out @@ -1421,7 +1422,6 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> %% in then do a straight %% subtraction to get the %% right length - lists:max([Write, NextWrite]), Length}, if Orig =:= Repl -> true; true -> ets:insert(Sequences, Repl) diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 5933357c..a2e01bda 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). --export([start_link/3]). +-export([init/3]). -export([publish/2, publish_delivered/2, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, @@ -49,12 +49,12 @@ } ). -start_link(Queue, IsDurable, disk) -> +init(Queue, IsDurable, disk) -> purge_non_persistent_messages( #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, is_durable = IsDurable, length = 0 }); -start_link(Queue, IsDurable, mixed) -> - {ok, State} = start_link(Queue, IsDurable, disk), +init(Queue, IsDurable, mixed) -> + {ok, State} = init(Queue, IsDurable, disk), to_mixed_mode(State). to_disk_only_mode(State = #mqstate { mode = disk }) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 62d5c03a..f45a36bb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -954,7 +954,7 @@ rdq_test_mixed_queue_modes() -> rdq_virgin(), rdq_start(), Payload = <<0:(8*256)>>, - {ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed), + {ok, MS} = rabbit_mixed_queue:init(q, true, mixed), MS2 = lists:foldl( fun (_N, MS1) -> Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), @@ -998,7 +998,7 @@ rdq_test_mixed_queue_modes() -> io:format("Converted to disk only mode~n"), rdq_stop(), rdq_start(), - {ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed), + {ok, MS12} = rabbit_mixed_queue:init(q, true, mixed), 10 = rabbit_mixed_queue:length(MS12), io:format("Recovered queue~n"), {MS14, AckTags} = @@ -1018,7 +1018,7 @@ rdq_test_mixed_queue_modes() -> io:format("Converted to disk only mode~n"), rdq_stop(), rdq_start(), - {ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed), + {ok, MS17} = rabbit_mixed_queue:init(q, true, mixed), 0 = rabbit_mixed_queue:length(MS17), io:format("Recovered queue~n"), rdq_stop(), |