diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-01 17:40:59 +0000 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-01 17:40:59 +0000 |
commit | e0fcc125297062763771da0e5b6208307d7fa62f (patch) | |
tree | 5f643c0c368a22e4782977f892f364b1409e0739 | |
parent | 2ae51cb5ef9251d63f8864ad606b6630141c64c3 (diff) | |
download | rabbitmq-server-e0fcc125297062763771da0e5b6208307d7fa62f.tar.gz |
Cosmetic, restored reject/4 arguments to the previous order.
-rw-r--r-- | src/file_handle_cache.erl | 8 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 15 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 4 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 33 |
4 files changed, 40 insertions, 20 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c11fb54b..bbf50d32 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -149,6 +149,7 @@ -export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). +-export([needs_sync/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). @@ -373,6 +374,13 @@ sync(Ref) -> end end). +needs_sync(Ref) -> + with_flushed_handles( + [Ref], + fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false; + (_) -> true + end). + position(Ref, NewOffset) -> with_flushed_handles( [Ref], diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d6c9a51c..50e9e49a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -679,7 +679,7 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, requeue = Requeue}, _, State) -> - reject(DeliveryTag, Multiple, Requeue, State); + reject(DeliveryTag, Requeue, Multiple, State); handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -880,7 +880,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> - reject(DeliveryTag, false, Requeue, State); + reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -1084,10 +1084,11 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL, - uncommitted_nacks = TNL, - limiter = Limiter}) -> +handle_method(#'tx.commit'{}, _, + State = #ch{uncommitted_message_q = TMQ, + uncommitted_acks = TAL, + uncommitted_nacks = TNL, + limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), ack(TAL, State1), lists:foreach( @@ -1266,7 +1267,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). -reject(DeliveryTag, Multiple, Requeue, +reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f03c1d1c..434f28d4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -22,6 +22,7 @@ next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). +-export([needs_sync/1]). -define(CLEAN_FILENAME, "clean.dot"). @@ -298,6 +299,9 @@ sync(SeqIds, State) -> %% seqids not being in the journal. sync_if([] =/= SeqIds, State). +needs_sync(#qistate { journal_handle = JournalHdl }) -> + file_handle_cache:needs_sync(JournalHdl). + flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9b45b558..8407bebf 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -731,21 +731,28 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_timeout(State) -> - case needs_index_sync(State) of - false -> case reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end; - true -> timed +needs_timeout(State = #vqstate { index_state = IndexState }) -> + case rabbit_queue_index:needs_sync(IndexState) of + true -> + timed; + false -> + case needs_index_sync(State) of + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end; + true -> timed + end end. -timeout(State) -> - a(reduce_memory_use(confirm_commit_index(State))). +timeout(State = #vqstate { index_state = IndexState }) -> + State1 = State #vqstate { + index_state = rabbit_queue_index:sync(IndexState) }, + a(reduce_memory_use(State1)). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. |