diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-04-22 14:49:18 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-04-22 14:49:18 +0100 |
commit | a7f6713daf4bf5bce503d80583949fc47cb2914a (patch) | |
tree | e34c53dc90eaadc636f2fee0834aa1d8f86aee93 | |
parent | 46dc9ff82442dd4c73a00eb2ad21e91fd1fc9c09 (diff) | |
parent | fcf7e73bae8f96cd5ffff1c06ac5453476831ab7 (diff) | |
download | rabbitmq-server-a7f6713daf4bf5bce503d80583949fc47cb2914a.tar.gz |
Merging bug 22615 onto default
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 4 | ||||
-rwxr-xr-x | packaging/common/rabbitmq-server.ocf | 362 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rules | 1 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 128 |
4 files changed, 437 insertions, 58 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index c318a96c..3ed907c8 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -10,6 +10,7 @@ Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate Source4: rabbitmq-asroot-script-wrapper +Source5: rabbitmq-server.ocf URL: http://www.rabbitmq.com/ BuildArch: noarch BuildRequires: erlang, python-simplejson @@ -29,6 +30,7 @@ scalable implementation of an AMQP broker. %define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version} %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` +%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -38,6 +40,7 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} cp %{S:4} %{_rabbit_asroot_wrapper} +cp %{S:5} %{_rabbit_server_ocf} make %{?_smp_mflags} %install @@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins +install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf new file mode 100755 index 00000000..97c58ea2 --- /dev/null +++ b/packaging/common/rabbitmq-server.ocf @@ -0,0 +1,362 @@ +#!/bin/sh +## +## OCF Resource Agent compliant rabbitmq-server resource script. +## + +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2010 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2010 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +## OCF instance parameters +## OCF_RESKEY_multi +## OCF_RESKEY_ctl +## OCF_RESKEY_nodename +## OCF_RESKEY_ip +## OCF_RESKEY_port +## OCF_RESKEY_cluster_config_file +## OCF_RESKEY_config_file +## OCF_RESKEY_log_base +## OCF_RESKEY_mnesia_base +## OCF_RESKEY_server_start_args + +####################################################################### +# Initialization: + +. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs + +####################################################################### + +OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi" +OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl" +OCF_RESKEY_nodename_default="rabbit@localhost" +OCF_RESKEY_log_base_default="/var/log/rabbitmq" +: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}} +: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}} +: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}} +: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} + +meta_data() { + cat <<END +<?xml version="1.0"?> +<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd"> +<resource-agent name="rabbitmq-server"> +<version>1.0</version> + +<longdesc lang="en"> +Resource agent for RabbitMQ-server +</longdesc> + +<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc> + +<parameters> +<parameter name="multi" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmq-multi script +</longdesc> +<shortdesc lang="en">Path to rabbitmq-multi</shortdesc> +<content type="string" default="${OCF_RESKEY_multi_default}" /> +</parameter> + +<parameter name="ctl" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmqctl script +</longdesc> +<shortdesc lang="en">Path to rabbitmqctl</shortdesc> +<content type="string" default="${OCF_RESKEY_ctl_default}" /> +</parameter> + +<parameter name="nodename" unique="0" required="0"> +<longdesc lang="en"> +The node name for rabbitmq-server +</longdesc> +<shortdesc lang="en">Node name</shortdesc> +<content type="string" default="${OCF_RESKEY_nodename_default}" /> +</parameter> + +<parameter name="ip" unique="0" required="0"> +<longdesc lang="en"> +The IP address for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Address</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="port" unique="0" required="0"> +<longdesc lang="en"> +The IP Port for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Port</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="cluster_config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the cluster config file +</longdesc> +<shortdesc lang="en">Cluster config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the config file +</longdesc> +<shortdesc lang="en">Config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="log_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which logs will be created +</longdesc> +<shortdesc lang="en">Log base path</shortdesc> +<content type="string" default="${OCF_RESKEY_log_base_default}" /> +</parameter> + +<parameter name="mnesia_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which mnesia will store data +</longdesc> +<shortdesc lang="en">Mnesia base path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="server_start_args" unique="0" required="0"> +<longdesc lang="en"> +Additional arguments provided to the server on startup +</longdesc> +<shortdesc lang="en">Server start arguments</shortdesc> +<content type="string" default="" /> +</parameter> + +</parameters> + +<actions> +<action name="start" timeout="600" /> +<action name="stop" timeout="120" /> +<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" /> +<action name="validate-all" timeout="30" /> +<action name="meta-data" timeout="5" /> +</actions> +</resource-agent> +END +} + +rabbit_usage() { + cat <<END +usage: $0 {start|stop|monitor|validate-all|meta-data} + +Expects to have a fully populated OCF RA-compliant environment set. +END +} + +RABBITMQ_MULTI=$OCF_RESKEY_multi +RABBITMQ_CTL=$OCF_RESKEY_ctl +RABBITMQ_NODENAME=$OCF_RESKEY_nodename +RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip +RABBITMQ_NODE_PORT=$OCF_RESKEY_port +RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file +RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file +RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base +RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base +RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args +[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME" +[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME + +export_vars() { + [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS + [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT + [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE + [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE + [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE + [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE + [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS +} + +rabbit_validate_partial() { + if [ ! -x $RABBITMQ_MULTI ]; then + ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi + + if [ ! -x $RABBITMQ_CTL ]; then + ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi +} + +rabbit_validate_full() { + if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then + ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then + ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + rabbit_validate_partial + + return $OCF_SUCCESS +} + +rabbit_status() { + local rc + $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null + rc=$? + case "$rc" in + 0) + return $OCF_SUCCESS + ;; + 2) + return $OCF_NOT_RUNNING + ;; + *) + ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" + return $OCF_ERR_GENERIC + esac +} + +rabbit_start() { + local rc + + rabbit_validate_full + rc=$? + if [ "$rc" != $OCF_SUCCESS ]; then + return $rc + fi + + export_vars + + $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" + return $rc + fi + + # Spin waiting for the server to come up. + # Let the CRM/LRM time us out if required + start_wait=1 + while [ $start_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_SUCCESS ]; then + start_wait=0 + + elif [ "$rc" != $OCF_NOT_RUNNING ]; then + ocf_log info "rabbitmq-server start failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_stop() { + local rc + $RABBITMQ_MULTI stop_all & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" + return $rc + fi + + # Spin waiting for the server to shut down. + # Let the CRM/LRM time us out if required + stop_wait=1 + while [ $stop_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_NOT_RUNNING ]; then + stop_wait=0 + break + elif [ "$rc" != $OCF_SUCCESS ]; then + ocf_log info "rabbitmq-server stop failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_monitor() { + rabbit_status + return $? +} + +case $__OCF_ACTION in + meta-data) + meta_data + exit $OCF_SUCCESS + ;; + usage|help) + rabbit_usage + exit $OCF_SUCCESS + ;; +esac + +rabbit_validate_partial || exit + +case $__OCF_ACTION in + start) + rabbit_start + ;; + stop) + rabbit_stop + ;; + monitor) + rabbit_monitor + ;; + validate-all) + exit $OCF_SUCCESS + ;; + *) + rabbit_usage + exit $OCF_ERR_UNIMPLEMENTED + ;; +esac + +exit $?
\ No newline at end of file diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 3799c438..45659602 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -21,3 +21,4 @@ install/rabbitmq-server:: install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm + install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 8aa5ad8d..53335a6f 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -53,7 +53,7 @@ -define(MAX_WRAP_ENTRIES, 500). --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}). -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, @@ -64,7 +64,7 @@ %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). +-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- @@ -78,10 +78,10 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok'). +-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). -spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: (txn()) -> 'ok'). --spec(rollback_transaction/1 :: (txn()) -> 'ok'). +-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). +-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). -spec(serial/0 :: () -> non_neg_integer()). @@ -128,7 +128,8 @@ init(_Args) -> Snapshot = #psnapshot{serial = 0, transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, [])}, + queues = ets:new(queues, []), + next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, {head, current_snapshot(Snapshot)}, @@ -153,12 +154,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], pending_replies = [], - snapshot = NewSnapshot}, + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -236,8 +237,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline, %% "tied" is met. log_work(CreateWorkUnit, MessageList, State = #pstate{ - snapshot = Snapshot = #psnapshot{ - messages = Messages}}) -> + snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( fun(M = {publish, Message, QK = {_QName, PKey}}) -> @@ -343,20 +343,22 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> +current_snapshot(_Snapshot = #psnapshot{serial = Serial, + transactions = Ts, + messages = Messages, + queues = Queues, + next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> + fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues)), InnerSnapshot = {{serial, Serial}, {txns, Ts}, {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, + {queues, ets:tab2list(Queues)}, + {next_seq_id, NextSeqId}}, ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. @@ -380,14 +382,15 @@ internal_load_snapshot(LogHandle, {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), + {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}, + {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ serial = Serial, - transactions = Ts}), + transactions = Ts, + next_seq_id = NextSeqId}), Snapshot2 = requeue_messages(Snapshot1), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so @@ -407,8 +410,8 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> Work = ets:foldl( - fun ({{QName, PKey}, Delivered}, Acc) -> - rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc) + fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> + rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) end, dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( @@ -419,8 +422,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, fun ({QName, Requeues}) -> requeue(QName, Requeues, Messages) end, dict:to_list(Work))), - NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], + NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], + NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], ets:delete_all_objects(Messages), ets:delete_all_objects(Queues), true = ets:insert(Messages, NewMessages), @@ -432,8 +435,8 @@ requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, + [{SeqId, QName, PKey, Message, Delivered} || + {SeqId, PKey, Delivered} <- Requeues, {_, Message} <- ets:lookup(Messages, PKey)], rabbit_amqqueue:redeliver( QPid, @@ -443,7 +446,7 @@ requeue(QName, Requeues, Messages) -> %% per-channel basis, and channels are bound to specific %% processes, sorting the list does provide the correct %% ordering properties. - [{Message, Delivered} || {_, Message, Delivered} <- + [{Message, Delivered} || {_, _, _, Message, Delivered} <- lists:sort(RequeueMessages)]), RequeueMessages; {error, not_found} -> @@ -477,39 +480,48 @@ internal_integrate1({rollback_transaction, Key}, Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; internal_integrate1({commit_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> + messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> case dict:find(Key, Transactions) of {ok, MessageLists} -> ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; + NextSeqId = + lists:foldr( + fun (ML, SeqIdN) -> + perform_work(ML, Messages, Queues, SeqIdN) end, + SeqId, MessageLists), + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), + next_seq_id = NextSeqId}; error -> Snapshot end; internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. - -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). + Snapshot = #psnapshot{messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> + Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, + Queues, SeqId)}. + +perform_work(MessageList, Messages, Queues, SeqId) -> + lists:foldl(fun (Item, NextSeqId) -> + perform_work_item(Item, Messages, Queues, NextSeqId) + end, SeqId, MessageList). + +perform_work_item({publish, Message, QK = {_QName, PKey}}, + Messages, Queues, NextSeqId) -> + true = ets:insert(Messages, {PKey, Message}), + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> + true = ets:update_element(Queues, QK, {2, true}), + NextSeqId; + +perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> + true = ets:delete(Queues, QK), + NextSeqId. |