diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-01-10 14:58:52 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-01-10 14:58:52 +0000 |
commit | c6ce925d783a49d324fdec0d8398b0a7ce7d959e (patch) | |
tree | 3eba32f260a2f01f3f27cdfc86e4f557ca19b184 | |
parent | bf6abe68860fe01f6b751dd6c38d30b1a8fcbaae (diff) | |
download | rabbitmq-server-c6ce925d783a49d324fdec0d8398b0a7ce7d959e.tar.gz |
First draft of mnesia fsync
-rw-r--r-- | src/mnesia_sync.erl | 93 | ||||
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 3 |
3 files changed, 100 insertions, 1 deletions
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl new file mode 100644 index 00000000..c9dc1c1c --- /dev/null +++ b/src/mnesia_sync.erl @@ -0,0 +1,93 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(mnesia_sync). + +-behaviour(gen_server2). + +-export([sync/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {sync_pid}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(sync/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). + +sync() -> + gen_server2:call(?SERVER, sync). + +%%---------------------------------------------------------------------------- + +sync_proc() -> + receive + {sync_request, From} -> sync_proc([From]); + stop -> ok; + Other -> error({unexpected_non_sync, Other}) + end. + +sync_proc(Waiting) -> + receive + {sync_request, From} -> sync_proc([From | Waiting]) + after 0 -> + disk_log:sync(latest_log), + [gen_server2:reply(From, ok) || From <- Waiting], + sync_proc() + end. + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{sync_pid = case rabbit_mnesia:is_disc_node() of + true -> proc_lib:spawn_link(fun sync_proc/0); + false -> undefined + end}}. + +handle_call(sync, _From, #state{sync_pid = undefined} = State) -> + {reply, ok, State}; +handle_call(sync, From, #state{sync_pid = SyncProcPid} = State) -> + SyncProcPid ! {sync_request, From}, + {noreply, State}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, #state{sync_pid = undefined}) -> + ok; +terminate(_Reason, #state{sync_pid = SyncProcPid}) -> + SyncProcPid ! stop, + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit.erl b/src/rabbit.erl index 0a2681a2..2684d2c3 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -45,6 +45,11 @@ {requires, file_handle_cache}, {enables, external_infrastructure}]}). +-rabbit_boot_step({database_sync, + [{mfa, {rabbit_sup, start_child, [mnesia_sync]}}, + {requires, database}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({file_handle_cache, [{description, "file handle cache server"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0578cf7d..a508794c 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -419,7 +419,8 @@ execute_mnesia_transaction(TxFun) -> %% elsewhere and get a consistent result even when that read %% executes on a different node. case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of - {atomic, Result} -> Result; + {atomic, Result} -> mnesia_sync:sync(), + Result; {aborted, Reason} -> throw({error, Reason}) end. |