summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-12 15:15:45 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-12 15:15:45 +0100
commitb1c0c1e13e2cafe55722434c32e9d4876058db0b (patch)
treec3004e1fbda2e1a396fba8c2e62a8a540968c1dd
parent28e86291ffa54c034d5e973c417fd2d8738d6c42 (diff)
parentf528b2da3af823b457a11e86b901eb6b98fd6f9e (diff)
downloadrabbitmq-server-b1c0c1e13e2cafe55722434c32e9d4876058db0b.tar.gz
merging in from bug 21368
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_disk_queue.erl148
-rw-r--r--src/rabbit_msg_store.erl95
-rw-r--r--src/rabbit_tests.erl5
6 files changed, 137 insertions, 131 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 30cfb99f..3a5cc2b0 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -119,6 +119,9 @@ fi
rm -rf %{buildroot}
%changelog
+* Mon Oct 5 2009 David Wragg <dpw@lshift.net> 1.7.0-1
+- New upstream release
+
* Wed Jun 17 2009 Matthias Radestock <matthias@lshift.net> 1.6.0-1
- New upstream release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index ac94c8a3..e4cfe7b5 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.7.0-1) intrepid; urgency=low
+
+ * New Upstream Release
+
+ -- David Wragg <dpw@lshift.net> Mon, 05 Oct 2009 13:44:41 +0100
+
rabbitmq-server (1.6.0-1) hardy; urgency=low
* New Upstream Release
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0836672f..d573f154 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -123,16 +123,23 @@ init(Q = #amqqueue { name = QName, durable = Durable, pinned = Pinned }) ->
{ok, start_memory_timer(State), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(_Reason, State) ->
+terminate(_Reason, State = #q{mixed_state = MS}) ->
%% FIXME: How do we cancel active subscriptions?
State1 = stop_memory_timer(State),
+ %% Ensure that any persisted tx messages are removed;
+ %% mixed_queue:delete_queue cannot do that for us since neither
+ %% mixed_queue nor disk_queue keep a record of uncommitted tx
+ %% messages.
+ {ok, MS1} = rabbit_mixed_queue:tx_rollback(
+ lists:concat([PM || #tx { pending_messages = PM } <-
+ all_tx_record()]), MS),
%% Delete from disk queue first. If we crash at this point, when a
%% durable queue, we will be recreated at startup, possibly with
%% partial content. The alternative is much worse however - if we
%% called internal_delete first, we would then have a race between
%% the disk_queue delete and a new queue with the same name being
%% created and published to.
- {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state),
+ {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1),
ok = rabbit_amqqueue:internal_delete(qname(State1)).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 02a8ed8c..7d44dd9d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -44,8 +44,6 @@
prefetch/1
]).
--export([filesync/0]).
-
-export([stop/0, stop_and_obliterate/0]).
%%----------------------------------------------------------------------------
@@ -63,17 +61,12 @@
is_persistent = true
}).
--define(SYNC_INTERVAL, 5). %% milliseconds
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(SERVER, ?MODULE).
--record(dqstate,
- {sequences, %% next read and write for each q
- on_sync_txns, %% list of commiters to run on sync (reversed)
- commit_timer_ref %% TRef for our interval timer
- }).
+-record(dqstate, { sequences }). %% next read and write for each q
%%----------------------------------------------------------------------------
@@ -109,7 +102,6 @@
A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
--spec(filesync/0 :: () -> 'ok').
-endif.
@@ -173,8 +165,10 @@ stop() ->
stop_and_obliterate() ->
gen_server2:call(?SERVER, stop_vaporise, infinity).
-filesync() ->
- gen_server2:pcall(?SERVER, 9, filesync).
+%% private
+
+finalise_commit(TxDetails) ->
+ gen_server2:cast(?SERVER, {finalise_commit, TxDetails}).
%%----------------------------------------------------------------------------
%% gen_server behaviour
@@ -203,9 +197,7 @@ init([]) ->
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
- State = #dqstate { sequences = Sequences,
- on_sync_txns = [],
- commit_timer_ref = undefined },
+ State = #dqstate { sequences = Sequences },
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -222,8 +214,6 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
-handle_call(filesync, _From, State) ->
- reply(ok, sync(State));
handle_call({delete_queue, Q}, From, State) ->
gen_server2:reply(From, ok),
{ok, State1} = internal_delete_queue(Q, State),
@@ -275,13 +265,12 @@ handle_cast({prefetch, Q, From}, State) ->
internal_fetch_attributes(Q, ignore_delivery, State1);
false -> ok
end,
- noreply(State1).
+ noreply(State1);
+handle_cast({finalise_commit, TxDetails}, State) ->
+ noreply(finalise_commit(TxDetails, State)).
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
-handle_info(timeout, State) ->
- %% must have commit_timer set, so timeout was 0, and we're not hibernating
- noreply(sync(State)).
+ {stop, Reason, State}.
terminate(_Reason, State) ->
State1 = shutdown(State),
@@ -291,10 +280,9 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { sequences = undefined }) ->
State;
shutdown(State = #dqstate { sequences = Sequences }) ->
- State1 = stop_commit_timer(State),
ok = rabbit_msg_store:stop(),
ets:delete(Sequences),
- State1 #dqstate { sequences = undefined }.
+ State #dqstate { sequences = undefined }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -304,28 +292,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
noreply(State) ->
- noreply1(State).
-
-noreply1(State) ->
- {State1, Timeout} = next_state(State),
- {noreply, State1, Timeout}.
+ {noreply, State, hibernate}.
reply(Reply, State) ->
- reply1(Reply, State).
-
-reply1(Reply, State) ->
- {State1, Timeout} = next_state(State),
- {reply, Reply, State1, Timeout}.
-
-next_state(State = #dqstate { on_sync_txns = [],
- commit_timer_ref = undefined }) ->
- {State, hibernate};
-next_state(State = #dqstate { commit_timer_ref = undefined }) ->
- {start_commit_timer(State), 0};
-next_state(State = #dqstate { on_sync_txns = [] }) ->
- {stop_commit_timer(State), hibernate};
-next_state(State) ->
- {State, 0}.
+ {reply, Reply, State, hibernate}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -339,25 +309,6 @@ sequence_lookup(Sequences, Q) ->
[{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId}
end.
-start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
- State #dqstate { commit_timer_ref = TRef }.
-
-stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
- State;
-stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
- State #dqstate { commit_timer_ref = undefined }.
-
-sync(State = #dqstate { on_sync_txns = Txns }) ->
- ok = rabbit_msg_store:sync(),
- case Txns of
- [] -> State;
- _ -> lists:foldl(fun internal_do_tx_commit/2,
- State #dqstate { on_sync_txns = [] },
- lists:reverse(Txns))
- end.
-
%%----------------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------------
@@ -404,10 +355,9 @@ maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) ->
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
ok.
-internal_foldl(Q, Fun, Init, State) ->
- State1 = #dqstate { sequences = Sequences } = sync(State),
+internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId).
+ internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
{ok, Acc, State};
@@ -437,41 +387,37 @@ internal_tx_publish(Message = #basic_message { guid = MsgId,
MsgId, Message #basic_message { content = ClearedContent }),
{ok, State}.
-internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
- State = #dqstate { on_sync_txns = Txns }) ->
- TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
- case rabbit_msg_store:needs_sync(
- [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds]) of
- true -> Txns1 = [TxnDetails | Txns],
- State #dqstate { on_sync_txns = Txns1 };
- false -> internal_do_tx_commit(TxnDetails, State)
- end.
+internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State) ->
+ TxDetails = {Q, PubMsgIds, AckSeqIds, From},
+ ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds],
+ fun () -> finalise_commit(TxDetails) end),
+ State.
-internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
- State = #dqstate { sequences = Sequences }) ->
+finalise_commit({Q, PubMsgIds, AckSeqIds, From},
+ State = #dqstate { sequences = Sequences }) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
WriteSeqId =
rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- {ok, WriteSeqId1} =
- lists:foldl(
- fun ({MsgId, IsDelivered, IsPersistent}, {ok, SeqId}) ->
- {mnesia:write(
+ lists:foldl(
+ fun ({MsgId, IsDelivered, IsPersistent}, SeqId) ->
+ ok = mnesia:write(
rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
- msg_id = MsgId,
- is_delivered = IsDelivered,
- is_persistent = IsPersistent
- }, write),
- SeqId + 1}
- end, {ok, InitWriteSeqId}, PubMsgIds),
- WriteSeqId1
+ #dq_msg_loc {
+ queue_and_seq_id = {Q, SeqId},
+ msg_id = MsgId,
+ is_delivered = IsDelivered,
+ is_persistent = IsPersistent
+ }, write),
+ SeqId + 1
+ end, InitWriteSeqId, PubMsgIds)
end),
{ok, State1} = remove_messages(Q, AckSeqIds, State),
true = case PubMsgIds of
[] -> true;
- _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
+ _ -> ets:insert(Sequences,
+ {Q, InitReadSeqId, WriteSeqId})
end,
gen_server2:reply(From, ok),
State1.
@@ -589,19 +535,19 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
end.
internal_delete_queue(Q, State) ->
- State1 = sync(State),
- {ok, _Count, State2 = #dqstate { sequences = Sequences }} =
- internal_purge(Q, State1), %% remove everything undelivered
+ %% remove everything undelivered
+ {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
+ internal_purge(Q, State),
true = ets:delete(Sequences, Q),
- %% now remove everything already delivered
- Objs = mnesia:dirty_match_object(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }),
- MsgSeqIds = lists:map(fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
- msg_id = MsgId }) ->
- {MsgId, SeqId}
- end, Objs),
- remove_messages(Q, MsgSeqIds, State2).
+ %% remove everything already delivered
+ remove_messages(
+ Q, [{MsgId, SeqId} || #dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId } <-
+ mnesia:dirty_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc {
+ queue_and_seq_id = {Q, '_'},
+ _ = '_' })], State1).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index aa779e61..8596e09f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,9 @@
-behaviour(gen_server2).
-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
- needs_sync/1, sync/0, stop/0]).
+ sync/2, stop/0]).
+
+-export([sync/0]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -43,6 +45,7 @@
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(SYNC_INTERVAL, 5). %% milliseconds
%%----------------------------------------------------------------------------
@@ -61,8 +64,7 @@
-spec(contains/1 :: (msg_id()) -> boolean()).
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
--spec(needs_sync/1 :: ([msg_id()]) -> boolean()).
--spec(sync/0 :: () -> 'ok').
+-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-endif.
@@ -81,6 +83,8 @@
file_size_limit, %% how big can our files get?
read_file_handle_cache, %% file handle cache for reading
last_sync_offset, %% current_offset at the last time we sync'd
+ on_sync, %% pending sync requests
+ sync_timer_ref, %% TRef for our interval timer
message_cache %% ets message cache
}).
@@ -232,9 +236,9 @@ read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
-needs_sync(MsgIds) -> gen_server2:call(?SERVER, {needs_sync, MsgIds}, infinity).
-sync() -> gen_server2:call(?SERVER, sync, infinity).
+sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
stop() -> gen_server2:call(?SERVER, stop, infinity).
+sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -262,6 +266,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
file_size_limit = ?FILE_SIZE_LIMIT,
read_file_handle_cache = HandleCache,
last_sync_offset = 0,
+ on_sync = [],
+ sync_timer_ref = undefined,
message_cache = MessageCache
},
@@ -330,21 +336,6 @@ handle_call({contains, MsgId}, _From, State) ->
#msg_location {} -> true
end, State);
-handle_call({needs_sync, _MsgIds}, _From,
- State = #msstate { current_dirty = false }) ->
- reply(false, State);
-handle_call({needs_sync, MsgIds}, _From,
- State = #msstate { current_file = CurFile,
- last_sync_offset = SyncOffset }) ->
- reply(lists:any(fun (MsgId) ->
- #msg_location { file = File, offset = Offset } =
- index_lookup(MsgId, State),
- File =:= CurFile andalso Offset >= SyncOffset
- end, MsgIds), State);
-
-handle_call(sync, _From, State) ->
- reply(ok, sync(State));
-
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
@@ -403,10 +394,32 @@ handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
- noreply(State).
+ noreply(State);
-handle_info(_Info, State) ->
- noreply(State).
+handle_cast({sync, _MsgIds, K},
+ State = #msstate { current_dirty = false }) ->
+ K(),
+ noreply(State);
+
+handle_cast({sync, MsgIds, K},
+ State = #msstate { current_file = CurFile,
+ last_sync_offset = SyncOffset,
+ on_sync = Syncs }) ->
+ case lists:any(fun (MsgId) ->
+ #msg_location { file = File, offset = Offset } =
+ index_lookup(MsgId, State),
+ File =:= CurFile andalso Offset >= SyncOffset
+ end, MsgIds) of
+ false -> K(),
+ noreply(State);
+ true -> noreply(State #msstate { on_sync = [K | Syncs] })
+ end;
+
+handle_cast(sync, State) ->
+ noreply(sync(State)).
+
+handle_info(timeout, State) ->
+ noreply(sync(State)).
terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
file_summary = FileSummary,
@@ -434,9 +447,32 @@ code_change(_OldVsn, State, _Extra) ->
%% general helper functions
%%----------------------------------------------------------------------------
-noreply(State) -> {noreply, State}.
+noreply(State) ->
+ {State1, Timeout} = next_state(State),
+ {noreply, State1, Timeout}.
+
+reply(Reply, State) ->
+ {State1, Timeout} = next_state(State),
+ {reply, Reply, State1, Timeout}.
-reply(Reply, State) -> {reply, Reply, State}.
+next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
+ {State, infinity};
+next_state(State = #msstate { sync_timer_ref = undefined }) ->
+ {start_sync_timer(State), 0};
+next_state(State = #msstate { on_sync = [] }) ->
+ {stop_sync_timer(State), infinity};
+next_state(State) ->
+ {State, 0}.
+
+start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, []),
+ State #msstate { sync_timer_ref = TRef }.
+
+stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
+ State;
+stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #msstate { sync_timer_ref = undefined }.
form_filename(Dir, Name) -> filename:join(Dir, Name).
@@ -465,9 +501,14 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
sync(State = #msstate { current_dirty = false }) ->
State;
sync(State = #msstate { current_file_handle = CurHdl,
- current_offset = CurOffset }) ->
+ current_offset = CurOffset,
+ on_sync = Syncs }) ->
+ State1 = stop_sync_timer(State),
ok = file:sync(CurHdl),
- State #msstate { current_dirty = false, last_sync_offset = CurOffset }.
+ lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
+ State1 #msstate { current_dirty = false,
+ last_sync_offset = CurOffset,
+ on_sync = [] }.
with_read_handle_at(File, Offset, Fun,
State = #msstate { dir = Dir,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9d9e60ba..259f120a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -668,7 +668,10 @@ test_server_status() ->
{ok, _} = rabbit_amqqueue:delete(Q, false, false),
%% list connections
- [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(),
+ [#listener{host = H, port = P} | _] =
+ [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
+ N =:= node()],
+
{ok, C} = gen_tcp:connect(H, P, []),
timer:sleep(100),
ok = info_action(