diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-07 20:20:56 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-07 20:20:56 +0100 |
commit | eab942babba136033bbc41232b7d390aea8088c1 (patch) | |
tree | 5c82fdd4677c58248e04a2ba6e594c128c3d1337 | |
parent | 985972a1d3fba56a9846a1674cfdbe5f7980b008 (diff) | |
parent | 178c6bc0318d1a471f4a7eb6b29e7e7e63098996 (diff) | |
download | rabbitmq-server-eab942babba136033bbc41232b7d390aea8088c1.tar.gz |
merge default into bug21377
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 17 | ||||
-rw-r--r-- | include/rabbit_msg_store_index.hrl | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 27 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 26 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 175 | ||||
-rw-r--r-- | src/rabbit_msg_store_ets_index.erl | 6 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 5 |
8 files changed, 144 insertions, 121 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 005994f0..38c6f939 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -30,8 +30,9 @@ %% -type(fetch_result() :: - %% Message, IsDelivered, AckTag, Remaining_Len - ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). + ('empty' | + %% Message, IsDelivered, AckTag, RemainingLen + {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). @@ -39,19 +40,23 @@ -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> state()). +-spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> + state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). -spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). -spec(publish_delivered/3 :: - (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}). + (ack_required(), rabbit_types:basic_message(), state()) -> + {ack(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()). +-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), + state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). --spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}). +-spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> + {[ack()], state()}). -spec(requeue/2 :: ([ack()], state()) -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl index fba0b7cd..d4115363 100644 --- a/include/rabbit_msg_store_index.hrl +++ b/include/rabbit_msg_store_index.hrl @@ -51,6 +51,7 @@ [{fieldpos(), fieldvalue()}]), index_state()) -> 'ok'). -spec(delete/2 :: (rabbit_guid:guid(), index_state()) -> 'ok'). +-spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok'). -spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok'). -spec(terminate/1 :: (index_state()) -> any()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fe67ea9b..7dfe41ba 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -437,24 +437,21 @@ flush_all(QPids, ChPid) -> internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), ok = mnesia:delete({rabbit_durable_queue, QueueName}), - %% we want to execute some things, as - %% decided by rabbit_exchange, after the - %% transaction. + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - case - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> internal_delete1(QueueName) - end - end) of - Err = {error, _} -> Err; - PostHook -> - PostHook(), - ok + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [_] -> internal_delete1(QueueName) + end + end) of + {error, _} = Err -> Err; + PostHook -> PostHook(), + ok end. maybe_run_queue_via_backing_queue(QPid, Fun) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2c53a8e3..61204deb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -251,8 +251,9 @@ stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{expiry_timer_ref = undefined}. -%% We only wish to expire where there are no consumers *and* when -%% basic.get hasn't been called for the configured period. +%% We wish to expire only when there are no consumers *and* the expiry +%% hasn't been refreshed (by queue.declare or basic.get) for the +%% configured period. ensure_expiry_timer(State = #q{expires = undefined}) -> State; ensure_expiry_timer(State = #q{expires = Expires}) -> @@ -783,7 +784,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> - reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State); + reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, + ensure_expiry_timer(State)); handle_call(delete_exclusive, _From, State = #q{ backing_queue_state = BQS, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index d62fc07c..38412982 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -55,28 +55,24 @@ rabbit_types:message()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) - -> (rabbit_types:message() | rabbit_types:error(any()))). + properties_input(), binary()) -> + (rabbit_types:message() | rabbit_types:error(any()))). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) - -> publish_result()). + properties_input(), binary()) -> publish_result()). -spec(publish/7 :: (rabbit_exchange:name(), rabbit_router:routing_key(), boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - properties_input(), binary()) - -> publish_result()). --spec(build_content/2 :: - (rabbit_framing:amqp_property_record(), binary()) - -> rabbit_types:content()). --spec(from_content/1 :: - (rabbit_types:content()) - -> {rabbit_framing:amqp_property_record(), binary()}). --spec(is_message_persistent/1 :: - (rabbit_types:decoded_content()) - -> (boolean() | {'invalid', non_neg_integer()})). + properties_input(), binary()) -> publish_result()). +-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) -> + rabbit_types:content()). +-spec(from_content/1 :: (rabbit_types:content()) -> + {rabbit_framing:amqp_property_record(), binary()}). +-spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) -> + (boolean() | + {'invalid', non_neg_integer()})). -endif. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bbecbfe2..81d3c501 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -325,7 +325,7 @@ read(Server, Guid, Defer = fun() -> {gen_server2:call( Server, {read, Guid}, infinity), CState} end, - case index_lookup(Guid, CState) of + case index_lookup_positive_ref_count(Guid, CState) of not_found -> Defer(); MsgLocation -> client_read1(Server, MsgLocation, Defer, CState) @@ -620,45 +620,31 @@ handle_call(client_terminate, _From, State) -> reply(ok, State). handle_cast({write, Guid}, - State = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - sum_valid_data = SumValid, - sum_file_size = SumFileSize, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> - %% New message, lots to do - {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), - ok = index_insert(#msg_location { - guid = Guid, ref_count = 1, file = CurFile, - offset = CurOffset, total_size = TotalSize }, - State), - [#file_summary { valid_total_size = ValidTotalSize, - right = undefined, - locked = false, - file_size = FileSize }] = - ets:lookup(FileSummaryEts, CurFile), - ValidTotalSize1 = ValidTotalSize + TotalSize, - true = ets:update_element( - FileSummaryEts, CurFile, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.file_size, FileSize + TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply( - maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); + write_message(Guid, Msg, State); + #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + ok = index_delete(Guid, State), + write_message(Guid, Msg, State); + [#file_summary {}] -> + ok = index_update_ref_count(Guid, 1, State), + [_] = ets:update_counter( + FileSummaryEts, File, + [{#file_summary.valid_total_size, TotalSize}]), + noreply(State #msstate { + sum_valid_data = SumValid + TotalSize }) + end; #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_fields(Guid, - {#msg_location.ref_count, RefCount + 1}, - State), + ok = index_update_ref_count(Guid, RefCount + 1, State), noreply(State) end; @@ -812,9 +798,31 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. +write_message(Guid, Msg, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts }) -> + {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), + ok = index_insert( + #msg_location { guid = Guid, ref_count = 1, file = CurFile, + offset = CurOffset, total_size = TotalSize }, State), + [#file_summary { right = undefined, locked = false }] = + ets:lookup(FileSummaryEts, CurFile), + [_,_] = ets:update_counter(FileSummaryEts, CurFile, + [{#file_summary.valid_total_size, TotalSize}, + {#file_summary.file_size, TotalSize}]), + NextOffset = CurOffset + TotalSize, + noreply(maybe_roll_to_new_file( + NextOffset, State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize })). + read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> - case index_lookup(Guid, State) of + case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, not_found), State; @@ -887,7 +895,7 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, {Msg, State1}. contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> - case index_lookup(Guid, State) of + case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, false), State; @@ -906,36 +914,30 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = index_lookup(Guid, State), + total_size = TotalSize } = + index_lookup_positive_ref_count(Guid, State), + %% only update field, otherwise bad interaction with concurrent GC + Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, case RefCount of - 1 -> - %% don't remove from CUR_FILE_CACHE_ETS_NAME here because - %% there may be further writes in the mailbox for the same - %% msg. - ok = remove_cache_entry(DedupCacheEts, Guid), - [#file_summary { valid_total_size = ValidTotalSize, - locked = Locked }] = - ets:lookup(FileSummaryEts, File), - case Locked of - true -> - add_to_pending_gc_completion({remove, Guid}, State); - false -> - ok = index_delete(Guid, State), - ValidTotalSize1 = ValidTotalSize - TotalSize, - true = - ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}]), - State1 = delete_file_if_empty(File, State), - State1 #msstate { sum_valid_data = SumValid - TotalSize } - end; - _ when 1 < RefCount -> - ok = decrement_cache(DedupCacheEts, Guid), - %% only update field, otherwise bad interaction with concurrent GC - ok = index_update_fields(Guid, - {#msg_location.ref_count, RefCount - 1}, - State), - State + %% don't remove from CUR_FILE_CACHE_ETS_NAME here because + %% there may be further writes in the mailbox for the same + %% msg. + 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true } ] -> + add_to_pending_gc_completion({remove, Guid}, State); + [#file_summary {}] -> + ok = Dec(), + [_] = ets:update_counter( + FileSummaryEts, File, + [{#file_summary.valid_total_size, -TotalSize}]), + delete_file_if_empty( + File, State #msstate { + sum_valid_data = SumValid - TotalSize }) + end; + _ -> ok = decrement_cache(DedupCacheEts, Guid), + ok = Dec(), + State end. add_to_pending_gc_completion( @@ -1106,6 +1108,16 @@ decrement_cache(DedupCacheEts, Guid) -> %% index %%---------------------------------------------------------------------------- +index_lookup_positive_ref_count(Key, State) -> + case index_lookup(Key, State) of + not_found -> not_found; + #msg_location { ref_count = 0 } -> not_found; + #msg_location {} = MsgLocation -> MsgLocation + end. + +index_update_ref_count(Key, RefCount, State) -> + index_update_fields(Key, {#msg_location.ref_count, RefCount}, State). + index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) -> Index:lookup(Key, State); @@ -1498,6 +1510,10 @@ delete_file_if_empty(File, State = #msstate { end, true = mark_handle_to_close(FileHandlesEts, File), true = ets:delete(FileSummaryEts, File), + {ok, Messages, FileSize} = + scan_file_for_valid_messages(Dir, filenum_to_name(File)), + [index_delete(Guid, State) || + {Guid, _TotalSize, _Offset} <- Messages], State1 = close_handle(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), State1 #msstate { sum_file_size = SumFileSize - FileSize }; @@ -1553,7 +1569,7 @@ combine_files(#file_summary { file = Source, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source {DestinationWorkList, DestinationValid} = - find_unremoved_messages_in_file(Destination, State), + load_and_vacuum_message_file(Destination, State), {DestinationContiguousTop, DestinationWorkListTail} = drop_contiguous_block_prefix(DestinationWorkList), case DestinationWorkListTail of @@ -1579,8 +1595,7 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:sync(DestinationHdl), ok = file_handle_cache:delete(TmpHdl) end, - {SourceWorkList, SourceValid} = - find_unremoved_messages_in_file(Source, State), + {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State), %% tidy up @@ -1588,21 +1603,25 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(SourceHdl), ExpectedSize. -find_unremoved_messages_in_file(File, - {_FileSummaryEts, Dir, Index, IndexState}) -> +load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> - case Index:lookup(Guid, IndexState) of - #msg_location { file = File, total_size = TotalSize, - offset = Offset } = Entry -> - {[ Entry | List ], TotalSize + Size}; - _ -> - Acc - end - end, {[], 0}, Messages). + lists:foldl( + fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> + case Index:lookup(Guid, IndexState) of + #msg_location { file = File, total_size = TotalSize, + offset = Offset, ref_count = 0 } = Entry -> + ok = Index:delete_object(Entry, IndexState), + Acc; + #msg_location { file = File, total_size = TotalSize, + offset = Offset } = Entry -> + {[ Entry | List ], TotalSize + Size}; + _ -> + Acc + end + end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index 1eb3c11f..96be674c 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -35,7 +35,7 @@ -export([new/1, recover/1, lookup/2, insert/2, update/2, update_fields/3, delete/2, - delete_by_file/2, terminate/1]). + delete_object/2, delete_by_file/2, terminate/1]). -define(MSG_LOC_NAME, rabbit_msg_store_ets_index). -define(FILENAME, "msg_store_index.ets"). @@ -79,6 +79,10 @@ delete(Key, State) -> true = ets:delete(State #state.table, Key), ok. +delete_object(Obj, State) -> + true = ets:delete_object(State #state.table, Obj), + ok. + delete_by_file(File, State) -> MatchHead = #msg_location { file = File, _ = '_' }, ets:select_delete(State #state.table, [{MatchHead, [], [true]}]), diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 067fa9f2..9eb9d0a6 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -47,7 +47,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([update/0, get_total_memory/0, +-export([update/0, get_total_memory/0, get_vm_limit/0, get_check_interval/0, set_check_interval/1, get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1, get_memory_limit/0]). @@ -76,7 +76,7 @@ -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_vm_limit/0 :: () -> non_neg_integer()). --spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')). +-spec(get_memory_limit/0 :: () -> non_neg_integer()). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). @@ -84,7 +84,6 @@ -endif. - %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- |