summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-22 14:49:18 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-22 14:49:18 +0100
commita7f6713daf4bf5bce503d80583949fc47cb2914a (patch)
treee34c53dc90eaadc636f2fee0834aa1d8f86aee93
parent46dc9ff82442dd4c73a00eb2ad21e91fd1fc9c09 (diff)
parentfcf7e73bae8f96cd5ffff1c06ac5453476831ab7 (diff)
downloadrabbitmq-server-a7f6713daf4bf5bce503d80583949fc47cb2914a.tar.gz
Merging bug 22615 onto default
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec4
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf362
-rw-r--r--packaging/debs/Debian/debian/rules1
-rw-r--r--src/rabbit_persister.erl128
4 files changed, 437 insertions, 58 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index c318a96c..3ed907c8 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -10,6 +10,7 @@ Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
Source4: rabbitmq-asroot-script-wrapper
+Source5: rabbitmq-server.ocf
URL: http://www.rabbitmq.com/
BuildArch: noarch
BuildRequires: erlang, python-simplejson
@@ -29,6 +30,7 @@ scalable implementation of an AMQP broker.
%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version}
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
+%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -38,6 +40,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
cp %{S:4} %{_rabbit_asroot_wrapper}
+cp %{S:5} %{_rabbit_server_ocf}
make %{?_smp_mflags}
%install
@@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins
+install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
new file mode 100755
index 00000000..97c58ea2
--- /dev/null
+++ b/packaging/common/rabbitmq-server.ocf
@@ -0,0 +1,362 @@
+#!/bin/sh
+##
+## OCF Resource Agent compliant rabbitmq-server resource script.
+##
+
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2010 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+## OCF instance parameters
+## OCF_RESKEY_multi
+## OCF_RESKEY_ctl
+## OCF_RESKEY_nodename
+## OCF_RESKEY_ip
+## OCF_RESKEY_port
+## OCF_RESKEY_cluster_config_file
+## OCF_RESKEY_config_file
+## OCF_RESKEY_log_base
+## OCF_RESKEY_mnesia_base
+## OCF_RESKEY_server_start_args
+
+#######################################################################
+# Initialization:
+
+. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs
+
+#######################################################################
+
+OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi"
+OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
+OCF_RESKEY_nodename_default="rabbit@localhost"
+OCF_RESKEY_log_base_default="/var/log/rabbitmq"
+: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}}
+: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}}
+: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}}
+: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
+
+meta_data() {
+ cat <<END
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="rabbitmq-server">
+<version>1.0</version>
+
+<longdesc lang="en">
+Resource agent for RabbitMQ-server
+</longdesc>
+
+<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc>
+
+<parameters>
+<parameter name="multi" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmq-multi script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmq-multi</shortdesc>
+<content type="string" default="${OCF_RESKEY_multi_default}" />
+</parameter>
+
+<parameter name="ctl" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmqctl script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmqctl</shortdesc>
+<content type="string" default="${OCF_RESKEY_ctl_default}" />
+</parameter>
+
+<parameter name="nodename" unique="0" required="0">
+<longdesc lang="en">
+The node name for rabbitmq-server
+</longdesc>
+<shortdesc lang="en">Node name</shortdesc>
+<content type="string" default="${OCF_RESKEY_nodename_default}" />
+</parameter>
+
+<parameter name="ip" unique="0" required="0">
+<longdesc lang="en">
+The IP address for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Address</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="port" unique="0" required="0">
+<longdesc lang="en">
+The IP Port for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Port</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="cluster_config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the cluster config file
+</longdesc>
+<shortdesc lang="en">Cluster config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the config file
+</longdesc>
+<shortdesc lang="en">Config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="log_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which logs will be created
+</longdesc>
+<shortdesc lang="en">Log base path</shortdesc>
+<content type="string" default="${OCF_RESKEY_log_base_default}" />
+</parameter>
+
+<parameter name="mnesia_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which mnesia will store data
+</longdesc>
+<shortdesc lang="en">Mnesia base path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="server_start_args" unique="0" required="0">
+<longdesc lang="en">
+Additional arguments provided to the server on startup
+</longdesc>
+<shortdesc lang="en">Server start arguments</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+</parameters>
+
+<actions>
+<action name="start" timeout="600" />
+<action name="stop" timeout="120" />
+<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" />
+<action name="validate-all" timeout="30" />
+<action name="meta-data" timeout="5" />
+</actions>
+</resource-agent>
+END
+}
+
+rabbit_usage() {
+ cat <<END
+usage: $0 {start|stop|monitor|validate-all|meta-data}
+
+Expects to have a fully populated OCF RA-compliant environment set.
+END
+}
+
+RABBITMQ_MULTI=$OCF_RESKEY_multi
+RABBITMQ_CTL=$OCF_RESKEY_ctl
+RABBITMQ_NODENAME=$OCF_RESKEY_nodename
+RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip
+RABBITMQ_NODE_PORT=$OCF_RESKEY_port
+RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file
+RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file
+RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base
+RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base
+RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args
+[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME"
+[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME
+
+export_vars() {
+ [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS
+ [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT
+ [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE
+ [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE
+ [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE
+ [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE
+ [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS
+}
+
+rabbit_validate_partial() {
+ if [ ! -x $RABBITMQ_MULTI ]; then
+ ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -x $RABBITMQ_CTL ]; then
+ ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+}
+
+rabbit_validate_full() {
+ if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then
+ ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then
+ ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ rabbit_validate_partial
+
+ return $OCF_SUCCESS
+}
+
+rabbit_status() {
+ local rc
+ $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
+ rc=$?
+ case "$rc" in
+ 0)
+ return $OCF_SUCCESS
+ ;;
+ 2)
+ return $OCF_NOT_RUNNING
+ ;;
+ *)
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ return $OCF_ERR_GENERIC
+ esac
+}
+
+rabbit_start() {
+ local rc
+
+ rabbit_validate_full
+ rc=$?
+ if [ "$rc" != $OCF_SUCCESS ]; then
+ return $rc
+ fi
+
+ export_vars
+
+ $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to come up.
+ # Let the CRM/LRM time us out if required
+ start_wait=1
+ while [ $start_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_SUCCESS ]; then
+ start_wait=0
+
+ elif [ "$rc" != $OCF_NOT_RUNNING ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_stop() {
+ local rc
+ $RABBITMQ_MULTI stop_all &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to shut down.
+ # Let the CRM/LRM time us out if required
+ stop_wait=1
+ while [ $stop_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_NOT_RUNNING ]; then
+ stop_wait=0
+ break
+ elif [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server stop failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_monitor() {
+ rabbit_status
+ return $?
+}
+
+case $__OCF_ACTION in
+ meta-data)
+ meta_data
+ exit $OCF_SUCCESS
+ ;;
+ usage|help)
+ rabbit_usage
+ exit $OCF_SUCCESS
+ ;;
+esac
+
+rabbit_validate_partial || exit
+
+case $__OCF_ACTION in
+ start)
+ rabbit_start
+ ;;
+ stop)
+ rabbit_stop
+ ;;
+ monitor)
+ rabbit_monitor
+ ;;
+ validate-all)
+ exit $OCF_SUCCESS
+ ;;
+ *)
+ rabbit_usage
+ exit $OCF_ERR_UNIMPLEMENTED
+ ;;
+esac
+
+exit $? \ No newline at end of file
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 3799c438..45659602 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -21,3 +21,4 @@ install/rabbitmq-server::
install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
+ install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 8aa5ad8d..53335a6f 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -53,7 +53,7 @@
-define(MAX_WRAP_ENTRIES, 500).
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -64,7 +64,7 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
+-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
@@ -78,10 +78,10 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok').
+-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: (txn()) -> 'ok').
--spec(rollback_transaction/1 :: (txn()) -> 'ok').
+-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
-spec(serial/0 :: () -> non_neg_integer()).
@@ -128,7 +128,8 @@ init(_Args) ->
Snapshot = #psnapshot{serial = 0,
transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
+ queues = ets:new(queues, []),
+ next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
{head, current_snapshot(Snapshot)},
@@ -153,12 +154,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
pending_replies = [],
- snapshot = NewSnapshot},
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -236,8 +237,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline,
%% "tied" is met.
log_work(CreateWorkUnit, MessageList,
State = #pstate{
- snapshot = Snapshot = #psnapshot{
- messages = Messages}}) ->
+ snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
fun(M = {publish, Message, QK = {_QName, PKey}}) ->
@@ -343,20 +343,22 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
+current_snapshot(_Snapshot = #psnapshot{serial = Serial,
+ transactions = Ts,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
+ fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
InnerSnapshot = {{serial, Serial},
{txns, Ts},
{messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
+ {queues, ets:tab2list(Queues)},
+ {next_seq_id, NextSeqId}},
?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
@@ -380,14 +382,15 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
+ {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
+ {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
serial = Serial,
- transactions = Ts}),
+ transactions = Ts,
+ next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
@@ -407,8 +410,8 @@ check_version(_Other) ->
requeue_messages(Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
Work = ets:foldl(
- fun ({{QName, PKey}, Delivered}, Acc) ->
- rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc)
+ fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
+ rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
end, dict:new(), Queues),
%% unstable parallel map, because order doesn't matter
L = lists:append(
@@ -419,8 +422,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages,
fun ({QName, Requeues}) ->
requeue(QName, Requeues, Messages)
end, dict:to_list(Work))),
- NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
+ NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
+ NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
ets:delete_all_objects(Messages),
ets:delete_all_objects(Queues),
true = ets:insert(Messages, NewMessages),
@@ -432,8 +435,8 @@ requeue(QName, Requeues, Messages) ->
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{pid = QPid}} ->
RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
+ [{SeqId, QName, PKey, Message, Delivered} ||
+ {SeqId, PKey, Delivered} <- Requeues,
{_, Message} <- ets:lookup(Messages, PKey)],
rabbit_amqqueue:redeliver(
QPid,
@@ -443,7 +446,7 @@ requeue(QName, Requeues, Messages) ->
%% per-channel basis, and channels are bound to specific
%% processes, sorting the list does provide the correct
%% ordering properties.
- [{Message, Delivered} || {_, Message, Delivered} <-
+ [{Message, Delivered} || {_, _, _, Message, Delivered} <-
lists:sort(RequeueMessages)]),
RequeueMessages;
{error, not_found} ->
@@ -477,39 +480,48 @@ internal_integrate1({rollback_transaction, Key},
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
internal_integrate1({commit_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
case dict:find(Key, Transactions) of
{ok, MessageLists} ->
?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
+ NextSeqId =
+ lists:foldr(
+ fun (ML, SeqIdN) ->
+ perform_work(ML, Messages, Queues, SeqIdN) end,
+ SeqId, MessageLists),
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
+ next_seq_id = NextSeqId};
error ->
Snapshot
end;
internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
+ Queues, SeqId)}.
+
+perform_work(MessageList, Messages, Queues, SeqId) ->
+ lists:foldl(fun (Item, NextSeqId) ->
+ perform_work_item(Item, Messages, Queues, NextSeqId)
+ end, SeqId, MessageList).
+
+perform_work_item({publish, Message, QK = {_QName, PKey}},
+ Messages, Queues, NextSeqId) ->
+ true = ets:insert(Messages, {PKey, Message}),
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:update_element(Queues, QK, {2, true}),
+ NextSeqId;
+
+perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:delete(Queues, QK),
+ NextSeqId.