summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-01-10 14:58:52 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-01-10 14:58:52 +0000
commitc6ce925d783a49d324fdec0d8398b0a7ce7d959e (patch)
tree3eba32f260a2f01f3f27cdfc86e4f557ca19b184
parentbf6abe68860fe01f6b751dd6c38d30b1a8fcbaae (diff)
downloadrabbitmq-server-c6ce925d783a49d324fdec0d8398b0a7ce7d959e.tar.gz
First draft of mnesia fsync
-rw-r--r--src/mnesia_sync.erl93
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_misc.erl3
3 files changed, 100 insertions, 1 deletions
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl
new file mode 100644
index 00000000..c9dc1c1c
--- /dev/null
+++ b/src/mnesia_sync.erl
@@ -0,0 +1,93 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(mnesia_sync).
+
+-behaviour(gen_server2).
+
+-export([sync/0]).
+
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {sync_pid}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(sync/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+
+sync() ->
+ gen_server2:call(?SERVER, sync).
+
+%%----------------------------------------------------------------------------
+
+sync_proc() ->
+ receive
+ {sync_request, From} -> sync_proc([From]);
+ stop -> ok;
+ Other -> error({unexpected_non_sync, Other})
+ end.
+
+sync_proc(Waiting) ->
+ receive
+ {sync_request, From} -> sync_proc([From | Waiting])
+ after 0 ->
+ disk_log:sync(latest_log),
+ [gen_server2:reply(From, ok) || From <- Waiting],
+ sync_proc()
+ end.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{sync_pid = case rabbit_mnesia:is_disc_node() of
+ true -> proc_lib:spawn_link(fun sync_proc/0);
+ false -> undefined
+ end}}.
+
+handle_call(sync, _From, #state{sync_pid = undefined} = State) ->
+ {reply, ok, State};
+handle_call(sync, From, #state{sync_pid = SyncProcPid} = State) ->
+ SyncProcPid ! {sync_request, From},
+ {noreply, State};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, #state{sync_pid = undefined}) ->
+ ok;
+terminate(_Reason, #state{sync_pid = SyncProcPid}) ->
+ SyncProcPid ! stop,
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0a2681a2..2684d2c3 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -45,6 +45,11 @@
{requires, file_handle_cache},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({database_sync,
+ [{mfa, {rabbit_sup, start_child, [mnesia_sync]}},
+ {requires, database},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0578cf7d..a508794c 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -419,7 +419,8 @@ execute_mnesia_transaction(TxFun) ->
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
- {atomic, Result} -> Result;
+ {atomic, Result} -> mnesia_sync:sync(),
+ Result;
{aborted, Reason} -> throw({error, Reason})
end.