diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-10-12 15:15:45 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-10-12 15:15:45 +0100 |
commit | b1c0c1e13e2cafe55722434c32e9d4876058db0b (patch) | |
tree | c3004e1fbda2e1a396fba8c2e62a8a540968c1dd | |
parent | 28e86291ffa54c034d5e973c417fd2d8738d6c42 (diff) | |
parent | f528b2da3af823b457a11e86b901eb6b98fd6f9e (diff) | |
download | rabbitmq-server-b1c0c1e13e2cafe55722434c32e9d4876058db0b.tar.gz |
merging in from bug 21368
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 148 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 95 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 5 |
6 files changed, 137 insertions, 131 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 30cfb99f..3a5cc2b0 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -119,6 +119,9 @@ fi rm -rf %{buildroot} %changelog +* Mon Oct 5 2009 David Wragg <dpw@lshift.net> 1.7.0-1 +- New upstream release + * Wed Jun 17 2009 Matthias Radestock <matthias@lshift.net> 1.6.0-1 - New upstream release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index ac94c8a3..e4cfe7b5 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.7.0-1) intrepid; urgency=low + + * New Upstream Release + + -- David Wragg <dpw@lshift.net> Mon, 05 Oct 2009 13:44:41 +0100 + rabbitmq-server (1.6.0-1) hardy; urgency=low * New Upstream Release diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0836672f..d573f154 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -123,16 +123,23 @@ init(Q = #amqqueue { name = QName, durable = Durable, pinned = Pinned }) -> {ok, start_memory_timer(State), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(_Reason, State) -> +terminate(_Reason, State = #q{mixed_state = MS}) -> %% FIXME: How do we cancel active subscriptions? State1 = stop_memory_timer(State), + %% Ensure that any persisted tx messages are removed; + %% mixed_queue:delete_queue cannot do that for us since neither + %% mixed_queue nor disk_queue keep a record of uncommitted tx + %% messages. + {ok, MS1} = rabbit_mixed_queue:tx_rollback( + lists:concat([PM || #tx { pending_messages = PM } <- + all_tx_record()]), MS), %% Delete from disk queue first. If we crash at this point, when a %% durable queue, we will be recreated at startup, possibly with %% partial content. The alternative is much worse however - if we %% called internal_delete first, we would then have a race between %% the disk_queue delete and a new queue with the same name being %% created and published to. - {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state), + {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1), ok = rabbit_amqqueue:internal_delete(qname(State1)). code_change(_OldVsn, State, _Extra) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 02a8ed8c..7d44dd9d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -44,8 +44,6 @@ prefetch/1 ]). --export([filesync/0]). - -export([stop/0, stop_and_obliterate/0]). %%---------------------------------------------------------------------------- @@ -63,17 +61,12 @@ is_persistent = true }). --define(SYNC_INTERVAL, 5). %% milliseconds -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(SERVER, ?MODULE). --record(dqstate, - {sequences, %% next read and write for each q - on_sync_txns, %% list of commiters to run on sync (reversed) - commit_timer_ref %% TRef for our interval timer - }). +-record(dqstate, { sequences }). %% next read and write for each q %%---------------------------------------------------------------------------- @@ -109,7 +102,6 @@ A, queue_name()) -> A). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). --spec(filesync/0 :: () -> 'ok'). -endif. @@ -173,8 +165,10 @@ stop() -> stop_and_obliterate() -> gen_server2:call(?SERVER, stop_vaporise, infinity). -filesync() -> - gen_server2:pcall(?SERVER, 9, filesync). +%% private + +finalise_commit(TxDetails) -> + gen_server2:cast(?SERVER, {finalise_commit, TxDetails}). %%---------------------------------------------------------------------------- %% gen_server behaviour @@ -203,9 +197,7 @@ init([]) -> Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), ok = extract_sequence_numbers(Sequences), - State = #dqstate { sequences = Sequences, - on_sync_txns = [], - commit_timer_ref = undefined }, + State = #dqstate { sequences = Sequences }, {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -222,8 +214,6 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), reply(Count, State1); -handle_call(filesync, _From, State) -> - reply(ok, sync(State)); handle_call({delete_queue, Q}, From, State) -> gen_server2:reply(From, ok), {ok, State1} = internal_delete_queue(Q, State), @@ -275,13 +265,12 @@ handle_cast({prefetch, Q, From}, State) -> internal_fetch_attributes(Q, ignore_delivery, State1); false -> ok end, - noreply(State1). + noreply(State1); +handle_cast({finalise_commit, TxDetails}, State) -> + noreply(finalise_commit(TxDetails, State)). handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; -handle_info(timeout, State) -> - %% must have commit_timer set, so timeout was 0, and we're not hibernating - noreply(sync(State)). + {stop, Reason, State}. terminate(_Reason, State) -> State1 = shutdown(State), @@ -291,10 +280,9 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { sequences = undefined }) -> State; shutdown(State = #dqstate { sequences = Sequences }) -> - State1 = stop_commit_timer(State), ok = rabbit_msg_store:stop(), ets:delete(Sequences), - State1 #dqstate { sequences = undefined }. + State #dqstate { sequences = undefined }. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -304,28 +292,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- noreply(State) -> - noreply1(State). - -noreply1(State) -> - {State1, Timeout} = next_state(State), - {noreply, State1, Timeout}. + {noreply, State, hibernate}. reply(Reply, State) -> - reply1(Reply, State). - -reply1(Reply, State) -> - {State1, Timeout} = next_state(State), - {reply, Reply, State1, Timeout}. - -next_state(State = #dqstate { on_sync_txns = [], - commit_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #dqstate { commit_timer_ref = undefined }) -> - {start_commit_timer(State), 0}; -next_state(State = #dqstate { on_sync_txns = [] }) -> - {stop_commit_timer(State), hibernate}; -next_state(State) -> - {State, 0}. + {reply, Reply, State, hibernate}. form_filename(Name) -> filename:join(base_directory(), Name). @@ -339,25 +309,6 @@ sequence_lookup(Sequences, Q) -> [{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId} end. -start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), - State #dqstate { commit_timer_ref = TRef }. - -stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> - State; -stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), - State #dqstate { commit_timer_ref = undefined }. - -sync(State = #dqstate { on_sync_txns = Txns }) -> - ok = rabbit_msg_store:sync(), - case Txns of - [] -> State; - _ -> lists:foldl(fun internal_do_tx_commit/2, - State #dqstate { on_sync_txns = [] }, - lists:reverse(Txns)) - end. - %%---------------------------------------------------------------------------- %% internal functions %%---------------------------------------------------------------------------- @@ -404,10 +355,9 @@ maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), ok. -internal_foldl(Q, Fun, Init, State) -> - State1 = #dqstate { sequences = Sequences } = sync(State), +internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId). + internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> {ok, Acc, State}; @@ -437,41 +387,37 @@ internal_tx_publish(Message = #basic_message { guid = MsgId, MsgId, Message #basic_message { content = ClearedContent }), {ok, State}. -internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, - State = #dqstate { on_sync_txns = Txns }) -> - TxnDetails = {Q, PubMsgIds, AckSeqIds, From}, - case rabbit_msg_store:needs_sync( - [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds]) of - true -> Txns1 = [TxnDetails | Txns], - State #dqstate { on_sync_txns = Txns1 }; - false -> internal_do_tx_commit(TxnDetails, State) - end. +internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State) -> + TxDetails = {Q, PubMsgIds, AckSeqIds, From}, + ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds], + fun () -> finalise_commit(TxDetails) end), + State. -internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, - State = #dqstate { sequences = Sequences }) -> +finalise_commit({Q, PubMsgIds, AckSeqIds, From}, + State = #dqstate { sequences = Sequences }) -> {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), WriteSeqId = rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - {ok, WriteSeqId1} = - lists:foldl( - fun ({MsgId, IsDelivered, IsPersistent}, {ok, SeqId}) -> - {mnesia:write( + lists:foldl( + fun ({MsgId, IsDelivered, IsPersistent}, SeqId) -> + ok = mnesia:write( rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, - msg_id = MsgId, - is_delivered = IsDelivered, - is_persistent = IsPersistent - }, write), - SeqId + 1} - end, {ok, InitWriteSeqId}, PubMsgIds), - WriteSeqId1 + #dq_msg_loc { + queue_and_seq_id = {Q, SeqId}, + msg_id = MsgId, + is_delivered = IsDelivered, + is_persistent = IsPersistent + }, write), + SeqId + 1 + end, InitWriteSeqId, PubMsgIds) end), {ok, State1} = remove_messages(Q, AckSeqIds, State), true = case PubMsgIds of [] -> true; - _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId}) + _ -> ets:insert(Sequences, + {Q, InitReadSeqId, WriteSeqId}) end, gen_server2:reply(From, ok), State1. @@ -589,19 +535,19 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> end. internal_delete_queue(Q, State) -> - State1 = sync(State), - {ok, _Count, State2 = #dqstate { sequences = Sequences }} = - internal_purge(Q, State1), %% remove everything undelivered + %% remove everything undelivered + {ok, _Count, State1 = #dqstate { sequences = Sequences }} = + internal_purge(Q, State), true = ets:delete(Sequences, Q), - %% now remove everything already delivered - Objs = mnesia:dirty_match_object( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }), - MsgSeqIds = lists:map(fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, - msg_id = MsgId }) -> - {MsgId, SeqId} - end, Objs), - remove_messages(Q, MsgSeqIds, State2). + %% remove everything already delivered + remove_messages( + Q, [{MsgId, SeqId} || #dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId } <- + mnesia:dirty_match_object( + rabbit_disk_queue, + #dq_msg_loc { + queue_and_seq_id = {Q, '_'}, + _ = '_' })], State1). internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index aa779e61..8596e09f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,9 @@ -behaviour(gen_server2). -export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, - needs_sync/1, sync/0, stop/0]). + sync/2, stop/0]). + +-export([sync/0]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -43,6 +45,7 @@ -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(SYNC_INTERVAL, 5). %% milliseconds %%---------------------------------------------------------------------------- @@ -61,8 +64,7 @@ -spec(contains/1 :: (msg_id()) -> boolean()). -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). --spec(needs_sync/1 :: ([msg_id()]) -> boolean()). --spec(sync/0 :: () -> 'ok'). +-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -endif. @@ -81,6 +83,8 @@ file_size_limit, %% how big can our files get? read_file_handle_cache, %% file handle cache for reading last_sync_offset, %% current_offset at the last time we sync'd + on_sync, %% pending sync requests + sync_timer_ref, %% TRef for our interval timer message_cache %% ets message cache }). @@ -232,9 +236,9 @@ read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -needs_sync(MsgIds) -> gen_server2:call(?SERVER, {needs_sync, MsgIds}, infinity). -sync() -> gen_server2:call(?SERVER, sync, infinity). +sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). stop() -> gen_server2:call(?SERVER, stop, infinity). +sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -262,6 +266,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> file_size_limit = ?FILE_SIZE_LIMIT, read_file_handle_cache = HandleCache, last_sync_offset = 0, + on_sync = [], + sync_timer_ref = undefined, message_cache = MessageCache }, @@ -330,21 +336,6 @@ handle_call({contains, MsgId}, _From, State) -> #msg_location {} -> true end, State); -handle_call({needs_sync, _MsgIds}, _From, - State = #msstate { current_dirty = false }) -> - reply(false, State); -handle_call({needs_sync, MsgIds}, _From, - State = #msstate { current_file = CurFile, - last_sync_offset = SyncOffset }) -> - reply(lists:any(fun (MsgId) -> - #msg_location { file = File, offset = Offset } = - index_lookup(MsgId, State), - File =:= CurFile andalso Offset >= SyncOffset - end, MsgIds), State); - -handle_call(sync, _From, State) -> - reply(ok, sync(State)); - handle_call(stop, _From, State) -> {stop, normal, ok, State}. @@ -403,10 +394,32 @@ handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> handle_cast({release, MsgIds}, State) -> lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), - noreply(State). + noreply(State); -handle_info(_Info, State) -> - noreply(State). +handle_cast({sync, _MsgIds, K}, + State = #msstate { current_dirty = false }) -> + K(), + noreply(State); + +handle_cast({sync, MsgIds, K}, + State = #msstate { current_file = CurFile, + last_sync_offset = SyncOffset, + on_sync = Syncs }) -> + case lists:any(fun (MsgId) -> + #msg_location { file = File, offset = Offset } = + index_lookup(MsgId, State), + File =:= CurFile andalso Offset >= SyncOffset + end, MsgIds) of + false -> K(), + noreply(State); + true -> noreply(State #msstate { on_sync = [K | Syncs] }) + end; + +handle_cast(sync, State) -> + noreply(sync(State)). + +handle_info(timeout, State) -> + noreply(sync(State)). terminate(_Reason, State = #msstate { msg_locations = MsgLocations, file_summary = FileSummary, @@ -434,9 +447,32 @@ code_change(_OldVsn, State, _Extra) -> %% general helper functions %%---------------------------------------------------------------------------- -noreply(State) -> {noreply, State}. +noreply(State) -> + {State1, Timeout} = next_state(State), + {noreply, State1, Timeout}. + +reply(Reply, State) -> + {State1, Timeout} = next_state(State), + {reply, Reply, State1, Timeout}. -reply(Reply, State) -> {reply, Reply, State}. +next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> + {State, infinity}; +next_state(State = #msstate { sync_timer_ref = undefined }) -> + {start_sync_timer(State), 0}; +next_state(State = #msstate { on_sync = [] }) -> + {stop_sync_timer(State), infinity}; +next_state(State) -> + {State, 0}. + +start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, []), + State #msstate { sync_timer_ref = TRef }. + +stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #msstate { sync_timer_ref = undefined }. form_filename(Dir, Name) -> filename:join(Dir, Name). @@ -465,9 +501,14 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> sync(State = #msstate { current_dirty = false }) -> State; sync(State = #msstate { current_file_handle = CurHdl, - current_offset = CurOffset }) -> + current_offset = CurOffset, + on_sync = Syncs }) -> + State1 = stop_sync_timer(State), ok = file:sync(CurHdl), - State #msstate { current_dirty = false, last_sync_offset = CurOffset }. + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + State1 #msstate { current_dirty = false, + last_sync_offset = CurOffset, + on_sync = [] }. with_read_handle_at(File, Offset, Fun, State = #msstate { dir = Dir, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9d9e60ba..259f120a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -668,7 +668,10 @@ test_server_status() -> {ok, _} = rabbit_amqqueue:delete(Q, false, false), %% list connections - [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(), + [#listener{host = H, port = P} | _] = + [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), + N =:= node()], + {ok, C} = gen_tcp:connect(H, P, []), timer:sleep(100), ok = info_action( |