diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-06 13:04:06 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-06 13:04:06 +0000 |
commit | 5d3b291386560f1d9629be9293f2d624779ef322 (patch) | |
tree | 907faaa5ee410480a5ef59b900deb768ef936280 | |
parent | 491142304b11035c82436ee14a36ba97005ef454 (diff) | |
download | rabbitmq-server-5d3b291386560f1d9629be9293f2d624779ef322.tar.gz |
Rip out old persister as it no longer works given the arrival of pub acks
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 314 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 496 |
4 files changed, 0 insertions, 814 deletions
@@ -178,9 +178,6 @@ start-rabbit-on-node: all stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) -force-snapshot: all - echo "rabbit_persister:force_snapshot()." | $(ERL_CALL) - set-memory-alarm: all echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \ $(ERL_CALL) diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 17d05a99..6c33ef8b 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -6,7 +6,6 @@ {registered, [rabbit_amqqueue_sup, rabbit_log, rabbit_node_monitor, - rabbit_persister, rabbit_router, rabbit_sup, rabbit_tcp_client_sup]}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl deleted file mode 100644 index 5a0532ea..00000000 --- a/src/rabbit_invariable_queue.erl +++ /dev/null @@ -1,314 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_invariable_queue). - --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, - dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, - idle_timeout/1, handle_pre_hibernate/1, status/1]). - --export([start/1, stop/0]). - --behaviour(rabbit_backing_queue). - --include("rabbit.hrl"). - --record(iv_state, { queue, qname, durable, len, pending_ack }). --record(tx, { pending_messages, pending_acks, is_persistent }). - --ifdef(use_specs). - --type(ack() :: rabbit_guid:guid() | 'blank_ack'). --type(state() :: #iv_state { queue :: queue(), - qname :: rabbit_amqqueue:name(), - len :: non_neg_integer(), - pending_ack :: dict() - }). --include("rabbit_backing_queue_spec.hrl"). - --endif. - -start(DurableQueues) -> - ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]). - -stop() -> - ok = rabbit_sup:stop_child(rabbit_persister). - -init(QName, IsDurable, Recover) -> - Q = queue:from_list(case IsDurable andalso Recover of - true -> rabbit_persister:queue_content(QName); - false -> [] - end), - #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = queue:len(Q), - pending_ack = dict:new() }. - -terminate(State) -> - State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. - -delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), - {_PLen, State1} = purge(State), - terminate(State1). - -purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> - %% We do not purge messages pending acks. - {AckTags, PA} = - rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, - _MsgProps, _IsDelivered}, Acc) -> - Acc; - ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, - {AckTagsN, PAN}) -> - ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)} - end, {[], dict:new()}, Q), - ok = persist_acks(QName, IsDurable, none, AckTags, PA), - {Len, State #iv_state { len = 0, queue = queue:new() }}. - -publish(Msg, MsgProps, State = #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = Len }) -> - ok = persist_message(QName, IsDurable, none, Msg, MsgProps), - State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }. - -publish_delivered(false, _Msg, _MsgProps, State) -> - {blank_ack, State}; -publish_delivered(true, Msg = #basic_message { guid = Guid }, - MsgProps, - State = #iv_state { qname = QName, durable = IsDurable, - len = 0, pending_ack = PA }) -> - ok = persist_message(QName, IsDurable, none, Msg, MsgProps), - ok = persist_delivery(QName, IsDurable, false, Msg), - {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. - -dropwhile(_Pred, State = #iv_state { len = 0 }) -> - State; -dropwhile(Pred, State = #iv_state { queue = Q }) -> - {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - case Pred(MsgProps) of - true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, - IsDelivered, State), - dropwhile(Pred, State1); - false -> State - end. - -fetch(_AckRequired, State = #iv_state { len = 0 }) -> - {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q }) -> - {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). - -fetch_internal(AckRequired, Q1, - Msg = #basic_message { guid = Guid }, - MsgProps, IsDelivered, - State = #iv_state { len = Len, - qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - Len1 = Len - 1, - ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - PA1 = store_ack(Msg, MsgProps, PA), - {AckTag, PA2} = case AckRequired of - true -> {Guid, PA1}; - false -> ok = persist_acks(QName, IsDurable, none, - [Guid], PA1), - {blank_ack, PA} - end, - {{Msg, IsDelivered, AckTag, Len1}, - State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. - -ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - ok = persist_acks(QName, IsDurable, none, AckTags, PA), - PA1 = remove_acks(AckTags, PA), - State #iv_state { pending_ack = PA1 }. - -tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, - durable = IsDurable }) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), - State. - -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), - State. - -tx_rollback(Txn, State = #iv_state { qname = QName }) -> - #tx { pending_acks = AckTags } = lookup_tx(Txn), - ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName), - erase_tx(Txn), - {lists:flatten(AckTags), State}. - -tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, - pending_ack = PA, - queue = Q, - len = Len }) -> - #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), - ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName), - erase_tx(Txn), - Fun(), - AckTags1 = lists:flatten(AckTags), - PA1 = remove_acks(AckTags1, PA), - {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - {enqueue(Msg, MsgPropsFun(MsgProps), - false, QN), - LenN + 1} - end, {Q, Len}, PubsRev), - {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. - -requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, - queue = Q, - len = Len }) -> - %% We don't need to touch the persister here - the persister will - %% already have these messages published and delivered as - %% necessary. The complication is that the persister's seq_id will - %% now be wrong, given the position of these messages in our queue - %% here. However, the persister's seq_id is only used for sorting - %% on startup, and requeue is silent as to where the requeued - %% messages should appear, thus the persister is permitted to sort - %% based on seq_id, even though it'll likely give a different - %% order to the last known state of our queue, prior to shutdown. - {Q1, Len1} = lists:foldl( - fun (Guid, {QN, LenN}) -> - {Msg = #basic_message {}, MsgProps} - = dict:fetch(Guid, PA), - {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), - LenN + 1} - end, {Q, Len}, AckTags), - PA1 = remove_acks(AckTags, PA), - State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. - -enqueue(Msg, MsgProps, IsDelivered, Q) -> - queue:in({Msg, MsgProps, IsDelivered}, Q). - -len(#iv_state { len = Len }) -> Len. - -is_empty(State) -> 0 == len(State). - -set_ram_duration_target(_DurationTarget, State) -> State. - -ram_duration(State) -> {0, State}. - -needs_idle_timeout(_State) -> false. - -idle_timeout(State) -> State. - -handle_pre_hibernate(State) -> State. - -status(_State) -> []. - -%%---------------------------------------------------------------------------- - -remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). - -store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) -> - dict:store(Guid, {Msg, MsgProps}, PA). - -%%---------------------------------------------------------------------------- - -lookup_tx(Txn) -> - case get({txn, Txn}) of - undefined -> #tx { pending_messages = [], - pending_acks = [], - is_persistent = false }; - V -> V - end. - -store_tx(Txn, Tx) -> - put({txn, Txn}, Tx). - -erase_tx(Txn) -> - erase({txn, Txn}). - -mark_tx_persistent(Txn) -> - store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }). - -is_tx_persistent(Txn) -> - (lookup_tx(Txn)) #tx.is_persistent. - -do_if_persistent(F, Txn, QName) -> - ok = case is_tx_persistent(Txn) of - false -> ok; - true -> F({Txn, QName}) - end. - -%%---------------------------------------------------------------------------- - -persist_message(QName, true, Txn, Msg = #basic_message { - is_persistent = true }, MsgProps) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - persist_work(Txn, QName, - [{publish, Msg1, MsgProps, - {QName, Msg1 #basic_message.guid}}]); -persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> - ok. - -persist_delivery(QName, true, false, #basic_message { is_persistent = true, - guid = Guid }) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]); -persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> - ok. - -persist_acks(QName, true, Txn, AckTags, PA) -> - persist_work(Txn, QName, - [{ack, {QName, Guid}} || Guid <- AckTags, - begin - {Msg, _MsgProps} - = dict:fetch(Guid, PA), - Msg #basic_message.is_persistent - end]); -persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> - ok. - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl deleted file mode 100644 index 11056c8e..00000000 --- a/src/rabbit_persister.erl +++ /dev/null @@ -1,496 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_persister). - --behaviour(gen_server). - --export([start_link/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([transaction/1, extend_transaction/2, dirty_work/1, - commit_transaction/1, rollback_transaction/1, - force_snapshot/0, queue_content/1]). - --include("rabbit.hrl"). - --define(SERVER, ?MODULE). - --define(LOG_BUNDLE_DELAY, 5). --define(COMPLETE_BUNDLE_DELAY, 2). - --define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). - --record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, snapshot}). - -%% two tables for efficient persistency -%% one maps a key to a message -%% the other maps a key to one or more queues. -%% The aim is to reduce the overload of storing a message multiple times -%% when it appears in several queues. --record(psnapshot, {transactions, messages, queues, next_seq_id}). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(pkey() :: rabbit_guid:guid()). --type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). - --type(work_item() :: - {publish, - rabbit_types:message(), rabbit_types:message_properties(), pmsg()} | - {deliver, pmsg()} | - {ack, pmsg()}). - --spec(start_link/1 :: ([rabbit_amqqueue:name()]) -> - rabbit_types:ok_pid_or_error()). --spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()]) - -> 'ok'). --spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). --spec(rollback_transaction/1 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). --spec(force_snapshot/0 :: () -> 'ok'). --spec(queue_content/1 :: - (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]). - --endif. - -%%---------------------------------------------------------------------------- - -start_link(DurableQueues) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []). - -transaction(MessageList) -> - ?LOGDEBUG("transaction ~p~n", [MessageList]), - TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). - -extend_transaction(TxnKey, MessageList) -> - ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), - gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}). - -dirty_work(MessageList) -> - ?LOGDEBUG("dirty_work ~p~n", [MessageList]), - gen_server:cast(?SERVER, {dirty_work, MessageList}). - -commit_transaction(TxnKey) -> - ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). - -rollback_transaction(TxnKey) -> - ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), - gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). - -force_snapshot() -> - gen_server:call(?SERVER, force_snapshot, infinity). - -queue_content(QName) -> - gen_server:call(?SERVER, {queue_content, QName}, infinity). - -%%-------------------------------------------------------------------- - -init([DurableQueues]) -> - process_flag(trap_exit, true), - FileName = base_filename(), - ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{transactions = dict:new(), - messages = ets:new(messages, []), - queues = ets:new(queues, [ordered_set]), - next_seq_id = 0}, - LogHandle = - case disk_log:open([{name, rabbit_persister}, - {head, current_snapshot(Snapshot)}, - {file, FileName}]) of - {ok, LH} -> LH; - {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} -> - WarningFun = if - Bad > 0 -> fun rabbit_log:warning/2; - true -> fun rabbit_log:info/2 - end, - WarningFun("Repaired persister log - ~p recovered, ~p bad~n", - [Recovered, Bad]), - LH - end, - {Res, NewSnapshot} = - internal_load_snapshot(LogHandle, DurableQueues, Snapshot), - case Res of - ok -> - ok = take_snapshot(LogHandle, NewSnapshot); - {error, Reason} -> - rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), - ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) - end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, - {ok, State}. - -handle_call({transaction, Key, MessageList}, From, State) -> - NewState = internal_extend(Key, MessageList, State), - do_noreply(internal_commit(From, Key, NewState)); -handle_call({commit_transaction, TxnKey}, From, State) -> - do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State) -> - do_reply(ok, flush(true, State)); -handle_call({queue_content, QName}, _From, - State = #pstate{snapshot = #psnapshot{messages = Messages, - queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], - [{{'$4', '$1', '$2', '$3'}}]}], - do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || - {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], - State); -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({rollback_transaction, TxnKey}, State) -> - do_noreply(internal_rollback(TxnKey, State)); -handle_cast({dirty_work, MessageList}, State) -> - do_noreply(internal_dirty_work(MessageList, State)); -handle_cast({extend_transaction, TxnKey, MessageList}, State) -> - do_noreply(internal_extend(TxnKey, MessageList, State)); -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(timeout, State = #pstate{deadline = infinity}) -> - State1 = flush(true, State), - {noreply, State1, hibernate}; -handle_info(timeout, State) -> - do_noreply(flush(State)); -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State = #pstate{log_handle = LogHandle}) -> - flush(State), - disk_log:close(LogHandle), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, flush(State)}. - -%%-------------------------------------------------------------------- - -internal_extend(Key, MessageList, State) -> - log_work(fun (ML) -> {extend_transaction, Key, ML} end, - MessageList, State). - -internal_dirty_work(MessageList, State) -> - log_work(fun (ML) -> {dirty_work, ML} end, - MessageList, State). - -internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {commit_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - complete(From, Unit, State#pstate{snapshot = NewSnapshot}). - -internal_rollback(Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {rollback_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -complete(From, Item, State = #pstate{deadline = ExistingDeadline, - pending_logs = Logs, - pending_replies = Waiting}) -> - State#pstate{deadline = compute_deadline( - ?COMPLETE_BUNDLE_DELAY, ExistingDeadline), - pending_logs = [Item | Logs], - pending_replies = [From | Waiting]}. - -%% This is made to limit disk usage by writing messages only once onto -%% disk. We keep a table associating pkeys to messages, and provided -%% the list of messages to output is left to right, we can guarantee -%% that pkeys will be a backreference to a message in memory when a -%% "tied" is met. -log_work(CreateWorkUnit, MessageList, - State = #pstate{ - snapshot = Snapshot = #psnapshot{messages = Messages}}) -> - Unit = CreateWorkUnit( - rabbit_misc:map_in_order( - fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) -> - case ets:lookup(Messages, PKey) of - [_] -> {tied, MsgProps, QK}; - [] -> ets:insert(Messages, {PKey, Message}), - M - end; - (M) -> M - end, - MessageList)), - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, - Message) -> - State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, - ExistingDeadline), - pending_logs = [Message | Logs]}. - -base_filename() -> - rabbit_mnesia:dir() ++ "/rabbit_persister.LOG". - -take_snapshot(LogHandle, OldFileName, Snapshot) -> - ok = disk_log:sync(LogHandle), - %% current_snapshot is the Head (ie. first thing logged) - ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)). - -take_snapshot(LogHandle, Snapshot) -> - OldFileName = lists:flatten(base_filename() ++ ".previous"), - file:delete(OldFileName), - rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -take_snapshot_and_save_old(LogHandle, Snapshot) -> - {MegaSecs, Secs, MicroSecs} = erlang:now(), - Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs, - OldFileName = lists:flatten(io_lib:format("~s.saved.~p", - [base_filename(), Timestamp])), - rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, - log_handle = LH, - snapshot = Snapshot}) -> - {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries), - if - Force orelse EntryCount >= MaxWrapEntries -> - ok = take_snapshot(LH, Snapshot), - State#pstate{entry_count = 0}; - true -> - State - end. - -later_ms(DeltaMilliSec) -> - {MegaSec, Sec, MicroSec} = now(), - %% Note: not normalised. Unimportant for this application. - {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}. - -%% Result = B - A, more or less -time_diff({B1, B2, B3}, {A1, A2, A3}) -> - (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 . - -compute_deadline(TimerDelay, infinity) -> - later_ms(TimerDelay); -compute_deadline(_TimerDelay, ExistingDeadline) -> - ExistingDeadline. - -compute_timeout(infinity) -> - {ok, HibernateAfter} = application:get_env(persister_hibernate_after), - HibernateAfter; -compute_timeout(Deadline) -> - DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, - if - DeltaMilliSec =< 1 -> - 0; - true -> - round(DeltaMilliSec) - end. - -do_noreply(State = #pstate{deadline = Deadline}) -> - {noreply, State, compute_timeout(Deadline)}. - -do_reply(Reply, State = #pstate{deadline = Deadline}) -> - {reply, Reply, State, compute_timeout(Deadline)}. - -flush(State) -> flush(false, State). - -flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if PendingLogs /= [] -> - disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - State#pstate{entry_count = State#pstate.entry_count + 1}; - true -> - State - end, - State2 = maybe_take_snapshot(ForceSnapshot, State1), - if Waiting /= [] -> - ok = disk_log:sync(LogHandle), - lists:foreach(fun (From) -> gen_server:reply(From, ok) end, - Waiting); - true -> - ok - end, - State2#pstate{deadline = infinity, - pending_logs = [], - pending_replies = []}. - -current_snapshot(_Snapshot = #psnapshot{transactions = Ts, - messages = Messages, - queues = Queues, - next_seq_id = NextSeqId}) -> - %% Avoid infinite growth of the table by removing messages not - %% bound to a queue anymore - PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, - _MsgProps, _SeqId}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues), - prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), - InnerSnapshot = {{txns, Ts}, - {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}, - {next_seq_id, NextSeqId}}, - ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), - {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - term_to_binary(InnerSnapshot)}. - -prune_table(Tab, Pred) -> - true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Pred, ets:first(Tab)), - true = ets:safe_fixtable(Tab, false). - -prune_table(_Tab, _Pred, '$end_of_table') -> ok; -prune_table(Tab, Pred, Key) -> - case Pred(Key) of - true -> ok; - false -> ets:delete(Tab, Key) - end, - prune_table(Tab, Pred, ets:next(Tab, Key)). - -internal_load_snapshot(LogHandle, - DurableQueues, - Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), - case check_version(Loaded_Snapshot) of - {ok, StateBin} -> - {{txns, Ts}, {messages, Ms}, {queues, Qs}, - {next_seq_id, NextSeqId}} = binary_to_term(StateBin), - true = ets:insert(Messages, Ms), - true = ets:insert(Queues, Qs), - Snapshot1 = replay(Items, LogHandle, K, - Snapshot#psnapshot{ - transactions = Ts, - next_seq_id = NextSeqId}), - %% Remove all entries for queues that no longer exist. - %% Note that the 'messages' table is pruned when the next - %% snapshot is taken. - DurableQueuesSet = sets:from_list(DurableQueues), - prune_table(Snapshot1#psnapshot.queues, - fun ({QName, _PKey}) -> - sets:is_element(QName, DurableQueuesSet) - end), - %% uncompleted transactions are discarded - this is TRTTD - %% since we only get into this code on node restart, so - %% any uncompleted transactions will have been aborted. - {ok, Snapshot1#psnapshot{transactions = dict:new()}}; - {error, Reason} -> {{error, Reason}, Snapshot} - end. - -check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - StateBin}) -> - {ok, StateBin}; -check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> - {error, {unsupported_persister_log_format, Vsn}}; -check_version(_Other) -> - {error, unrecognised_persister_log_format}. - -replay([], LogHandle, K, Snapshot) -> - case disk_log:chunk(LogHandle, K) of - {K1, Items} -> - replay(Items, LogHandle, K1, Snapshot); - {K1, Items, Badbytes} -> - rabbit_log:warning("~p bad bytes recovering persister log~n", - [Badbytes]), - replay(Items, LogHandle, K1, Snapshot); - eof -> Snapshot - end; -replay([Item | Items], LogHandle, K, Snapshot) -> - NewSnapshot = internal_integrate_messages(Item, Snapshot), - replay(Items, LogHandle, K, NewSnapshot). - -internal_integrate_messages(Items, Snapshot) -> - lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end, - Snapshot, Items). - -internal_integrate1({extend_transaction, Key, MessageList}, - Snapshot = #psnapshot {transactions = Transactions}) -> - Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, - Transactions)}; -internal_integrate1({rollback_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions}) -> - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; -internal_integrate1({commit_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues, - next_seq_id = SeqId}) -> - case dict:find(Key, Transactions) of - {ok, MessageLists} -> - ?LOGDEBUG("persist committing txn ~p~n", [Key]), - NextSeqId = - lists:foldr( - fun (ML, SeqIdN) -> - perform_work(ML, Messages, Queues, SeqIdN) end, - SeqId, MessageLists), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), - next_seq_id = NextSeqId}; - error -> - Snapshot - end; -internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot{messages = Messages, - queues = Queues, - next_seq_id = SeqId}) -> - Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, - Queues, SeqId)}. - -perform_work(MessageList, Messages, Queues, SeqId) -> - lists:foldl(fun (Item, NextSeqId) -> - perform_work_item(Item, Messages, Queues, NextSeqId) - end, SeqId, MessageList). - -perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}}, - Messages, Queues, NextSeqId) -> - true = ets:insert(Messages, {PKey, Message}), - true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), - NextSeqId + 1; - -perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) -> - true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), - NextSeqId + 1; - -perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> - true = ets:update_element(Queues, QK, {2, true}), - NextSeqId; - -perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> - true = ets:delete(Queues, QK), - NextSeqId. |