summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-07 20:20:56 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-07 20:20:56 +0100
commiteab942babba136033bbc41232b7d390aea8088c1 (patch)
tree5c82fdd4677c58248e04a2ba6e594c128c3d1337
parent985972a1d3fba56a9846a1674cfdbe5f7980b008 (diff)
parent178c6bc0318d1a471f4a7eb6b29e7e7e63098996 (diff)
downloadrabbitmq-server-eab942babba136033bbc41232b7d390aea8088c1.tar.gz
merge default into bug21377
-rw-r--r--include/rabbit_backing_queue_spec.hrl17
-rw-r--r--include/rabbit_msg_store_index.hrl1
-rw-r--r--src/rabbit_amqqueue.erl27
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_basic.erl26
-rw-r--r--src/rabbit_msg_store.erl175
-rw-r--r--src/rabbit_msg_store_ets_index.erl6
-rw-r--r--src/vm_memory_monitor.erl5
8 files changed, 144 insertions, 121 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 005994f0..38c6f939 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -30,8 +30,9 @@
%%
-type(fetch_result() ::
- %% Message, IsDelivered, AckTag, Remaining_Len
- ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
+ ('empty' |
+ %% Message, IsDelivered, AckTag, RemainingLen
+ {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
@@ -39,19 +40,23 @@
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> state()).
+-spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) ->
+ state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
-spec(publish_delivered/3 ::
- (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}).
+ (ack_required(), rabbit_types:basic_message(), state()) ->
+ {ack(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()).
+-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
--spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) ->
+ {[ack()], state()}).
-spec(requeue/2 :: ([ack()], state()) -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl
index fba0b7cd..d4115363 100644
--- a/include/rabbit_msg_store_index.hrl
+++ b/include/rabbit_msg_store_index.hrl
@@ -51,6 +51,7 @@
[{fieldpos(), fieldvalue()}]),
index_state()) -> 'ok').
-spec(delete/2 :: (rabbit_guid:guid(), index_state()) -> 'ok').
+-spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok').
-spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok').
-spec(terminate/1 :: (index_state()) -> any()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fe67ea9b..7dfe41ba 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -437,24 +437,21 @@ flush_all(QPids, ChPid) ->
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- %% we want to execute some things, as
- %% decided by rabbit_exchange, after the
- %% transaction.
+ %% we want to execute some things, as decided by rabbit_exchange,
+ %% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- case
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] -> internal_delete1(QueueName)
- end
- end) of
- Err = {error, _} -> Err;
- PostHook ->
- PostHook(),
- ok
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [_] -> internal_delete1(QueueName)
+ end
+ end) of
+ {error, _} = Err -> Err;
+ PostHook -> PostHook(),
+ ok
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2c53a8e3..61204deb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -251,8 +251,9 @@ stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{expiry_timer_ref = undefined}.
-%% We only wish to expire where there are no consumers *and* when
-%% basic.get hasn't been called for the configured period.
+%% We wish to expire only when there are no consumers *and* the expiry
+%% hasn't been refreshed (by queue.declare or basic.get) for the
+%% configured period.
ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
ensure_expiry_timer(State = #q{expires = Expires}) ->
@@ -783,7 +784,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+ reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)},
+ ensure_expiry_timer(State));
handle_call(delete_exclusive, _From,
State = #q{ backing_queue_state = BQS,
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d62fc07c..38412982 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -55,28 +55,24 @@
rabbit_types:message()) -> rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary())
- -> (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) ->
+ (rabbit_types:message() | rabbit_types:error(any()))).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary())
- -> publish_result()).
+ properties_input(), binary()) -> publish_result()).
-spec(publish/7 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- properties_input(), binary())
- -> publish_result()).
--spec(build_content/2 ::
- (rabbit_framing:amqp_property_record(), binary())
- -> rabbit_types:content()).
--spec(from_content/1 ::
- (rabbit_types:content())
- -> {rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 ::
- (rabbit_types:decoded_content())
- -> (boolean() | {'invalid', non_neg_integer()})).
+ properties_input(), binary()) -> publish_result()).
+-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) ->
+ rabbit_types:content()).
+-spec(from_content/1 :: (rabbit_types:content()) ->
+ {rabbit_framing:amqp_property_record(), binary()}).
+-spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
+ (boolean() |
+ {'invalid', non_neg_integer()})).
-endif.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bbecbfe2..81d3c501 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -325,7 +325,7 @@ read(Server, Guid,
Defer = fun() -> {gen_server2:call(
Server, {read, Guid}, infinity),
CState} end,
- case index_lookup(Guid, CState) of
+ case index_lookup_positive_ref_count(Guid, CState) of
not_found -> Defer();
MsgLocation -> client_read1(Server, MsgLocation, Defer,
CState)
@@ -620,45 +620,31 @@ handle_call(client_terminate, _From, State) ->
reply(ok, State).
handle_cast({write, Guid},
- State = #msstate { current_file_handle = CurHdl,
- current_file = CurFile,
- sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ State = #msstate { sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
case index_lookup(Guid, State) of
not_found ->
- %% New message, lots to do
- {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
- ok = index_insert(#msg_location {
- guid = Guid, ref_count = 1, file = CurFile,
- offset = CurOffset, total_size = TotalSize },
- State),
- [#file_summary { valid_total_size = ValidTotalSize,
- right = undefined,
- locked = false,
- file_size = FileSize }] =
- ets:lookup(FileSummaryEts, CurFile),
- ValidTotalSize1 = ValidTotalSize + TotalSize,
- true = ets:update_element(
- FileSummaryEts, CurFile,
- [{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.file_size, FileSize + TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(
- maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize }));
+ write_message(Guid, Msg, State);
+ #msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ ok = index_delete(Guid, State),
+ write_message(Guid, Msg, State);
+ [#file_summary {}] ->
+ ok = index_update_ref_count(Guid, 1, State),
+ [_] = ets:update_counter(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size, TotalSize}]),
+ noreply(State #msstate {
+ sum_valid_data = SumValid + TotalSize })
+ end;
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_fields(Guid,
- {#msg_location.ref_count, RefCount + 1},
- State),
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
noreply(State)
end;
@@ -812,9 +798,31 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
+write_message(Guid, Msg,
+ State = #msstate { current_file_handle = CurHdl,
+ current_file = CurFile,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ file_summary_ets = FileSummaryEts }) ->
+ {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
+ ok = index_insert(
+ #msg_location { guid = Guid, ref_count = 1, file = CurFile,
+ offset = CurOffset, total_size = TotalSize }, State),
+ [#file_summary { right = undefined, locked = false }] =
+ ets:lookup(FileSummaryEts, CurFile),
+ [_,_] = ets:update_counter(FileSummaryEts, CurFile,
+ [{#file_summary.valid_total_size, TotalSize},
+ {#file_summary.file_size, TotalSize}]),
+ NextOffset = CurOffset + TotalSize,
+ noreply(maybe_roll_to_new_file(
+ NextOffset, State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize })).
+
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
- case index_lookup(Guid, State) of
+ case index_lookup_positive_ref_count(Guid, State) of
not_found ->
gen_server2:reply(From, not_found),
State;
@@ -887,7 +895,7 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
{Msg, State1}.
contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
- case index_lookup(Guid, State) of
+ case index_lookup_positive_ref_count(Guid, State) of
not_found ->
gen_server2:reply(From, false),
State;
@@ -906,36 +914,30 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize } = index_lookup(Guid, State),
+ total_size = TotalSize } =
+ index_lookup_positive_ref_count(Guid, State),
+ %% only update field, otherwise bad interaction with concurrent GC
+ Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
case RefCount of
- 1 ->
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
- %% there may be further writes in the mailbox for the same
- %% msg.
- ok = remove_cache_entry(DedupCacheEts, Guid),
- [#file_summary { valid_total_size = ValidTotalSize,
- locked = Locked }] =
- ets:lookup(FileSummaryEts, File),
- case Locked of
- true ->
- add_to_pending_gc_completion({remove, Guid}, State);
- false ->
- ok = index_delete(Guid, State),
- ValidTotalSize1 = ValidTotalSize - TotalSize,
- true =
- ets:update_element(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, ValidTotalSize1}]),
- State1 = delete_file_if_empty(File, State),
- State1 #msstate { sum_valid_data = SumValid - TotalSize }
- end;
- _ when 1 < RefCount ->
- ok = decrement_cache(DedupCacheEts, Guid),
- %% only update field, otherwise bad interaction with concurrent GC
- ok = index_update_fields(Guid,
- {#msg_location.ref_count, RefCount - 1},
- State),
- State
+ %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
+ %% there may be further writes in the mailbox for the same
+ %% msg.
+ 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true } ] ->
+ add_to_pending_gc_completion({remove, Guid}, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ [_] = ets:update_counter(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size, -TotalSize}]),
+ delete_file_if_empty(
+ File, State #msstate {
+ sum_valid_data = SumValid - TotalSize })
+ end;
+ _ -> ok = decrement_cache(DedupCacheEts, Guid),
+ ok = Dec(),
+ State
end.
add_to_pending_gc_completion(
@@ -1106,6 +1108,16 @@ decrement_cache(DedupCacheEts, Guid) ->
%% index
%%----------------------------------------------------------------------------
+index_lookup_positive_ref_count(Key, State) ->
+ case index_lookup(Key, State) of
+ not_found -> not_found;
+ #msg_location { ref_count = 0 } -> not_found;
+ #msg_location {} = MsgLocation -> MsgLocation
+ end.
+
+index_update_ref_count(Key, RefCount, State) ->
+ index_update_fields(Key, {#msg_location.ref_count, RefCount}, State).
+
index_lookup(Key, #client_msstate { index_module = Index,
index_state = State }) ->
Index:lookup(Key, State);
@@ -1498,6 +1510,10 @@ delete_file_if_empty(File, State = #msstate {
end,
true = mark_handle_to_close(FileHandlesEts, File),
true = ets:delete(FileSummaryEts, File),
+ {ok, Messages, FileSize} =
+ scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ [index_delete(Guid, State) ||
+ {Guid, _TotalSize, _Offset} <- Messages],
State1 = close_handle(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
State1 #msstate { sum_file_size = SumFileSize - FileSize };
@@ -1553,7 +1569,7 @@ combine_files(#file_summary { file = Source,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
{DestinationWorkList, DestinationValid} =
- find_unremoved_messages_in_file(Destination, State),
+ load_and_vacuum_message_file(Destination, State),
{DestinationContiguousTop, DestinationWorkListTail} =
drop_contiguous_block_prefix(DestinationWorkList),
case DestinationWorkListTail of
@@ -1579,8 +1595,7 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:sync(DestinationHdl),
ok = file_handle_cache:delete(TmpHdl)
end,
- {SourceWorkList, SourceValid} =
- find_unremoved_messages_in_file(Source, State),
+ {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1588,21 +1603,25 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:delete(SourceHdl),
ExpectedSize.
-find_unremoved_messages_in_file(File,
- {_FileSummaryEts, Dir, Index, IndexState}) ->
+load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
- lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
- case Index:lookup(Guid, IndexState) of
- #msg_location { file = File, total_size = TotalSize,
- offset = Offset } = Entry ->
- {[ Entry | List ], TotalSize + Size};
- _ ->
- Acc
- end
- end, {[], 0}, Messages).
+ lists:foldl(
+ fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
+ case Index:lookup(Guid, IndexState) of
+ #msg_location { file = File, total_size = TotalSize,
+ offset = Offset, ref_count = 0 } = Entry ->
+ ok = Index:delete_object(Entry, IndexState),
+ Acc;
+ #msg_location { file = File, total_size = TotalSize,
+ offset = Offset } = Entry ->
+ {[ Entry | List ], TotalSize + Size};
+ _ ->
+ Acc
+ end
+ end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index 1eb3c11f..96be674c 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -35,7 +35,7 @@
-export([new/1, recover/1,
lookup/2, insert/2, update/2, update_fields/3, delete/2,
- delete_by_file/2, terminate/1]).
+ delete_object/2, delete_by_file/2, terminate/1]).
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
-define(FILENAME, "msg_store_index.ets").
@@ -79,6 +79,10 @@ delete(Key, State) ->
true = ets:delete(State #state.table, Key),
ok.
+delete_object(Obj, State) ->
+ true = ets:delete_object(State #state.table, Obj),
+ ok.
+
delete_by_file(File, State) ->
MatchHead = #msg_location { file = File, _ = '_' },
ets:select_delete(State #state.table, [{MatchHead, [], [true]}]),
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 067fa9f2..9eb9d0a6 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -47,7 +47,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([update/0, get_total_memory/0,
+-export([update/0, get_total_memory/0, get_vm_limit/0,
get_check_interval/0, set_check_interval/1,
get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
get_memory_limit/0]).
@@ -76,7 +76,7 @@
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
--spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')).
+-spec(get_memory_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
-spec(get_vm_memory_high_watermark/0 :: () -> float()).
@@ -84,7 +84,6 @@
-endif.
-
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------