author     Matthew Sackman <matthew@lshift.net>    2009-06-11 23:15:59 +0100
committer  Matthew Sackman <matthew@lshift.net>    2009-06-11 23:15:59 +0100
commit     3a4049025841e83a8ea6166f01eb64bf8558c845 (patch)
tree       718058c22af605b3390835184806df09808629b8
parent     c54549506c2086b09bf9831455d410c86a4baf65 (diff)
parent     7ff061f10ee33832fba9864313a0191b20bea7b0 (diff)
merging in from default
-rw-r--r--  Makefile                             2
-rw-r--r--  include/rabbit.hrl                   7
-rwxr-xr-x  scripts/rabbitmq-server              1
-rw-r--r--  src/rabbit.erl                      17
-rw-r--r--  src/rabbit_amqqueue.erl             67
-rw-r--r--  src/rabbit_amqqueue_process.erl    437
-rw-r--r--  src/rabbit_basic.erl                 3
-rw-r--r--  src/rabbit_channel.erl               7
-rw-r--r--  src/rabbit_db_queue.erl            454
-rw-r--r--  src/rabbit_db_queue_schema.sql      22
-rw-r--r--  src/rabbit_disk_queue.erl         1755
-rw-r--r--  src/rabbit_guid.erl                 22
-rw-r--r--  src/rabbit_misc.erl                 10
-rw-r--r--  src/rabbit_mixed_queue.erl         394
-rw-r--r--  src/rabbit_mnesia.erl               51
-rw-r--r--  src/rabbit_persister.erl           523
-rw-r--r--  src/rabbit_queue_mode_manager.erl  105
-rw-r--r--  src/rabbit_tests.erl               387
18 files changed, 3419 insertions(+), 845 deletions(-)
diff --git a/Makefile b/Makefile
index 4ff8573a..68fc57a5 100644
--- a/Makefile
+++ b/Makefile
@@ -69,7 +69,7 @@ clean: cleandb
rm -f docs/*.[0-9].gz
cleandb: stop-node
- erl -mnesia dir '"$(RABBITMQ_MNESIA_DIR)"' -noshell -eval 'lists:foreach(fun file:delete/1, filelib:wildcard(mnesia:system_info(directory) ++ "/*")), halt().'
+ erl -mnesia dir '"$(RABBITMQ_MNESIA_DIR)"' -noshell -eval 'lists:foreach(fun file:delete/1, filelib:wildcard(mnesia:system_info(directory) ++ "/*") ++ filelib:wildcard(filename:join(mnesia:system_info(directory), "rabbit_disk_queue/*"))), halt().'
############ various tasks to interact with RabbitMQ ###################
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 784c21b3..a2840931 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,9 @@
-record(listener, {node, protocol, host, port}).
--record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}).
+
+-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id, next_seq_id}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
@@ -134,7 +136,8 @@
#basic_message{exchange_name :: exchange_name(),
routing_key :: routing_key(),
content :: content(),
- persistent_key :: maybe(pkey())}).
+ guid :: guid(),
+ is_persistent :: bool()}).
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: bool(),
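(An illustrative aside, not part of the diff: persistence used to be signalled by persistent_key being set or none; it is now an explicit is_persistent flag alongside a guid that every message carries. A minimal sketch of code consuming the new record, assuming rabbit.hrl is included:)

    %% sketch only: branching on the new is_persistent flag
    classify(Msg = #basic_message{guid = Guid, is_persistent = true}) ->
        {persistent, Guid, Msg};
    classify(Msg = #basic_message{is_persistent = false}) ->
        {transient, Msg}.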
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 8502d60a..0aa09bd8 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -98,6 +98,7 @@ exec erl \
-os_mon memsup_system_only true \
-os_mon system_memory_high_watermark 0.95 \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
+ -mnesia dump_log_write_threshold 10000 \
${RABBITMQ_CLUSTER_CONFIG_OPTION} \
${RABBITMQ_SERVER_START_ARGS} \
"$@"
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1ddb5151..44e4dc7f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -138,6 +138,8 @@ start(normal, []) ->
{ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
+
+ ok = start_child(rabbit_queue_mode_manager),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
@@ -145,15 +147,20 @@ start(normal, []) ->
ok = start_child(rabbit_router),
ok = start_child(rabbit_node_monitor)
end},
+ {"disk queue",
+ fun () ->
+ ok = start_child(rabbit_disk_queue),
+ ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME
+ end},
{"recovery",
fun () ->
ok = maybe_insert_default_data(),
ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
- {"persister",
- fun () ->
- ok = start_child(rabbit_persister)
+ {ok, DurableQueues} = rabbit_amqqueue:recover(),
+ DurableQueueNames =
+ sets:from_list(lists:map(
+ fun(Q) -> Q #amqqueue.name end, DurableQueues)),
+ ok = rabbit_disk_queue:delete_non_durable_queues(DurableQueueNames)
end},
{"guid generator",
fun () ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 198e2782..01d40aa1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,6 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([constrain_memory/2]).
-import(mnesia).
-import(gen_server2).
@@ -103,6 +104,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(constrain_memory/2 :: (pid(), bool()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -121,37 +123,39 @@ start() ->
ok.
recover() ->
- ok = recover_durable_queues(),
- ok.
+ {ok, DurableQueues} = recover_durable_queues(),
+ {ok, DurableQueues}.
recover_durable_queues() ->
Node = node(),
- lists:foreach(
- fun (RecoveredQ) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
- end
- end,
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end)),
- ok.
+ DurableQueues =
+ lists:foldl(
+ fun (RecoveredQ, Acc) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> [Q|Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
+ end
+ end, [],
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node]))
+ end)),
+ {ok, DurableQueues}.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -305,10 +309,13 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 10, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:cast(QPid, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 10, {unblock, ChPid}).
+
+constrain_memory(QPid, Constrain) ->
+ gen_server2:pcast(QPid, 10, {constrain, Constrain}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
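(Aside: constrain_memory/2 rides the same priority-cast path as notify_sent/unblock — gen_server2:pcast with priority 10 — so a mode switch jumps ahead of the normal delivery backlog in the queue process's mailbox. A hedged usage sketch, with QPid taken from an #amqqueue{} record:)

    %% sketch only: toggling a queue's memory mode
    ok = rabbit_amqqueue:constrain_memory(QPid, true),  %% constrain: go disk-only
    ok = rabbit_amqqueue:constrain_memory(QPid, false). %% relax: back to mixed mode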
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf0ef44f..ff0cc56b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -51,8 +51,8 @@
owner,
exclusive_consumer,
has_had_consumers,
+ mixed_state,
next_msg_id,
- message_buffer,
active_consumers,
blocked_consumers}).
@@ -92,23 +92,27 @@ start_link(Q) ->
%%----------------------------------------------------------------------------
-init(Q) ->
+init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ {ok, Mode} = rabbit_queue_mode_manager:register(self()),
+ {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, Mode),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
+ mixed_state = MS,
next_msg_id = 1,
- message_buffer = queue:new(),
active_consumers = queue:new(),
blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
- lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
- all_tx()),
- ok = purge_message_buffer(QName, State#q.message_buffer),
+ NewState =
+ lists:foldl(fun (Txn, State1) ->
+ rollback_transaction(Txn, State1)
+ end, State, all_tx()),
+ rabbit_mixed_queue:delete_queue(NewState #q.mixed_state),
ok = rabbit_amqqueue:internal_delete(QName).
code_change(_OldVsn, State, _Extra) ->
@@ -165,12 +169,11 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_immediately(Message, Delivered,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
- ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
+deliver_queue(Fun, FunAcc0,
+ State = #q{q = #amqqueue{name = QName},
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers,
+ next_msg_id = NextId}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -178,13 +181,18 @@ deliver_immediately(Message, Delivered,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ IsMsgReady = Fun(is_message_ready, FunAcc0, State),
+ case (IsMsgReady andalso
+ rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
+ {{Msg, IsDelivered, AckTag, Remaining}, FunAcc1, State2} =
+ Fun(AckRequired, FunAcc0, State),
+ ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]),
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
+ {QName, self(), NextId, IsDelivered, Msg}),
NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
+ true -> dict:store(NextId, {Msg, AckTag}, UAM);
false -> UAM
end,
NewC = C#cr{unsent_message_count = Count + 1,
@@ -202,54 +210,100 @@ deliver_immediately(Message, Delivered,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1}};
- false ->
+ State3 = State2 #q { active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers,
+ next_msg_id = NextId + 1
+ },
+ if Remaining == 0 -> {FunAcc1, State3};
+ true -> deliver_queue(Fun, FunAcc1, State3)
+ end;
+ %% if IsMsgReady then we've hit the limiter
+ false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
BlockedConsumers),
- deliver_immediately(
- Message, Delivered,
+ deliver_queue(
+ Fun, FunAcc0,
State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ blocked_consumers = NewBlockedConsumers});
+ false ->
+ %% no message was ready, so we don't need to block anyone
+ {FunAcc0, State}
end;
{empty, _} ->
- {not_offered, State}
+ {FunAcc0, State}
end.
-attempt_delivery(none, _ChPid, Message, State) ->
- case deliver_immediately(Message, false, State) of
- {offered, false, State1} ->
- {true, State1};
- {offered, true, State1} ->
- persist_message(none, qname(State), Message),
- persist_delivery(qname(State), Message, false),
- {true, State1};
- {not_offered, State1} ->
- {false, State1}
- end;
-attempt_delivery(Txn, ChPid, Message, State) ->
- persist_message(Txn, qname(State), Message),
- record_pending_message(Txn, ChPid, Message),
- {true, State}.
-
-deliver_or_enqueue(Txn, ChPid, Message, State) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
+deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) ->
+ 0 /= rabbit_mixed_queue:length(MS);
+deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) ->
+ {Res, MS2} = rabbit_mixed_queue:deliver(MS),
+ MS3 = case {Res, AckRequired} of
+ {_, true} -> MS2;
+ {empty, _} -> MS2;
+ {{_Msg, _IsDelivered, AckTag, _Remaining}, false} ->
+ {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2),
+ MS4
+ end,
+ {Res, Acc, State #q { mixed_state = MS3 }}.
+
+run_message_queue(State) ->
+ {undefined, State2} = deliver_queue(fun deliver_from_queue/3, undefined, State),
+ State2.
+
+attempt_immediate_delivery(none, _ChPid, Msg, State) ->
+ Fun =
+ fun (is_message_ready, false, _State) ->
+ true;
+ (AckRequired, false, State2) ->
+ {AckTag, State3} =
+ if AckRequired ->
+ {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(
+ Msg, State2 #q.mixed_state),
+ {AckTag2, State2 #q { mixed_state = MS }};
+ true ->
+ {noack, State2}
+ end,
+ {{Msg, false, AckTag, 0}, true, State3}
+ end,
+ deliver_queue(Fun, false, State);
+attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
+ {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
+ record_pending_message(Txn, ChPid, Msg),
+ {true, State #q { mixed_state = MS }}.
+
+deliver_or_enqueue(Txn, ChPid, Msg, State) ->
+ case attempt_immediate_delivery(Txn, ChPid, Msg, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- persist_message(Txn, qname(State), Message),
- NewMB = queue:in({Message, false}, NewState#q.message_buffer),
- {false, NewState#q{message_buffer = NewMB}}
+ %% Txn is none and no unblocked channels with consumers
+ {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state),
+ {false, NewState #q { mixed_state = MS }}
end.
-deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
- State).
+%% all these messages have already been delivered at least once and
+%% not ack'd, but need to be either redelivered or requeued
+deliver_or_requeue_n([], State) ->
+ run_message_queue(State);
+deliver_or_requeue_n(MsgsWithAcks, State) ->
+ {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
+ deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State),
+ {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), NewState #q.mixed_state),
+ case OutstandingMsgs of
+ [] -> run_message_queue(NewState #q { mixed_state = MS });
+ _ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
+ NewState #q { mixed_state = MS2 }
+ end.
+
+deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) ->
+ -1 < Len;
+deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
+deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, AckTag, Len}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -283,7 +337,7 @@ possibly_unblock(State, ChPid, Update) ->
move_consumers(ChPid,
State#q.blocked_consumers,
State#q.active_consumers),
- run_poke_burst(
+ run_message_queue(
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedeConsumers})
end
@@ -300,27 +354,27 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
- case Txn of
- none -> ok;
- _ -> ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn)
- end,
- NewState =
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
+ State1 =
+ case Txn of
+ none -> State;
+ _ -> rollback_transaction(Txn, State)
+ end,
+ State2 =
+ deliver_or_requeue_n(
+ [MsgWithAck ||
+ {_MsgId, MsgWithAck} <- dict:to_list(UAM)],
+ State1 # q {
exclusive_consumer = case Holder of
{ChPid, _} -> none;
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State1#q.active_consumers),
blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)}),
- case should_auto_delete(NewState) of
- false -> noreply(NewState);
- true -> {stop, normal, NewState}
+ ChPid, State1#q.blocked_consumers)}),
+ case should_auto_delete(State2) of
+ false -> noreply(State2);
+ true -> {stop, normal, State2}
end
end.
@@ -343,26 +397,6 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-run_poke_burst(State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(MessageBuffer, State).
-
-run_poke_burst(MessageBuffer, State) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
- case deliver_immediately(Message, Delivered, State) of
- {offered, true, NewState} ->
- persist_delivery(qname(State), Message, Delivered),
- run_poke_burst(BufferTail, NewState);
- {offered, false, NewState} ->
- persist_auto_ack(qname(State), Message),
- run_poke_burst(BufferTail, NewState);
- {not_offered, NewState} ->
- NewState#q{message_buffer = MessageBuffer}
- end;
- {empty, _} ->
- State#q{message_buffer = MessageBuffer}
- end.
-
is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
queue:is_empty(State#q.blocked_consumers).
@@ -371,62 +405,6 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{persistent_key = none}) ->
- ok;
-persist_message(Txn, QName, Message) ->
- M = Message#basic_message{
- %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore
- content = rabbit_binary_parser:clear_decoded_content(
- Message#basic_message.content)},
- persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.persistent_key}}]).
-
-persist_delivery(_QName, _Message,
- true) ->
- ok;
-persist_delivery(_QName, #basic_message{persistent_key = none},
- _Delivered) ->
- ok;
-persist_delivery(QName, #basic_message{persistent_key = PKey},
- _Delivered) ->
- persist_work(none, QName, [{deliver, {QName, PKey}}]).
-
-persist_acks(Txn, QName, Messages) ->
- persist_work(Txn, QName,
- [{ack, {QName, PKey}} ||
- #basic_message{persistent_key = PKey} <- Messages,
- PKey =/= none]).
-
-persist_auto_ack(_QName, #basic_message{persistent_key = none}) ->
- ok;
-persist_auto_ack(QName, #basic_message{persistent_key = PKey}) ->
- %% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, PKey}}]).
-
-persist_work(_Txn,_QName, []) ->
- ok;
-persist_work(none, _QName, WorkList) ->
- rabbit_persister:dirty_work(WorkList);
-persist_work(Txn, QName, WorkList) ->
- mark_tx_persistent(Txn),
- rabbit_persister:extend_transaction({Txn, QName}, WorkList).
-
-commit_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:commit_transaction/1,
- Txn, QName).
-
-rollback_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:rollback_transaction/1,
- Txn, QName).
-
-%% optimisation: don't do unnecessary work
-%% it would be nice if this was handled by the persister
-do_if_persistent(F, Txn, QName) ->
- case is_tx_persistent(Txn) of
- false -> ok;
- true -> ok = F({Txn, QName})
- end.
-
lookup_tx(Txn) ->
case get({txn, Txn}) of
undefined -> #tx{ch_pid = none,
@@ -448,19 +426,12 @@ all_tx_record() ->
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
-mark_tx_persistent(Txn) ->
- Tx = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{is_persistent = true}).
-
-is_tx_persistent(Txn) ->
- #tx{is_persistent = Res} = lookup_tx(Txn),
- Res.
-
-record_pending_message(Txn, ChPid, Message) ->
- Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
+record_pending_message(Txn, ChPid, Message = #basic_message { is_persistent = IsPersistent }) ->
+ Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
- ch_pid = ChPid}).
+ store_tx(Txn, Tx #tx { pending_messages = [Message | Pending],
+ is_persistent = IsPersistentTxn orelse IsPersistent
+ }).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
@@ -468,38 +439,43 @@ record_pending_acks(Txn, ChPid, MsgIds) ->
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
ch_pid = ChPid}).
-process_pending(Txn, State) ->
- #tx{ch_pid = ChPid,
- pending_messages = PendingMessages,
- pending_acks = PendingAcks} = lookup_tx(Txn),
- case lookup_ch(ChPid) of
- not_found -> ok;
- C = #cr{unacked_messages = UAM} ->
- {_Acked, Remaining} =
- collect_messages(lists:append(PendingAcks), UAM),
- store_ch_record(C#cr{unacked_messages = Remaining})
- end,
- deliver_or_enqueue_n(lists:reverse(PendingMessages), State).
+commit_transaction(Txn, State) ->
+ #tx { ch_pid = ChPid,
+ pending_messages = PendingMessages,
+ pending_acks = PendingAcks
+ } = lookup_tx(Txn),
+ PendingMessagesOrdered = lists:reverse(PendingMessages),
+ PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)),
+ {ok, MS} =
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered, [], State #q.mixed_state);
+ C = #cr { unacked_messages = UAM } ->
+ {MsgWithAcks, Remaining} =
+ collect_messages(PendingAcksOrdered, UAM),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered,
+ lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
+ State #q.mixed_state)
+ end,
+ State #q { mixed_state = MS }.
+
+rollback_transaction(Txn, State) ->
+ #tx { pending_messages = PendingMessages
+ } = lookup_tx(Txn),
+ {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state),
+ erase_tx(Txn),
+ State #q { mixed_state = MS }.
+%% {MsgsWithAcks, Remaining} = collect_messages(MsgIds, UAM):
+%% MsgsWithAcks is the projection of MsgIds through the dict UAM,
+%% and Remaining is UAM with those keys erased.
collect_messages(MsgIds, UAM) ->
lists:mapfoldl(
fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,
UAM, MsgIds).
-purge_message_buffer(QName, MessageBuffer) ->
- Messages =
- [[Message || {Message, _Delivered} <-
- queue:to_list(MessageBuffer)] |
- lists:map(
- fun (#cr{unacked_messages = UAM}) ->
- [Message || {_MessageId, Message} <- dict:to_list(UAM)]
- end,
- all_ch_record())],
- %% the simplest, though certainly not the most obvious or
- %% efficient, way to purge messages from the persister is to
- %% artifically ack them.
- persist_acks(none, QName, lists:append(Messages)).
-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -508,8 +484,8 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(messages_ready, #q{message_buffer = MessageBuffer}) ->
- queue:len(MessageBuffer);
+i(messages_ready, #q { mixed_state = MS }) ->
+ rabbit_mixed_queue:length(MS);
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
@@ -558,7 +534,7 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
+ {Delivered, NewState} = attempt_immediate_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
@@ -567,12 +543,11 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
- ok = commit_work(Txn, qname(State)),
+ NewState = commit_transaction(Txn, State),
%% optimisation: we reply straight away so the sender can continue
gen_server2:reply(From, ok),
- NewState = process_pending(Txn, State),
erase_tx(Txn),
- noreply(NewState);
+ noreply(run_message_queue(NewState));
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -582,25 +557,26 @@ handle_call({notify_down, ChPid}, From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
next_msg_id = NextId,
- message_buffer = MessageBuffer}) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
+ mixed_state = MS
+ }) ->
+ case rabbit_mixed_queue:deliver(MS) of
+ {empty, MS2} -> reply(empty, State #q { mixed_state = MS2 });
+ {{Msg, IsDelivered, AckTag, Remaining}, MS2} ->
AckRequired = not(NoAck),
- case AckRequired of
- true ->
- persist_delivery(QName, Message, Delivered),
- C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, Message, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM});
- false ->
- persist_auto_ack(QName, Message)
- end,
- Msg = {QName, self(), NextId, Delivered, Message},
- reply({ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail,
- next_msg_id = NextId + 1});
- {empty, _} ->
- reply(empty, State)
+ {ok, MS3} =
+ if AckRequired ->
+ C = #cr{unacked_messages = UAM} = ch_record(ChPid),
+ NewUAM = dict:store(NextId, {Msg, AckTag}, UAM),
+ store_ch_record(C#cr{unacked_messages = NewUAM}),
+ {ok, MS2};
+ true ->
+ rabbit_mixed_queue:ack([AckTag], MS2)
+ end,
+ Message = {QName, self(), NextId, IsDelivered, Msg},
+ reply({ok, Remaining, Message},
+ State #q { next_msg_id = NextId + 1,
+ mixed_state = MS3
+ })
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -640,7 +616,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
add_consumer(
ChPid, Consumer,
State1#q.blocked_consumers)};
- false -> run_poke_burst(
+ false -> run_message_queue(
State1#q{
active_consumers =
add_consumer(
@@ -682,14 +658,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- message_buffer = MessageBuffer,
+ mixed_state = MS,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
- State);
+ Length = rabbit_mixed_queue:length(MS),
+ reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q{message_buffer = MessageBuffer}) ->
- IsEmpty = queue:is_empty(MessageBuffer),
+ State = #q { mixed_state = MS }) ->
+ Length = rabbit_mixed_queue:length(MS),
+ IsEmpty = Length == 0,
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
@@ -697,13 +674,13 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- {stop, normal, {ok, queue:len(MessageBuffer)}, State}
+ {stop, normal, {ok, Length}, State}
end;
-handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
- ok = purge_message_buffer(qname(State), MessageBuffer),
- reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()});
+handle_call(purge, _From, State) ->
+ {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state),
+ reply({ok, Count},
+ State #q { mixed_state = MS });
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
@@ -737,24 +714,21 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {Acked, Remaining} = collect_messages(MsgIds, UAM),
- persist_acks(Txn, qname(State), Acked),
+ {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
case Txn of
none ->
- store_ch_record(C#cr{unacked_messages = Remaining});
+ Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
+ {ok, MS} = rabbit_mixed_queue:ack(Acks, State #q.mixed_state),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ noreply(State #q { mixed_state = MS });
_ ->
- record_pending_acks(Txn, ChPid, MsgIds)
- end,
- noreply(State)
+ record_pending_acks(Txn, ChPid, MsgIds),
+ noreply(State)
+ end
end;
handle_cast({rollback, Txn}, State) ->
- ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn),
- noreply(State);
-
-handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
+ noreply(rollback_transaction(Txn, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
@@ -763,10 +737,9 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[ChPid]),
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {Messages, NewUAM} = collect_messages(MsgIds, UAM),
+ {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State))
+ noreply(deliver_or_requeue_n(MsgWithAcks, State))
end;
handle_cast({unblock, ChPid}, State) ->
@@ -795,7 +768,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) ->
+ {ok, MS2} = if Constrain -> rabbit_mixed_queue:to_disk_only_mode(MS);
+ true -> rabbit_mixed_queue:to_mixed_mode(MS)
+ end,
+ noreply(State #q { mixed_state = MS2 }).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
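(Aside on the deliver_queue/3 rewrite above: it threads a callback through the consumer round-robin. deliver_queue first calls Fun(is_message_ready, Acc, State) expecting a boolean, and only once a consumer can actually accept calls Fun(AckRequired, Acc, State), expecting {{Msg, IsDelivered, AckTag, Remaining}, NewAcc, NewState}, recursing while Remaining > 0. A minimal illustrative Fun — hypothetical, holding one pre-fetched message in the accumulator:)

    %% sketch only: a one-shot callback for deliver_queue/3
    one_shot_fun() ->
        fun (is_message_ready, Acc, _State) ->
                Acc =/= empty;
            (_AckRequired, {Msg, AckTag}, State) ->
                %% Remaining = 0, so deliver_queue stops after this message
                {{Msg, false, AckTag, 0}, empty, State}
        end.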
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 761b3863..0673bdd8 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -72,4 +72,5 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
content = Content,
- persistent_key = none}.
+ guid = rabbit_guid:guid(),
+ is_persistent = false}.
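(In other words, every message now leaves rabbit_basic:message/4 with a fresh guid and is transient unless marked otherwise. A sketch of the resulting match, with XName a placeholder exchange name:)

    %% sketch only: the new defaults
    #basic_message{guid = _Guid, is_persistent = false} =
        rabbit_basic:message(XName, <<"rk">>, <<"text/plain">>, <<"hello">>).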
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3089bb62..ed715097 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -315,14 +315,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_guid:guid();
- false -> none
- end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
- persistent_key = PersistentKey},
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent)},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl
new file mode 100644
index 00000000..897a4a6f
--- /dev/null
+++ b/src/rabbit_db_queue.erl
@@ -0,0 +1,454 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% So, assuming you're on some debian linux type system,
+%% apt-get install postgresql odbc-postgresql unixodbc unixodbc-bin
+%% sudo odbcinst -i -d -f /usr/share/psqlodbc/odbcinst.ini.template
+
+%% Now set up in postgresql a user and a database that user can
+%% access. For example, the database could be called rabbit_db_queue
+%% and the username could be rabbit and the password could be rabbit.
+
+%% sudo ODBCConfig
+%% set up a system wide dsn with the above settings in it.
+%% now drop into the erlang shell, and you should not get an error after:
+
+%% > odbc:start().
+%% < ok.
+%% > odbc:connect("DSN=rabbit_db_queue", []).
+%% < {ok,<0.325.0>}
+%% ( replace rabbit_db_queue with the name of your DSN that you configured )
+
+%% the connection string (eg "DSN=rabbit_db_queue") is what you pass
+%% to start_link. Don't just pass the DSN name.
+
+-module(rabbit_db_queue).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2,
+ tx_commit/3, tx_cancel/1, requeue/2, purge/1]).
+
+-export([stop/0, stop_and_obliterate/0]).
+
+-include("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+%% ---- SPECS ----
+
+-ifdef(use_specs).
+
+-type(seq_id() :: non_neg_integer()).
+
+-spec(start_link/1 :: (non_neg_integer()) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
+-spec(deliver/1 :: (queue_name()) ->
+ {'empty' | {msg_id(), binary(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}}).
+-spec(phantom_deliver/1 :: (queue_name()) ->
+ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
+-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
+-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
+-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
+-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok').
+-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
+-spec(stop/0 :: () -> 'ok').
+-spec(stop_and_obliterate/0 :: () -> 'ok').
+
+-endif.
+
+%% ---- PUBLIC API ----
+
+start_link(DSN) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE,
+ [DSN], []).
+
+publish(Q, MsgId, Msg) when is_binary(Msg) ->
+ gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
+
+deliver(Q) ->
+ gen_server:call(?SERVER, {deliver, Q}, infinity).
+
+phantom_deliver(Q) ->
+ gen_server:call(?SERVER, {phantom_deliver, Q}).
+
+ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
+
+tx_publish(MsgId, Msg) when is_binary(Msg) ->
+ gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
+
+tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
+ gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
+
+tx_cancel(MsgIds) when is_list(MsgIds) ->
+ gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
+
+requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+
+purge(Q) ->
+ gen_server:call(?SERVER, {purge, Q}).
+
+stop() ->
+ gen_server:call(?SERVER, stop, infinity).
+
+stop_and_obliterate() ->
+ gen_server:call(?SERVER, stop_vaporise, infinity).
+
+%% ---- GEN-SERVER INTERNAL API ----
+-record(dbstate, { db_conn }).
+
+init([DSN]) ->
+ process_flag(trap_exit, true),
+ odbc:start(),
+ {ok, Conn} = odbc:connect(DSN, [{auto_commit, off}, {tuple_row, on},
+ {scrollable_cursors, off}, {trace_driver, off}]),
+ State = #dbstate { db_conn = Conn },
+ compact_already_delivered(State),
+ {ok, State}.
+
+handle_call({deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, true, State),
+ {reply, Result, State1};
+handle_call({phantom_deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, false, State),
+ {reply, Result, State1};
+handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
+ {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
+ {reply, ok, State1};
+handle_call({purge, Q}, _From, State) ->
+ {ok, Count, State1} = internal_purge(Q, State),
+ {reply, Count, State1};
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}; %% gen_server now calls terminate
+handle_call(stop_vaporise, _From, State = #dbstate { db_conn = Conn }) ->
+ odbc:sql_query(Conn, "delete from ledger"),
+ odbc:sql_query(Conn, "delete from sequence"),
+ odbc:sql_query(Conn, "delete from message"),
+ odbc:commit(Conn, commit),
+ {stop, normal, ok, State}.
+ %% gen_server now calls terminate, which then calls shutdown
+
+handle_cast({publish, Q, MsgId, MsgBody}, State) ->
+ {ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
+ {noreply, State1};
+handle_cast({ack, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_ack(Q, MsgSeqIds, State),
+ {noreply, State1};
+handle_cast({tx_publish, MsgId, MsgBody}, State) ->
+ {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+ {noreply, State1};
+handle_cast({tx_cancel, MsgIds}, State) ->
+ {ok, State1} = internal_tx_cancel(MsgIds, State),
+ {noreply, State1};
+handle_cast({requeue, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_requeue(Q, MsgSeqIds, State),
+ {noreply, State1}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ shutdown(State).
+
+shutdown(State = #dbstate { db_conn = Conn }) ->
+ odbc:disconnect(Conn),
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---- UTILITY FUNCTIONS ----
+
+binary_to_escaped_string(Bin) when is_binary(Bin) ->
+ "E'" ++ lists:flatten(lists:reverse(binary_to_escaped_string(Bin, []))) ++ "'".
+
+binary_to_escaped_string(<<>>, Acc) ->
+ Acc;
+binary_to_escaped_string(<<Byte:8, Rest/binary>>, Acc) ->
+ binary_to_escaped_string(Rest, [escape_byte(Byte) | Acc]).
+
+escape_byte(39) ->
+ "\\\\047";
+escape_byte(92) ->
+ "\\\\134";
+escape_byte(B) when B > 31 andalso B < 127 ->
+ B;
+escape_byte(B) ->
+ case io_lib:format("~.8B", [B]) of
+ O1 = [[_]] ->
+ "\\\\00" ++ O1;
+ O2 = [[_,_]] ->
+ "\\\\0" ++ O2;
+ O3 = [[_,_,_]] ->
+ "\\\\" ++ O3
+ end.
+
+escaped_string_to_binary(Str) when is_list(Str) ->
+ list_to_binary(lists:reverse(escaped_string_to_binary(Str, []))).
+
+escaped_string_to_binary([], Acc) ->
+ Acc;
+escaped_string_to_binary([$\\,$\\|Rest], Acc) ->
+ escaped_string_to_binary(Rest, [$\\ | Acc]);
+escaped_string_to_binary([$\\,A,B,C|Rest], Acc) ->
+ escaped_string_to_binary(Rest, [(list_to_integer([A])*64) +
+ (list_to_integer([B])*8) +
+ list_to_integer([C])
+ | Acc]);
+escaped_string_to_binary([C|Rest], Acc) ->
+ escaped_string_to_binary(Rest, [C|Acc]).
+
+hex_string_to_binary(Str) when is_list(Str) ->
+ list_to_binary(lists:reverse(hex_string_to_binary(Str, []))).
+
+hex_string_to_binary([], Acc) ->
+ Acc;
+hex_string_to_binary([A,B|Rest], Acc) ->
+ {ok, [N], []} = io_lib:fread("~16u", [A,B]),
+ hex_string_to_binary(Rest, [N | Acc]).
+
+%% ---- INTERNAL RAW FUNCTIONS ----
+
+internal_deliver(Q, ReadMsg, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of
+ {selected, _, []} ->
+ odbc:commit(Conn, commit),
+ {ok, empty, State};
+ {selected, _, [{ReadSeqId}]} ->
+ case odbc:sql_query(Conn, "select is_delivered, msg_id from ledger where queue = " ++ QStr ++
+ " and seq_id = " ++ integer_to_list(ReadSeqId)) of
+ {selected, _, []} ->
+ {ok, empty, State};
+ {selected, _, [{IsDeliveredStr, MsgIdStr}]} ->
+ IsDelivered = IsDeliveredStr /= "0",
+ if IsDelivered -> ok;
+ true -> odbc:sql_query(Conn, "update ledger set is_delivered = true where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(ReadSeqId))
+ end,
+ MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)),
+ %% yeah, this is really necessary. sigh
+ MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)),
+ odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++
+ " where queue = " ++ QStr),
+ if ReadMsg ->
+ {selected, _, [{MsgBodyStr}]} =
+ odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2),
+ odbc:commit(Conn, commit),
+ MsgBody = hex_string_to_binary(MsgBodyStr),
+ BodySize = size(MsgBody),
+ {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State};
+ true ->
+ odbc:commit(Conn, commit),
+ {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State}
+ end
+ end
+ end.
+
+internal_ack(Q, MsgSeqIds, State) ->
+ remove_messages(Q, MsgSeqIds, true, State).
+
+%% Q is only needed if LedgerDelete /= false
+%% called from tx_cancel with LedgerDelete = false
+%% called from internal_tx_cancel with LedgerDelete = true
+%% called from ack with LedgerDelete = true
+remove_messages(Q, MsgSeqIds, LedgerDelete, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ lists:foreach(
+ fun ({MsgId, SeqId}) ->
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ {selected, _, [{RefCount}]} =
+ odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++
+ MsgIdStr),
+ case RefCount of
+ 1 -> odbc:sql_query(Conn, "delete from message where msg_id = " ++
+ MsgIdStr);
+ _ -> odbc:sql_query(Conn, "update message set ref_count = " ++
+ integer_to_list(RefCount - 1) ++ " where msg_id = " ++
+ MsgIdStr)
+ end,
+ if LedgerDelete ->
+ odbc:sql_query(Conn, "delete from ledger where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(SeqId));
+ true -> ok
+ end
+ end, MsgSeqIds),
+ odbc:commit(Conn, commit),
+ {ok, State}.
+
+internal_tx_publish(MsgId, MsgBody, State = #dbstate { db_conn = Conn }) ->
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ MsgStr = binary_to_escaped_string(MsgBody),
+ case odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ MsgIdStr) of
+ {selected, _, []} ->
+ odbc:sql_query(Conn, "insert into message (msg_id, msg, ref_count) values (" ++
+ MsgIdStr ++ ", " ++ MsgStr ++ ", 1)");
+ {selected, _, [{RefCount}]} ->
+ odbc:sql_query(Conn, "update message set ref_count = " ++
+ integer_to_list(RefCount + 1) ++ " where msg_id = " ++ MsgIdStr)
+ end,
+ odbc:commit(Conn, commit),
+ {ok, State}.
+
+internal_tx_commit(Q, PubMsgIds, AckSeqIds, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ {InsertOrUpdate, NextWrite} =
+ case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
+ {selected, _, []} -> {insert, 0};
+ {selected, _, [{NextWrite2}]} -> {update, NextWrite2}
+ end,
+ NextWrite3 =
+ lists:foldl(fun (MsgId, WriteSeqInteger) ->
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ odbc:sql_query(Conn,
+ "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++
+ QStr ++ ", " ++ integer_to_list(WriteSeqInteger) ++ ", false, " ++
+ MsgIdStr ++ ")"),
+ WriteSeqInteger + 1
+ end, NextWrite, PubMsgIds),
+ case InsertOrUpdate of
+ update -> odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(NextWrite3) ++
+ " where queue = " ++ QStr);
+ insert -> odbc:sql_query(Conn, "insert into sequence (queue, next_read, next_write) values (" ++
+ QStr ++ ", 0, " ++ integer_to_list(NextWrite3) ++ ")")
+ end,
+ odbc:commit(Conn, commit),
+ remove_messages(Q, AckSeqIds, true, State),
+ {ok, State}.
+
+internal_publish(Q, MsgId, MsgBody, State = #dbstate { db_conn = Conn }) ->
+ {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ NextWrite =
+ case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
+ {selected, _, []} ->
+ odbc:sql_query(Conn,
+ "insert into sequence (queue, next_read, next_write) values (" ++
+ QStr ++ ", 0, 1)"),
+ 0;
+ {selected, _, [{NextWrite2}]} ->
+ odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(1 + NextWrite2) ++
+ " where queue = " ++ QStr),
+ NextWrite2
+ end,
+ odbc:sql_query(Conn, "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++
+ QStr ++ ", " ++ integer_to_list(NextWrite) ++ ", false, " ++ MsgIdStr ++ ")"),
+ odbc:commit(Conn, commit),
+ {ok, State1}.
+
+internal_tx_cancel(MsgIds, State) ->
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
+ remove_messages(undefined, MsgSeqIds, false, State).
+
+internal_requeue(Q, MsgSeqIds, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ {selected, _, [{WriteSeqId}]} =
+ odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr),
+ WriteSeqId2 =
+ lists:foldl(
+ fun ({_MsgId, SeqId}, NextWriteSeqId) ->
+ odbc:sql_query(Conn, "update ledger set seq_id = " ++ integer_to_list(NextWriteSeqId) ++
+ " where seq_id = " ++ integer_to_list(SeqId) ++ " and queue = " ++ QStr),
+ NextWriteSeqId + 1
+ end, WriteSeqId, MsgSeqIds),
+ odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(WriteSeqId2) ++
+ " where queue = " ++ QStr),
+ odbc:commit(Conn, commit),
+ {ok, State}.
+
+
+compact_already_delivered(#dbstate { db_conn = Conn }) ->
+ {selected, _, Seqs} = odbc:sql_query(Conn, "select queue, next_read from sequence"),
+ lists:foreach(
+ fun ({QHexStr, ReadSeqId}) ->
+ Q = binary_to_term(hex_string_to_binary(QHexStr)),
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ case odbc:sql_query(Conn, "select min(seq_id) from ledger where queue = "
+ ++ QStr) of
+ {selected, _, []} -> ok;
+ {selected, _, [{null}]} -> ok; %% AGH!
+ {selected, _, [{Min}]} ->
+ Gap = shuffle_up(Conn, QStr, Min - 1, ReadSeqId - 1, 0),
+ odbc:sql_query(Conn, "update sequence set next_read = " ++
+ integer_to_list(Min + Gap) ++
+ " where queue = " ++ QStr)
+ end
+ end, Seqs),
+ odbc:commit(Conn, commit).
+
+shuffle_up(_Conn, _QStr, SeqId, SeqId, Gap) ->
+ Gap;
+shuffle_up(Conn, QStr, BaseSeqId, SeqId, Gap) ->
+ GapInc =
+ case odbc:sql_query(Conn, "select count(1) from ledger where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(SeqId)) of
+ {selected, _, [{"0"}]} ->
+ 1;
+ {selected, _, [{"1"}]} ->
+ if Gap =:= 0 -> ok;
+ true -> odbc:sql_query(Conn, "update ledger set seq_id = " ++
+ integer_to_list(SeqId + Gap) ++ " where seq_id = " ++
+ integer_to_list(SeqId) ++ " and queue = " ++ QStr)
+ end,
+ 0
+ end,
+ shuffle_up(Conn, QStr, BaseSeqId, SeqId - 1, Gap + GapInc).
+
+internal_purge(Q, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of
+ {selected, _, []} ->
+ odbc:commit(Conn, commit),
+ {ok, 0, State};
+ {selected, _, [{ReadSeqId}]} ->
+ odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr),
+ {selected, _, MsgSeqIds} =
+ odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++
+ QStr ++ " and seq_id >= " ++ ReadSeqId),
+ MsgSeqIds2 = lists:map(
+ fun ({MsgIdStr, SeqIdStr}) ->
+ { binary_to_term(hex_string_to_binary(MsgIdStr)),
+ list_to_integer(SeqIdStr) }
+ end, MsgSeqIds),
+ {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State),
+ {ok, length(MsgSeqIds2), State2}
+ end.
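(A publish/deliver/ack round-trip against this module might look like the following sketch — DSN string as configured in the header comments; QName and MsgId are placeholders:)

    %% sketch only: exercising the rabbit_db_queue API
    {ok, _Pid} = rabbit_db_queue:start_link("DSN=rabbit_db_queue"),
    ok = rabbit_db_queue:publish(QName, MsgId, <<"payload">>),
    {MsgId, <<"payload">>, 7, false, {MsgId, SeqId}} =
        rabbit_db_queue:deliver(QName),
    ok = rabbit_db_queue:ack(QName, [{MsgId, SeqId}]),
    ok = rabbit_db_queue:stop().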
diff --git a/src/rabbit_db_queue_schema.sql b/src/rabbit_db_queue_schema.sql
new file mode 100644
index 00000000..f5c49e8d
--- /dev/null
+++ b/src/rabbit_db_queue_schema.sql
@@ -0,0 +1,22 @@
+create table message (
+ msg_id bytea PRIMARY KEY,
+ msg bytea,
+ ref_count integer NOT NULL
+);
+create index message_msg_id_index on message (msg_id);
+
+create table sequence (
+ queue bytea PRIMARY KEY,
+ next_read integer NOT NULL,
+ next_write integer NOT NULL
+);
+create index sequence_queue_index on sequence (queue);
+
+create table ledger (
+ queue bytea NOT NULL,
+ seq_id integer NOT NULL,
+ is_delivered boolean NOT NULL,
+ msg_id bytea NOT NULL
+);
+create index ledger_queue_seq_id_index on ledger (queue, seq_id);
+
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
new file mode 100644
index 00000000..2b6f7b00
--- /dev/null
+++ b/src/rabbit_disk_queue.erl
@@ -0,0 +1,1755 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_disk_queue).
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([publish/4, publish_with_seq/5, deliver/1, phantom_deliver/1, ack/2,
+ tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
+ requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
+ dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1
+ ]).
+
+-export([length/1, is_empty/1, next_write_seq/1]).
+
+-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
+
+-include("rabbit.hrl").
+
+-define(WRITE_OK_SIZE_BITS, 8).
+-define(WRITE_OK, 255).
+-define(INTEGER_SIZE_BYTES, 8).
+-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
+-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_EXTENSION_DETS, ".dets").
+-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+
+-define(SERVER, ?MODULE).
+
+-define(MAX_READ_FILE_HANDLES, 256).
+-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+
+-record(dqstate, {msg_location_dets, %% where are messages?
+ msg_location_ets, %% as above, but for ets version
+ operation_mode, %% ram_disk | disk_only
+ file_summary, %% what's in the files?
+ sequences, %% next read and write for each q
+ current_file_num, %% current file name as number
+ current_file_name, %% current file name
+ current_file_handle, %% current file handle
+ current_offset, %% current offset within current file
+ current_dirty, %% has the current file been written to since the last fsync?
+ file_size_limit, %% how big can our files get?
+ read_file_handles, %% file handles for reading (LRU)
+ read_file_handles_limit %% how many file handles can we open?
+ }).
+
+%% The components:
+%%
+%% MsgLocation: this is a dets table which contains:
+%% {MsgId, RefCount, File, Offset, TotalSize}
+%% FileSummary: this is an ets table which contains:
+%% {File, ValidTotalSize, ContiguousTop, Left, Right}
+%% Sequences: this is an ets table which contains:
+%% {Q, ReadSeqId, WriteSeqId, QueueLength}
+%% rabbit_disk_queue: this is an mnesia table which contains:
+%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
+%% is_delivered = IsDelivered,
+%% msg_id = MsgId,
+%% next_seq_id = SeqId
+%% }
+%%
+
+%% The basic idea is that messages are appended to the current file up
+%% until that file becomes too big (> file_size_limit). At that point,
+%% the file is closed and a new file is created on the _right_ of the
+%% old file which is used for new messages. Files are named
+%% numerically ascending, thus the file with the lowest name is the
+%% eldest file.
+%%
+%% We need to keep track of which messages are in which files (this is
+%% the MsgLocation table); how much useful data is in each file and
+%% which files are on the left and right of each other. This is the
+%% purpose of the FileSummary table.
+%%
+%% As messages are removed from files, holes appear in these
+%% files. The field ValidTotalSize contains the total amount of useful
+%% data left in the file, whilst ContiguousTop contains the amount of
+%% valid data right at the start of each file. These are needed for
+%% garbage collection.
+%%
+%% On publish, we write the message to disk, record the changes to
+%% FileSummary and MsgLocation, and, should this be either a plain
+%% publish, or followed by a tx_commit, we record the message in the
+%% mnesia table. Sequences exists to enforce ordering of messages as
+%% they are published within a queue.
+%%
+%% On delivery, we read the next message to be read from disk
+%% (according to the ReadSeqId for the given queue) and record in the
+%% mnesia table that the message has been delivered.
+%%
+%% On ack we remove the relevant entry from MsgLocation, update
+%% FileSummary and delete from the mnesia table.
+%%
+%% In order to avoid extra mnesia searching, we return the SeqId
+%% during delivery which must be returned in ack - it is not possible
+%% to ack from MsgId alone.
+
+%% As messages are ack'd, holes develop in the files. When we discover
+%% that either a file is now empty or that it can be combined with the
+%% useful data in either its left or right file, we compact the two
+%% files together. This keeps disk utilisation high and aids
+%% performance.
+%%
+%% Given the compaction between two files, the left file is considered
+%% the ultimate destination for the good data in the right file. If
+%% necessary, the good data in the left file which is fragmented
+%% throughout the file is written out to a temporary file, then read
+%% back in to form a contiguous chunk of good data at the start of the
+%% left file. Thus the left file is garbage collected and
+%% compacted. Then the good data from the right file is copied onto
+%% the end of the left file. MsgLocation and FileSummary tables are
+%% updated.
+%%
+%% On startup, we scan the files we discover, dealing with the
+%% possibility of a crash having occurred during a compaction (this
+%% consists of tidyup - the compaction is deliberately designed such
+%% that data is duplicated on disk rather than risking it being lost),
+%% and rebuild the dets and ets tables (MsgLocation, FileSummary,
+%% Sequences) from what we find. We ensure that the messages we have
+%% discovered on disk match exactly with the messages recorded in the
+%% mnesia table.
+
+%% MsgLocation is deliberately a dets table, and the mnesia table is
+%% set to be a disk_only_table in order to ensure that we are not RAM
+%% constrained. However, for performance reasons, it is possible to
+%% call to_ram_disk_mode/0 which will alter the mnesia table to
+%% disc_copies and convert MsgLocation to an ets table. This results
+%% in a massive performance improvement, at the expense of greater RAM
+%% usage. The idea is that when memory gets tight, we switch to
+%% disk_only mode but otherwise try to run in ram_disk mode.
+
+%% So, with this design, messages move to the left. Eventually, they
+%% should end up in a contiguous block on the left and are then never
+%% rewritten. But this isn't quite the case. If in a file there is one
+%% message that is being ignored, for some reason, and messages in the
+%% file to the right and in the current block are being read all the
+%% time then it will repeatedly be the case that the good data from
+%% both files can be combined and will be written out to a new
+%% file. Whenever this happens, our shunned message will be rewritten.
+%%
+%% So, provided that we combine messages in the right order,
+%% (i.e. left file, bottom to top, right file, bottom to top),
+%% eventually our shunned message will end up at the bottom of the
+%% left file. The compaction/combining algorithm is smart enough to
+%% read in good data from the left file that is scattered throughout
+%% (i.e. C and D in the below diagram), then truncate the file to just
+%% above B (i.e. truncate to the limit of the good contiguous region
+%% at the start of the file), then write C and D on top and then write
+%% E, F and G from the right file on top. Thus contiguous blocks of
+%% good data at the bottom of files are not rewritten (yes, this is
+%% the data whose size is tracked by the ContiguousTop variable;
+%% judicious use of a mirror is required).
+%%
+%% +-------+ +-------+ +-------+
+%% | X | | G | | G |
+%% +-------+ +-------+ +-------+
+%% | D | | X | | F |
+%% +-------+ +-------+ +-------+
+%% | X | | X | | E |
+%% +-------+ +-------+ +-------+
+%% | C | | F | ===> | D |
+%% +-------+ +-------+ +-------+
+%% | X | | X | | C |
+%% +-------+ +-------+ +-------+
+%% | B | | X | | B |
+%% +-------+ +-------+ +-------+
+%% | A | | E | | A |
+%% +-------+ +-------+ +-------+
+%% left right left
+%%
+%% From this reasoning, we do have a bound on the number of times the
+%% message is rewritten. From when it is inserted, there can be no
+%% files inserted between it and the head of the queue, and the worst
+%% case is that every time it is rewritten, it moves one position lower
+%% in the file (for it to stay at the same position requires that
+%% there are no holes beneath it, which means truncate would be used
+%% and so it would not be rewritten at all). Thus this seems to
+%% suggest the limit is the number of messages ahead of it in the
+%% queue, though it's likely that that's pessimistic, given the
+%% requirements for compaction/combination of files.
+%%
+%% The other property we have is a bound on the lowest utilisation,
+%% which should be 50% - the worst case is that all files are
+%% fractionally over half full and can't be combined (an equivalent
+%% case is alternating full files and files with only one tiny
+%% message in them).
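+%%
+%% As a worked example (with an illustrative FILE_SIZE_LIMIT of 100
+%% bytes): two neighbouring files each holding 51 bytes of good data
+%% cannot be combined, since 51 + 51 > 100, so utilisation can be
+%% pinned marginally above 50% but not below it.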
+
+%% ---- SPECS ----
+
+-ifdef(use_specs).
+
+-type(seq_id() :: non_neg_integer()).
+-type(seq_id_or_next() :: ( seq_id() | 'next' )).
+
+-spec(start_link/0 :: () ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) ->
+             ('ok' | {msg_id(), seq_id()})).
+-spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(),
+                             binary(), bool()) ->
+             ('ok' | {msg_id(), seq_id()})).
+-spec(deliver/1 :: (queue_name()) ->
+             ('empty' | {msg_id(), binary(), non_neg_integer(),
+                         bool(), {msg_id(), seq_id()}, non_neg_integer()})).
+-spec(phantom_deliver/1 :: (queue_name()) ->
+             ('empty' | {msg_id(), bool(), {msg_id(), seq_id()},
+                         non_neg_integer()})).
+-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok').
+-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
+-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok').
+-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id_or_next()}],
+ [{msg_id(), seq_id()}]) -> 'ok').
+-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
+-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok').
+-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
+-spec(delete_queue/1 :: (queue_name()) -> 'ok').
+-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}, seq_id()}]).
+-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+-spec(stop_and_obliterate/0 :: () -> 'ok').
+-spec(to_ram_disk_mode/0 :: () -> 'ok').
+-spec(to_disk_only_mode/0 :: () -> 'ok').
+-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(next_write_seq/1 :: (queue_name()) -> non_neg_integer()).
+-spec(is_empty/1 :: (queue_name()) -> bool()).
+
+-endif.
+
+%% ---- PUBLIC API ----
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE,
+ [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
+
+publish(Q, MsgId, Msg, false) when is_binary(Msg) ->
+ gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg});
+publish(Q, MsgId, Msg, true) when is_binary(Msg) ->
+ gen_server2:call(?SERVER, {publish, Q, MsgId, Msg}, infinity).
+
+publish_with_seq(Q, MsgId, SeqId, Msg, false) when is_binary(Msg) ->
+ gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg});
+publish_with_seq(Q, MsgId, SeqId, Msg, true) when is_binary(Msg) ->
+ gen_server2:call(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg},
+ infinity).
+
+deliver(Q) ->
+ gen_server2:call(?SERVER, {deliver, Q}, infinity).
+
+phantom_deliver(Q) ->
+ gen_server2:call(?SERVER, {phantom_deliver, Q}, infinity).
+
+ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}).
+
+auto_ack_next_message(Q) ->
+ gen_server2:cast(?SERVER, {auto_ack_next_message, Q}).
+
+tx_publish(MsgId, Msg) when is_binary(Msg) ->
+ gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}).
+
+tx_commit(Q, PubMsgIds, AckSeqIds)
+ when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
+ gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
+
+tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds)
+ when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) ->
+ gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds},
+ infinity).
+
+tx_cancel(MsgIds) when is_list(MsgIds) ->
+ gen_server2:cast(?SERVER, {tx_cancel, MsgIds}).
+
+requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+
+requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) ->
+ gen_server2:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}).
+
+purge(Q) ->
+ gen_server2:call(?SERVER, {purge, Q}, infinity).
+
+delete_queue(Q) ->
+ gen_server2:cast(?SERVER, {delete_queue, Q}).
+
+dump_queue(Q) ->
+ gen_server2:call(?SERVER, {dump_queue, Q}, infinity).
+
+delete_non_durable_queues(DurableQueues) ->
+ gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity).
+
+stop() ->
+ gen_server2:call(?SERVER, stop, infinity).
+
+stop_and_obliterate() ->
+ gen_server2:call(?SERVER, stop_vaporise, infinity).
+
+to_disk_only_mode() ->
+ gen_server2:pcall(?SERVER, 10, to_disk_only_mode, infinity).
+
+to_ram_disk_mode() ->
+ gen_server2:pcall(?SERVER, 10, to_ram_disk_mode, infinity).
+
+length(Q) ->
+ gen_server2:call(?SERVER, {length, Q}, infinity).
+
+next_write_seq(Q) ->
+ gen_server2:call(?SERVER, {next_write_seq, Q}, infinity).
+
+is_empty(Q) ->
+ 0 == rabbit_disk_queue:length(Q).
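+
+%% An illustrative transactional publish (bindings hypothetical):
+%%
+%%   ok = rabbit_disk_queue:tx_publish(MsgIdA, <<"payload a">>),
+%%   ok = rabbit_disk_queue:tx_publish(MsgIdB, <<"payload b">>),
+%%   %% neither message is deliverable from Q until the commit:
+%%   ok = rabbit_disk_queue:tx_commit(Q, [MsgIdA, MsgIdB], []),
+%%
+%% where the final argument lists the {msg_id(), seq_id()} ack tags
+%% to be acked atomically with the publishes.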
+
+%% ---- GEN-SERVER INTERNAL API ----
+
+init([FileSizeLimit, ReadFileHandlesLimit]) ->
+ %% If the gen_server is part of a supervision tree and is ordered
+ %% by its supervisor to terminate, terminate will be called with
+ %% Reason=shutdown if the following conditions apply:
+ %% * the gen_server has been set to trap exit signals, and
+ %% * the shutdown strategy as defined in the supervisor's
+ %% child specification is an integer timeout value, not
+ %% brutal_kill.
+ %% Otherwise, the gen_server will be immediately terminated.
+ process_flag(trap_exit, true),
+ Node = node(),
+ ok =
+ case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
+ disc_only_copies) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, rabbit_disk_queue, Node,
+ disc_only_copies}} -> ok;
+ E -> E
+ end,
+ ok = filelib:ensure_dir(form_filename("nothing")),
+ file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)),
+ {ok, MsgLocationDets} =
+ dets:open_file(?MSG_LOC_NAME,
+ [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)},
+ {min_no_slots, 1024*1024},
+                      %% the dets man page says this should be <= 32M, but it works...
+ {max_no_slots, 1024*1024*1024},
+ {type, set}
+ ]),
+
+ %% it would be better to have this as private, but dets:from_ets/2
+ %% seems to blow up if it is set private
+ MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
+
+ InitName = "0" ++ ?FILE_EXTENSION,
+ State =
+ #dqstate { msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ operation_mode = disk_only,
+ file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
+ [set, private]),
+ sequences = ets:new(?SEQUENCE_ETS_NAME,
+ [set, private]),
+ current_file_num = 0,
+ current_file_name = InitName,
+ current_file_handle = undefined,
+ current_offset = 0,
+ current_dirty = false,
+ file_size_limit = FileSizeLimit,
+ read_file_handles = {dict:new(), gb_trees:empty()},
+ read_file_handles_limit = ReadFileHandlesLimit
+ },
+ {ok, State1 = #dqstate { current_file_name = CurrentName,
+ current_offset = Offset } } =
+ load_from_disk(State),
+ Path = form_filename(CurrentName),
+ Exists = case file:read_file_info(Path) of
+ {error,enoent} -> false;
+ {ok, _} -> true
+ end,
+ %% read is only needed so that we can seek
+ {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
+ if Exists -> {ok, Offset} = file:position(FileHdl, {bof, Offset});
+ true -> %% new file, so preallocate
+ ok = preallocate(FileHdl, FileSizeLimit, Offset)
+ end,
+ {ok, State1 #dqstate { current_file_handle = FileHdl }}.
+
+handle_call({publish, Q, MsgId, MsgBody}, _From, State) ->
+ {ok, MsgSeqId, State1} =
+ internal_publish(Q, MsgId, next, MsgBody, true, State),
+ {reply, MsgSeqId, State1};
+handle_call({publish_with_seq, Q, MsgId, SeqId, MsgBody}, _From, State) ->
+ {ok, MsgSeqId, State1} =
+ internal_publish(Q, MsgId, SeqId, MsgBody, true, State),
+ {reply, MsgSeqId, State1};
+handle_call({deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, true, false, State),
+ {reply, Result, State1};
+handle_call({phantom_deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, false, false, State),
+ {reply, Result, State1};
+handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
+ PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}),
+ {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State),
+ {reply, ok, State1};
+handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) ->
+ {ok, State1} = internal_tx_commit(Q, PubSeqMsgIds, AckSeqIds, State),
+ {reply, ok, State1};
+handle_call({purge, Q}, _From, State) ->
+ {ok, Count, State1} = internal_purge(Q, State),
+ {reply, Count, State1};
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}; %% gen_server now calls terminate
+handle_call(stop_vaporise, _From, State) ->
+ State1 = #dqstate { file_summary = FileSummary,
+ sequences = Sequences } =
+ shutdown(State), %% tidy up file handles early
+ {atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
+ true = ets:delete(FileSummary),
+ true = ets:delete(Sequences),
+ lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
+ {stop, normal, ok,
+ State1 #dqstate { current_file_handle = undefined,
+ read_file_handles = {dict:new(), gb_trees:empty()}}};
+ %% gen_server now calls terminate, which then calls shutdown
+handle_call(to_disk_only_mode, _From,
+ State = #dqstate { operation_mode = disk_only }) ->
+ {reply, ok, State};
+handle_call(to_disk_only_mode, _From,
+ State = #dqstate { operation_mode = ram_disk,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ rabbit_log:info("Converting disk queue to disk only mode~n", []),
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
+ disc_only_copies),
+ ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
+ true = ets:delete_all_objects(MsgLocationEts),
+ {reply, ok, State #dqstate { operation_mode = disk_only }};
+handle_call(to_ram_disk_mode, _From,
+ State = #dqstate { operation_mode = ram_disk }) ->
+ {reply, ok, State};
+handle_call(to_ram_disk_mode, _From,
+ State = #dqstate { operation_mode = disk_only,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ rabbit_log:info("Converting disk queue to ram disk mode~n", []),
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
+ disc_copies),
+ true = ets:from_dets(MsgLocationEts, MsgLocationDets),
+ ok = dets:delete_all_objects(MsgLocationDets),
+ {reply, ok, State #dqstate { operation_mode = ram_disk }};
+handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+ {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q),
+ {reply, Length, State};
+handle_call({next_write_seq, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+ {_ReadSeqId, WriteSeqId, _Length} = sequence_lookup(Sequences, Q),
+ {reply, WriteSeqId, State};
+handle_call({dump_queue, Q}, _From, State) ->
+ {Result, State1} = internal_dump_queue(Q, State),
+ {reply, Result, State1};
+handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
+ {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
+ {reply, ok, State1}.
+
+handle_cast({publish, Q, MsgId, MsgBody}, State) ->
+ {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State),
+ {noreply, State1};
+handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) ->
+ {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, false, State),
+ {noreply, State1};
+handle_cast({ack, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_ack(Q, MsgSeqIds, State),
+ {noreply, State1};
+handle_cast({auto_ack_next_message, Q}, State) ->
+ {ok, State1} = internal_auto_ack(Q, State),
+ {noreply, State1};
+handle_cast({tx_publish, MsgId, MsgBody}, State) ->
+ {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+ {noreply, State1};
+handle_cast({tx_cancel, MsgIds}, State) ->
+ {ok, State1} = internal_tx_cancel(MsgIds, State),
+ {noreply, State1};
+handle_cast({requeue, Q, MsgSeqIds}, State) ->
+ MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, {next, true}}),
+ {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
+ {noreply, State1};
+handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) ->
+ {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
+ {noreply, State1};
+handle_cast({delete_queue, Q}, State) ->
+ {ok, State1} = internal_delete_queue(Q, State),
+ {noreply, State1}.
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ shutdown(State).
+
+shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ current_file_handle = FileHdl,
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
+ }) ->
+ %% deliberately ignoring return codes here
+ dets:close(MsgLocationDets),
+ file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)),
+ true = ets:delete_all_objects(MsgLocationEts),
+ if FileHdl =:= undefined -> ok;
+ true -> file:sync(FileHdl),
+ file:close(FileHdl)
+ end,
+ dict:fold(fun (_File, Hdl, _Acc) ->
+ file:close(Hdl)
+ end, ok, ReadHdls),
+ State #dqstate { current_file_handle = undefined,
+ current_dirty = false,
+ read_file_handles = {dict:new(), gb_trees:empty()}}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---- UTILITY FUNCTIONS ----
+
+form_filename(Name) ->
+ filename:join(base_directory(), Name).
+
+base_directory() ->
+ filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
+
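+%% zip_with_tail pairs each element of a list with the corresponding
+%% element of a second list, which may be supplied literally, as
+%% {duplicate, E} (E in every position), or as {last, E} (the list's
+%% own tail, closed off with E). For example:
+%%   zip_with_tail([a, b, c], {duplicate, x}) -> [{a,x}, {b,x}, {c,x}]
+%%   zip_with_tail([a, b, c], {last, x})      -> [{a,b}, {b,c}, {c,x}]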
+zip_with_tail(List1, List2) when length(List1) =:= length(List2) ->
+ lists:zip(List1, List2);
+zip_with_tail(List = [_|Tail], {last, E}) ->
+ zip_with_tail(List, Tail ++ [E]);
+zip_with_tail(List, {duplicate, E}) ->
+ zip_with_tail(List, lists:duplicate(erlang:length(List), E)).
+
+dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
+ Key) ->
+ dets:lookup(MsgLocationDets, Key);
+dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
+ Key) ->
+ ets:lookup(MsgLocationEts, Key).
+
+dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
+ Key) ->
+ ok = dets:delete(MsgLocationDets, Key);
+dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
+ Key) ->
+ true = ets:delete(MsgLocationEts, Key),
+ ok.
+
+dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
+ Obj) ->
+ ok = dets:insert(MsgLocationDets, Obj);
+dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
+ Obj) ->
+ true = ets:insert(MsgLocationEts, Obj),
+ ok.
+
+dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
+ Obj) ->
+ true = dets:insert_new(MsgLocationDets, Obj);
+dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
+ Obj) ->
+ true = ets:insert_new(MsgLocationEts, Obj).
+
+dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
+ Obj) ->
+ dets:match_object(MsgLocationDets, Obj);
+dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
+ Obj) ->
+ ets:match_object(MsgLocationEts, Obj).
+
+find_next_seq_id(CurrentSeq, next) ->
+ CurrentSeq + 1;
+find_next_seq_id(CurrentSeq, NextSeqId)
+ when NextSeqId > CurrentSeq ->
+ NextSeqId.
+
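+%% determine_next_read_id/3 takes the current ReadSeqId, the current
+%% WriteSeqId and the (possibly 'next') seq id supplied with a
+%% publish, and decides where reading resumes: when the queue is
+%% fully drained (Read == Write) and an explicit id beyond the write
+%% point is supplied, reading jumps forward to that id; in every
+%% other case the current ReadSeqId is retained.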
+determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) ->
+ CurrentReadWrite;
+determine_next_read_id(CurrentRead, _CurrentWrite, next) ->
+ CurrentRead;
+determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite)
+ when NextWrite > CurrentReadWrite ->
+ NextWrite;
+determine_next_read_id(CurrentRead, CurrentWrite, NextWrite)
+ when NextWrite >= CurrentWrite ->
+ CurrentRead.
+
+get_read_handle(File, State =
+ #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
+ read_file_handles_limit = ReadFileHandlesLimit,
+ current_file_name = CurName,
+ current_file_handle = CurHdl,
+ current_dirty = IsDirty
+ }) ->
+ IsDirty2 = if CurName == File andalso IsDirty ->
+ file:sync(CurHdl),
+ false;
+ true -> IsDirty
+ end,
+ Now = now(),
+ {FileHdl, ReadHdls1, ReadHdlsAge1} =
+ case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File),
+ [read, raw, binary,
+ read_ahead]),
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, ReadHdls, ReadHdlsAge};
+ _False ->
+ {Then, OldFile, ReadHdlsAge2} =
+ gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, Then}} =
+ dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ end;
+ {ok, {Hdl, Then}} ->
+ {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ end,
+ ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
+ {FileHdl, State #dqstate { read_file_handles = {ReadHdls3, ReadHdlsAge3},
+ current_dirty = IsDirty2
+ }}.
+
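+%% adjust_last_msg_seq_id/4 keeps the per-queue chain of next_seq_id
+%% pointers connected: if a publish supplies a seq id beyond the one
+%% we expected, the previous message's next_seq_id is rewritten to
+%% point at the supplied id. The last argument selects the mnesia
+%% access context: 'dirty' outside a transaction, otherwise the lock
+%% kind (e.g. write) to use inside one.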
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) ->
+ ExpectedSeqId;
+adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) ->
+ SuppliedSeqId;
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) ->
+ ExpectedSeqId;
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty)
+ when SuppliedSeqId > ExpectedSeqId ->
+ [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}),
+ ok = mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
+ SuppliedSeqId;
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock)
+ when SuppliedSeqId > ExpectedSeqId ->
+ [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock),
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { next_seq_id = SuppliedSeqId },
+ Lock),
+ SuppliedSeqId.
+
+sequence_lookup(Sequences, Q) ->
+ case ets:lookup(Sequences, Q) of
+ [] ->
+ {0, 0, 0};
+ [{Q, ReadSeqId, WriteSeqId, Length}] ->
+ {ReadSeqId, WriteSeqId, Length}
+ end.
+
+%% ---- INTERNAL RAW FUNCTIONS ----
+
+internal_deliver(Q, ReadMsg, FakeDeliver, State = #dqstate { sequences = Sequences }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {ok, empty, State};
+ [{Q, SeqId, SeqId, 0}] -> {ok, empty, State};
+ [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 ->
+ Remaining = Length - 1,
+ {ok, Result, NextReadSeqId, State1} =
+ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State),
+ true = ets:insert(Sequences,
+ {Q, NextReadSeqId, WriteSeqId, Remaining}),
+ {ok,
+ case Result of
+ {MsgId, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
+ {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId},
+ Remaining}
+ end, State1}
+ end.
+
+internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
+ [Obj =
+ #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
+ next_seq_id = NextReadSeqId}] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
+ [{MsgId, _RefCount, File, Offset, TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ ok =
+ if FakeDeliver orelse Delivered -> ok;
+ true ->
+ mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc {is_delivered = true})
+ end,
+ if ReadMsg ->
+ {FileHdl, State1} = get_read_handle(File, State),
+ {ok, {MsgBody, BodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
+ NextReadSeqId, State1};
+ true ->
+ {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State}
+ end.
+
+internal_auto_ack(Q, State) ->
+ case internal_deliver(Q, false, true, State) of
+ {ok, empty, State1} -> {ok, State1};
+ {ok, {_MsgId, _Delivered, MsgSeqId, _Remaining}, State1} ->
+ remove_messages(Q, [MsgSeqId], true, State1)
+ end.
+
+internal_ack(Q, MsgSeqIds, State) ->
+ remove_messages(Q, MsgSeqIds, true, State).
+
+%% Q is only needed if MnesiaDelete /= false
+%% called from ack with MnesiaDelete = true
+%% called from tx_commit with MnesiaDelete = txn
+%% called from tx_cancel with MnesiaDelete = false
+%% called from purge with MnesiaDelete = txn
+%% called from delete_queue with MnesiaDelete = txn
+remove_messages(Q, MsgSeqIds, MnesiaDelete,
+ State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
+ Files =
+ lists:foldl(
+ fun ({MsgId, SeqId}, Files2) ->
+ [{MsgId, RefCount, File, Offset, TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ Files3 =
+ if 1 =:= RefCount ->
+ ok = dets_ets_delete(State, MsgId),
+ [{File, ValidTotalSize, ContiguousTop,
+ Left, Right}] = ets:lookup(FileSummary, File),
+ ContiguousTop1 =
+ lists:min([ContiguousTop, Offset]),
+ true =
+ ets:insert(FileSummary,
+ {File, (ValidTotalSize-TotalSize-
+ ?FILE_PACKING_ADJUSTMENT),
+ ContiguousTop1, Left, Right}),
+ if CurName =:= File -> Files2;
+ true -> sets:add_element(File, Files2)
+ end;
+ 1 < RefCount ->
+ ok = dets_ets_insert(
+ State, {MsgId, RefCount - 1,
+ File, Offset, TotalSize}),
+ Files2
+ end,
+ ok = if MnesiaDelete ->
+ mnesia:dirty_delete(rabbit_disk_queue,
+ {Q, SeqId});
+ MnesiaDelete =:= txn ->
+ mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write);
+ true -> ok
+ end,
+ Files3
+ end, sets:new(), MsgSeqIds),
+ State2 = compact(Files, State),
+ {ok, State2}.
+
+internal_tx_publish(MsgId, MsgBody,
+ State = #dqstate { current_file_handle = CurHdl,
+ current_file_name = CurName,
+ current_offset = CurOffset,
+ file_summary = FileSummary
+ }) ->
+ case dets_ets_lookup(State, MsgId) of
+ [] ->
+ %% New message, lots to do
+ {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
+ true = dets_ets_insert_new(State, {MsgId, 1, CurName,
+ CurOffset, TotalSize}),
+ [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
+ ets:lookup(FileSummary, CurName),
+ ValidTotalSize1 = ValidTotalSize + TotalSize +
+ ?FILE_PACKING_ADJUSTMENT,
+ ContiguousTop1 = if CurOffset =:= ContiguousTop ->
+ %% can't be any holes in this file
+ ValidTotalSize1;
+ true -> ContiguousTop
+ end,
+ true = ets:insert(FileSummary, {CurName, ValidTotalSize1,
+ ContiguousTop1, Left, undefined}),
+ NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ maybe_roll_to_new_file(
+ NextOffset, State #dqstate {current_offset = NextOffset,
+ current_dirty = true});
+ [{MsgId, RefCount, File, Offset, TotalSize}] ->
+ %% We already know about it, just update counter
+ ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
+ Offset, TotalSize}),
+ {ok, State}
+ end.
+
+%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next))
+internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
+ State = #dqstate { current_file_handle = CurHdl,
+ current_file_name = CurName,
+ current_dirty = IsDirty,
+ sequences = Sequences
+ }) ->
+ {PubList, PubAcc, ReadSeqId, Length} =
+ case PubMsgSeqIds of
+ [] -> {[], undefined, undefined, undefined};
+ [{_, FirstSeqIdTo}|_] ->
+ {InitReadSeqId, InitWriteSeqId, InitLength} =
+ sequence_lookup(Sequences, Q),
+ InitReadSeqId2 = determine_next_read_id(
+ InitReadSeqId, InitWriteSeqId, FirstSeqIdTo),
+ { zip_with_tail(PubMsgSeqIds, {last, {next, next}}),
+ InitWriteSeqId, InitReadSeqId2, InitLength}
+ end,
+ {atomic, {Sync, WriteSeqId, State2}} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+              %% must deal with publishes first; if we didn't
+ %% then we could end up acking a message before
+ %% it's been published, which is clearly
+ %% nonsense. I.e. in commit, do not do things in an
+ %% order which _could_not_ have happened.
+ {Sync2, WriteSeqId3} =
+ lists:foldl(
+ fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
+ {Acc, ExpectedSeqId}) ->
+ [{MsgId, _RefCount, File, _Offset,
+ _TotalSize}] = dets_ets_lookup(State, MsgId),
+ SeqId2 = adjust_last_msg_seq_id(
+ Q, ExpectedSeqId, SeqId, write),
+ NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId),
+ ok = mnesia:write(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id =
+ {Q, SeqId2},
+ msg_id = MsgId,
+ is_delivered = false,
+ next_seq_id = NextSeqId2
+ },
+ write),
+ {Acc orelse (CurName =:= File), NextSeqId2}
+ end, {false, PubAcc}, PubList),
+
+ {ok, State3} = remove_messages(Q, AckSeqIds, txn, State),
+ {Sync2, WriteSeqId3, State3}
+ end),
+ true = if PubList =:= [] -> true;
+ true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId,
+ Length + erlang:length(PubList)})
+ end,
+ IsDirty2 = if IsDirty andalso Sync ->
+ ok = file:sync(CurHdl),
+ false;
+ true -> IsDirty
+ end,
+ {ok, State2 #dqstate { current_dirty = IsDirty2 }}.
+
+%% SeqId can be 'next'
+internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) ->
+ {ok, State1 = #dqstate { sequences = Sequences }} =
+ internal_tx_publish(MsgId, MsgBody, State),
+ {ReadSeqId, WriteSeqId, Length} =
+ sequence_lookup(Sequences, Q),
+ ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId),
+ WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty),
+ WriteSeqId3Next = WriteSeqId3 + 1,
+ ok = mnesia:dirty_write(rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
+ msg_id = MsgId,
+ next_seq_id = WriteSeqId3Next,
+ is_delivered = IsDelivered}),
+ true = ets:insert(Sequences, {Q, ReadSeqId3, WriteSeqId3Next, Length + 1}),
+ {ok, {MsgId, WriteSeqId3}, State1}.
+
+internal_tx_cancel(MsgIds, State) ->
+    %% we don't need seq ids here because we're not touching mnesia -
+    %% seq ids were never assigned to these messages
+ MsgSeqIds = zip_with_tail(MsgIds, {duplicate, undefined}),
+ remove_messages(undefined, MsgSeqIds, false, State).
+
+internal_requeue(_Q, [], State) ->
+ {ok, State};
+internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_],
+ State = #dqstate { sequences = Sequences }) ->
+ %% We know that every seq_id in here is less than the ReadSeqId
+ %% you'll get if you look up this queue in Sequences (i.e. they've
+ %% already been delivered). We also know that the rows for these
+ %% messages are still in rabbit_disk_queue (i.e. they've not been
+ %% ack'd).
+
+ %% Now, it would be nice if we could adjust the sequence ids in
+ %% rabbit_disk_queue (mnesia) to create a contiguous block and
+ %% then drop the ReadSeqId for the queue by the corresponding
+ %% amount. However, this is not safe because there may be other
+ %% sequence ids which have been sent out as part of deliveries
+ %% which are not being requeued. As such, moving things about in
+ %% rabbit_disk_queue _under_ the current ReadSeqId would result in
+ %% such sequence ids referring to the wrong messages.
+
+ %% Therefore, the only solution is to take these messages, and to
+ %% reenqueue them at the top of the queue. Usefully, this only
+ %% affects the Sequences and rabbit_disk_queue structures - there
+ %% is no need to physically move the messages about on disk, so
+ %% MsgLocation and FileSummary stay put (which makes further sense
+ %% as they have no concept of sequence id anyway).
+
+ {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q),
+ ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
+ MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}),
+ {atomic, {WriteSeqId2, Q}} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q},
+ MsgSeqIdsZipped)
+ end),
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2,
+ Length + erlang:length(MsgSeqIds)}),
+ {ok, State}.
+
+requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}},
+ {_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}},
+ {ExpectedSeqIdTo, Q}) ->
+ SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
+ NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo),
+ [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId,
+ next_seq_id = NextSeqIdOrig }] =
+ mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
+ if SeqIdTo2 == SeqIdOrig andalso NextSeqIdTo2 == NextSeqIdOrig -> ok;
+ true ->
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2},
+ next_seq_id = NextSeqIdTo2,
+ is_delivered = NewIsDelivered
+ },
+ write),
+ ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write)
+ end,
+ {NextSeqIdTo2, Q}.
+
+internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {ok, 0, State};
+ [{Q, ReadSeqId, WriteSeqId, _Length}] ->
+ {atomic, {ok, State2}} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ {MsgSeqIds, WriteSeqId} =
+ rabbit_misc:unfold(
+ fun (SeqId) when SeqId == WriteSeqId -> false;
+ (SeqId) ->
+ [#dq_msg_loc { msg_id = MsgId,
+ next_seq_id = NextSeqId }
+ ] = mnesia:read(rabbit_disk_queue,
+ {Q, SeqId}, write),
+ {true, {MsgId, SeqId}, NextSeqId}
+ end, ReadSeqId),
+ remove_messages(Q, MsgSeqIds, txn, State)
+ end),
+ true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}),
+ {ok, WriteSeqId - ReadSeqId, State2}
+ end.
+
+internal_delete_queue(Q, State) ->
+ {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
+ internal_purge(Q, State), %% remove everything undelivered
+ true = ets:delete(Sequences, Q),
+ {atomic, {ok, State2}} =
+ mnesia:transaction(
+ fun() -> %% now remove everything already delivered
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ Objs =
+ mnesia:match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ write),
+ MsgSeqIds =
+ lists:map(
+ fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId }) ->
+ {MsgId, SeqId} end, Objs),
+ remove_messages(Q, MsgSeqIds, txn, State1)
+ end),
+ {ok, State2}.
+
+internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {[], State};
+ [{Q, ReadSeq, WriteSeq, _Length}] ->
+ {QList, {WriteSeq, State3}} =
+ rabbit_misc:unfold(
+ fun ({SeqId, _State1}) when SeqId == WriteSeq ->
+ false;
+ ({SeqId, State1}) ->
+ {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}},
+ NextReadSeqId, State2} =
+ internal_read_message(Q, SeqId, true, true,
+ State1),
+ {true,
+ {MsgId, Msg, Size, Delivered, {MsgId, SeqId}, SeqId},
+ {NextReadSeqId, State2}}
+ end, {ReadSeq, State}),
+ {lists:reverse(QList), State3}
+ end.
+
+internal_delete_non_durable_queues(
+ DurableQueues, State = #dqstate { sequences = Sequences }) ->
+ ets:foldl(
+ fun ({Q, _Read, _Write, _Length}, {ok, State1}) ->
+ case sets:is_element(Q, DurableQueues) of
+ true -> {ok, State1};
+ false -> internal_delete_queue(Q, State1)
+ end
+ end, {ok, State}, Sequences).
+
+%% ---- ROLLING OVER THE APPEND FILE ----
+
+maybe_roll_to_new_file(Offset,
+ State = #dqstate { file_size_limit = FileSizeLimit,
+ current_file_name = CurName,
+ current_file_handle = CurHdl,
+ current_file_num = CurNum,
+ current_dirty = IsDirty,
+ file_summary = FileSummary
+ }
+ ) when Offset >= FileSizeLimit ->
+ ok = if IsDirty -> file:sync(CurHdl);
+ true -> ok
+ end,
+ ok = file:close(CurHdl),
+ NextNum = CurNum + 1,
+ NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
+ {ok, NextHdl} = file:open(form_filename(NextName),
+ [write, raw, binary, delayed_write]),
+ ok = preallocate(NextHdl, FileSizeLimit, 0),
+    true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right
+ true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
+ State1 = State #dqstate { current_file_name = NextName,
+ current_file_handle = NextHdl,
+ current_file_num = NextNum,
+ current_offset = 0,
+ current_dirty = false
+ },
+ {ok, compact(sets:from_list([CurName]), State1)};
+maybe_roll_to_new_file(_, State) ->
+ {ok, State}.
+
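+%% preallocate/3 reserves the file's full extent up front: seeking to
+%% FileSizeLimit and truncating there sets the file size to that
+%% position (truncate extends as well as shrinks), after which the
+%% position is moved back to FinalPos ready for writing.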
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}),
+ ok = file:truncate(Hdl),
+ {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}),
+ ok.
+
+%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
+
+compact(FilesSet, State) ->
+ %% smallest number, hence eldest, hence left-most, first
+ Files = lists:sort(sets:to_list(FilesSet)),
+ %% foldl reverses, so now youngest/right-most first
+ RemainingFiles = lists:foldl(fun (File, Acc) ->
+ delete_empty_files(File, Acc, State)
+ end, [], Files),
+ lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
+
+combine_file(File, State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
+ %% the file we're looking at may no longer exist as it may have
+ %% been deleted within the current GC run
+ case ets:lookup(FileSummary, File) of
+ [] -> State;
+ [FileObj = {File, _ValidData, _ContiguousTop, Left, Right}] ->
+ GoRight =
+ fun() ->
+ case Right of
+ undefined -> State;
+ _ when not (CurName == Right) ->
+ [RightObj] = ets:lookup(FileSummary, Right),
+ {_, State1} =
+ adjust_meta_and_combine(FileObj, RightObj,
+ State),
+ State1;
+ _ -> State
+ end
+ end,
+ case Left of
+ undefined ->
+ GoRight();
+ _ -> [LeftObj] = ets:lookup(FileSummary, Left),
+ case adjust_meta_and_combine(LeftObj, FileObj, State) of
+ {true, State1} -> State1;
+ {false, State} -> GoRight()
+ end
+ end
+ end.
+
+adjust_meta_and_combine(
+ LeftObj = {LeftFile, LeftValidData, _LeftContigTop, LeftLeft, RightFile},
+ RightObj = {RightFile, RightValidData, _RightContigTop, LeftFile, RightRight},
+ State = #dqstate { file_size_limit = FileSizeLimit,
+ file_summary = FileSummary
+ }) ->
+ TotalValidData = LeftValidData + RightValidData,
+ if FileSizeLimit >= TotalValidData ->
+ State1 = combine_files(RightObj, LeftObj, State),
+ %% this could fail if RightRight is undefined
+ %% left is the 4th field
+ ets:update_element(FileSummary, RightRight, {4, LeftFile}),
+ true = ets:insert(FileSummary, {LeftFile,
+ TotalValidData, TotalValidData,
+ LeftLeft,
+ RightRight}),
+ true = ets:delete(FileSummary, RightFile),
+ {true, State1};
+ true -> {false, State}
+ end.
+
+sort_msg_locations_by_offset(Asc, List) ->
+ Comp = if Asc -> fun erlang:'<'/2;
+ true -> fun erlang:'>'/2
+ end,
+ lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ Comp(OffA, OffB)
+ end, List).
+
+truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
+ {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
+ ok = file:truncate(FileHdl),
+ ok = preallocate(FileHdl, Highpoint, Lowpoint).
+
+combine_files({Source, SourceValid, _SourceContiguousTop,
+ _SourceLeft, _SourceRight},
+ {Destination, DestinationValid, DestinationContiguousTop,
+ _DestinationLeft, _DestinationRight},
+ State1) ->
+ State = close_file(Source, close_file(Destination, State1)),
+ {ok, SourceHdl} =
+ file:open(form_filename(Source),
+ [read, write, raw, binary, read_ahead, delayed_write]),
+ {ok, DestinationHdl} =
+ file:open(form_filename(Destination),
+ [read, write, raw, binary, read_ahead, delayed_write]),
+ ExpectedSize = SourceValid + DestinationValid,
+ %% if DestinationValid =:= DestinationContiguousTop then we don't
+ %% need a tmp file
+ %% if they're not equal, then we need to write out everything past
+ %% the DestinationContiguousTop to a tmp file then truncate,
+ %% copy back in, and then copy over from Source
+ %% otherwise we just truncate straight away and copy over from Source
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = truncate_and_extend_file(DestinationHdl,
+ DestinationValid, ExpectedSize);
+ true ->
+ Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} =
+ file:open(form_filename(Tmp),
+ [read, write, raw, binary,
+ read_ahead, delayed_write]),
+ Worklist =
+ lists:dropwhile(
+ fun ({_, _, _, Offset, _})
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect
+ %% that the list should be naturally sorted
+                             %% as we require; however, we need to
+ %% enforce it anyway
+ end, sort_msg_locations_by_offset(
+ true, dets_ets_match_object(State,
+ {'_', '_', Destination,
+ '_', '_'}))),
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and MsgLocationDets has been updated to
+ %% reflect compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file:position(TmpHdl, {bof, 0}),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file:sync(DestinationHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(form_filename(Tmp))
+ end,
+ SourceWorkList =
+ sort_msg_locations_by_offset(
+ true, dets_ets_match_object(State,
+ {'_', '_', Source,
+ '_', '_'})),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State),
+ %% tidy up
+ ok = file:sync(DestinationHdl),
+ ok = file:close(SourceHdl),
+ ok = file:close(DestinationHdl),
+ ok = file:delete(form_filename(Source)),
+ State.
+
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, State) ->
+ {FinalOffset, BlockStart2, BlockEnd2} =
+ lists:foldl(
+ fun ({MsgId, RefCount, _Source, Offset, TotalSize},
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ %% update MsgLocationDets to reflect change of file and offset
+ ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize}),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the next
+ %% msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ %% found a gap, so actually do the work for
+ %% the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file:position(SourceHdl, {bof, BlockStart}),
+ {ok, BSize} =
+ file:copy(SourceHdl, DestinationHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {InitOffset, undefined, undefined}, WorkList),
+ %% do the last remaining block
+ BSize2 = BlockEnd2 - BlockStart2,
+ {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}),
+ {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2),
+ ok.
+
+close_file(File, State = #dqstate { read_file_handles =
+ {ReadHdls, ReadHdlsAge} }) ->
+ case dict:find(File, ReadHdls) of
+ error ->
+ State;
+ {ok, {Hdl, Then}} ->
+ ok = file:close(Hdl),
+ State #dqstate { read_file_handles =
+ { dict:erase(File, ReadHdls),
+ gb_trees:delete(Then, ReadHdlsAge) } }
+ end.
+
+delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
+ [{File, ValidData, _ContiguousTop, Left, Right}] =
+ ets:lookup(FileSummary, File),
+ case ValidData of
+        %% we should NEVER find the current file in here, hence Right
+        %% should always be a file, not undefined
+ 0 ->
+ case {Left, Right} of
+ {undefined, _} when not (is_atom(Right)) ->
+ %% the eldest file is empty. YAY!
+ %% left is the 4th field
+ true =
+ ets:update_element(FileSummary, Right, {4, undefined});
+ {_, _} when not (is_atom(Right)) ->
+ %% left is the 4th field
+ true = ets:update_element(FileSummary, Right, {4, Left}),
+ %% right is the 5th field
+ true = ets:update_element(FileSummary, Left, {5, Right})
+ end,
+ true = ets:delete(FileSummary, File),
+ ok = file:delete(form_filename(File)),
+ Acc;
+ _ -> [File|Acc]
+ end.
+
+%% ---- DISK RECOVERY ----
+
+add_index() ->
+ case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
+ E -> E
+ end.
+
+del_index() ->
+ case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ %% hmm, something weird must be going on, but it's probably
+ %% not the end of the world
+ {aborted, {no_exists, rabbit_disk_queue,_}} -> ok;
+ E2 -> E2
+ end.
+
+load_from_disk(State) ->
+    %% sorted so that the smallest number is first, which also means
+    %% the eldest file (left-most) comes first
+ ok = add_index(),
+ {Files, TmpFiles} = get_disk_queue_files(),
+ ok = recover_crashed_compactions(Files, TmpFiles),
+ %% There should be no more tmp files now, so go ahead and load the
+ %% whole lot
+ State1 = load_messages(undefined, Files, State),
+ %% Finally, check there is nothing in mnesia which we haven't
+ %% loaded
+ {atomic, true} = mnesia:transaction(
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(
+ fun (#dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = {Q, SeqId} },
+ true) ->
+ case erlang:length(dets_ets_lookup(
+ State1, MsgId)) of
+ 0 -> ok == mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write);
+ 1 -> true
+ end
+ end,
+ true, rabbit_disk_queue)
+ end),
+ State2 = extract_sequence_numbers(State1),
+ ok = del_index(),
+ {ok, State2}.
+
+extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
+ {atomic, true} = mnesia:transaction(
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
+ NextWrite = SeqId + 1,
+ case ets:lookup(Sequences, Q) of
+ [] -> true =
+ ets:insert_new(Sequences,
+ {Q, SeqId, NextWrite, -1});
+ [Orig = {Q, Read, Write, Length}] ->
+ Repl = {Q, lists:min([Read, SeqId]),
+ %% Length is wrong here, but
+ %% it doesn't matter because
+ %% we'll pull out the gaps in
+                               %% remove_gaps_in_sequences and
+ %% then do a straight
+ %% subtraction to get the
+ %% right length
+ lists:max([Write, NextWrite]), Length},
+ if Orig /= Repl ->
+ true = ets:insert(Sequences, Repl);
+ true -> true
+ end
+ end
+ end, true, rabbit_disk_queue)
+ end),
+ remove_gaps_in_sequences(State),
+ State.
+
+remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
+ %% read the comments at internal_requeue.
+
+ %% Because we are at startup, we know that no sequence ids have
+ %% been issued (or at least, they were, but have been
+ %% forgotten). Therefore, we can nicely shuffle up and not
+ %% worry. Note that I'm choosing to shuffle up, but alternatively
+ %% we could shuffle downwards. However, I think there's greater
+ %% likelihood of gaps being at the bottom rather than the top of
+ %% the queue, so shuffling up should be the better bet.
+ {atomic, _} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foreach(
+ fun ({Q, ReadSeqId, WriteSeqId, _Length}) ->
+ Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
+ ReadSeqId2 = ReadSeqId + Gap,
+ Length = WriteSeqId - ReadSeqId2,
+ true =
+ ets:insert(Sequences,
+ {Q, ReadSeqId2, WriteSeqId, Length})
+ end, ets:match_object(Sequences, '_'))
+ end).
+
+shuffle_up(_Q, SeqId, SeqId, Gap) ->
+ Gap;
+shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
+ GapInc =
+ case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
+ [] -> 1;
+ [Obj] ->
+ if Gap =:= 0 -> ok;
+ true -> mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc {
+ queue_and_seq_id = {Q, SeqId + Gap },
+ next_seq_id = SeqId + Gap + 1
+ },
+ write),
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
+ end,
+ 0
+ end,
+ shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
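+
+%% A worked example of the gap removal (hypothetical queue Q): with
+%% WriteSeqId = 4 and surviving entries at seq ids 1 and 3 only,
+%% shuffle_up(Q, -1, 3, 0) walks seq ids 3, 2, 1, 0: entry 3 is
+%% already in place (Gap stays 0), the hole at 2 bumps Gap to 1,
+%% entry 1 is therefore rewritten to seq id 2, and the hole at 0
+%% bumps Gap to 2. The caller then advances ReadSeqId by the
+%% returned Gap, leaving the contiguous block [2, 4).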
+
+load_messages(undefined, [],
+ State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName }) ->
+ true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
+ State;
+load_messages(Left, [], State) ->
+ Num = list_to_integer(filename:rootname(Left)),
+ Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of
+ [] -> 0;
+ L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
+ sort_msg_locations_by_offset(false, L),
+ MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ end,
+ State #dqstate { current_file_num = Num, current_file_name = Left,
+ current_offset = Offset };
+load_messages(Left, [File|Files],
+ State = #dqstate { file_summary = FileSummary }) ->
+ %% [{MsgId, TotalSize, FileOffset}]
+ {ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
+ {ValidMessagesRev, ValidTotalSize} = lists:foldl(
+ fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ msg_id)) of
+ 0 -> {VMAcc, VTSAcc};
+ RefCount ->
+ true =
+ dets_ets_insert_new(State, {MsgId, RefCount, File,
+ Offset, TotalSize}),
+ {[{MsgId, TotalSize, Offset}|VMAcc],
+ VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ }
+ end
+ end, {[], 0}, Messages),
+ %% foldl reverses lists and find_contiguous_block_prefix needs
+ %% elems in the same order as from scan_file_for_valid_messages
+ {ContiguousTop, _} = find_contiguous_block_prefix(
+ lists:reverse(ValidMessagesRev)),
+ Right = case Files of
+ [] -> undefined;
+ [F|_] -> F
+ end,
+ true = ets:insert_new(FileSummary,
+ {File, ValidTotalSize, ContiguousTop, Left, Right}),
+ load_messages(File, Files, State).
+
+%% ---- DISK RECOVERY OF FAILED COMPACTION ----
+
+recover_crashed_compactions(Files, TmpFiles) ->
+ lists:foreach(fun (TmpFile) ->
+ ok = recover_crashed_compactions1(Files, TmpFile) end,
+ TmpFiles),
+ ok.
+
+verify_messages_in_mnesia(MsgIds) ->
+ lists:foreach(
+ fun (MsgId) ->
+ true = 0 < erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ msg_id))
+ end, MsgIds).
+
+recover_crashed_compactions1(Files, TmpFile) ->
+ GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
+ NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
+ true = lists:member(NonTmpRelatedFile, Files),
+ %% [{MsgId, TotalSize, FileOffset}]
+ {ok, UncorruptedMessagesTmp} =
+ scan_file_for_valid_messages(form_filename(TmpFile)),
+ MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
+ %% all of these messages should appear in the mnesia table,
+ %% otherwise they wouldn't have been copied out
+ verify_messages_in_mnesia(MsgIdsTmp),
+ {ok, UncorruptedMessages} =
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
+ %% 1) It's possible that everything in the tmp file is also in the
+ %% main file such that the main file is (prefix ++
+ %% tmpfile). This means that compaction failed immediately
+ %% prior to the final step of deleting the tmp file. Plan: just
+ %% delete the tmp file
+ %% 2) It's possible that everything in the tmp file is also in the
+%% main file but with holes throughout (or just something like
+ %% main = (prefix ++ hole ++ tmpfile)). This means that
+ %% compaction wrote out the tmp file successfully and then
+ %% failed. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 3) It's possible that everything in the tmp file is also in the
+ %% main file but such that the main file does not end with tmp
+ %% file (and there are valid messages in the suffix; main =
+ %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
+ %% means that compaction failed as we were writing out the tmp
+ %% file. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 4) It's possible that there are messages in the tmp file which
+ %% are not in the main file. This means that writing out the
+ %% tmp file succeeded, but then we failed as we were copying
+ %% them back over to the main file, after truncating the main
+ %% file. As the main file has already been truncated, it should
+ %% consist only of valid messages. Plan: Truncate the main file
+%% back to before any of the messages in the tmp file and copy
+ %% them over again
+ case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of
+ true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
+ %% note this also catches the case when the tmp file
+ %% is empty
+ ok = file:delete(TmpFile);
+ _False ->
+ %% we're in case 4 above. Check that everything in the
+ %% main file is a valid message in mnesia
+ verify_messages_in_mnesia(MsgIds),
+ %% The main file should be contiguous
+ {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
+ %% we should have that none of the messages in the prefix
+ %% are in the tmp file
+ true = lists:all(fun (MsgId) ->
+ not (lists:member(MsgId, MsgIdsTmp))
+ end, MsgIds),
+ {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile),
+ [write, raw, binary, delayed_write]),
+ {ok, Top} = file:position(MainHdl, Top),
+ %% wipe out any rubbish at the end of the file
+ ok = file:truncate(MainHdl),
+ %% there really could be rubbish at the end of the file -
+ %% we could have failed after the extending truncate.
+ %% Remember the head of the list will be the highest entry
+ %% in the file
+ [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
+ TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedAbsPos = Top + TmpSize,
+ {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
+ %% and now extend the main file as big as necessary in a
+            %% single move. If we run out of disk space, this truncate
+ %% could fail, but we still aren't risking losing data
+ ok = file:truncate(MainHdl),
+ {ok, TmpHdl} = file:open(form_filename(TmpFile),
+ [read, raw, binary, read_ahead]),
+ {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
+ ok = file:close(MainHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(TmpFile),
+
+ {ok, MainMessages} =
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ MsgIdsMain = lists:map(GrabMsgId, MainMessages),
+ %% check that everything in MsgIds is in MsgIdsMain
+ true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
+ MsgIds),
+ %% check that everything in MsgIdsTmp is in MsgIdsMain
+ true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
+ MsgIdsTmp)
+ end,
+ ok.
+
+%% this assumes that the messages are ordered such that the highest
+%% address is at the head of the list. This matches what
+%% scan_file_for_valid_messages produces
+find_contiguous_block_prefix([]) -> {0, []};
+find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) ->
+ case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
+ {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ lists:reverse(Acc)};
+ Res -> Res
+ end.
+find_contiguous_block_prefix([], 0, Acc) ->
+ {ok, Acc};
+find_contiguous_block_prefix([], _N, _Acc) ->
+ {0, []};
+find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail],
+ ExpectedOffset, Acc)
+ when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT ->
+ find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
+find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
+ find_contiguous_block_prefix(List).
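+
+%% e.g. (symbolic, with P standing for ?FILE_PACKING_ADJUSTMENT): for
+%% input [{m2, S2, O2}, {m1, S1, 0}] the prefix is contiguous iff
+%% O2 =:= S1 + P, giving {O2 + S2 + P, [m2, m1]}; on a break in the
+%% chain the scan restarts below it, so only a block reaching all the
+%% way down to offset 0 is ever reported.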
+
+file_name_sort(A, B) ->
+ ANum = list_to_integer(filename:rootname(A)),
+ BNum = list_to_integer(filename:rootname(B)),
+ ANum < BNum.
+
+get_disk_queue_files() ->
+ DQFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION, base_directory()),
+ DQFilesSorted = lists:sort(fun file_name_sort/2, DQFiles),
+ DQTFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, base_directory()),
+ DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles),
+ {DQFilesSorted, DQTFilesSorted}.
+
+%% ---- RAW READING AND WRITING OF FILES ----
+
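+%% Each message is stored on disk as (widths in bits):
+%%
+%%   <<TotalSize:?INTEGER_SIZE_BITS,    %% = MsgIdBinSize + BodySize
+%%     MsgIdBinSize:?INTEGER_SIZE_BITS,
+%%     MsgIdBin:MsgIdBinSize/binary,    %% term_to_binary(MsgId)
+%%     MsgBody:BodySize/binary,
+%%     ?WRITE_OK:?WRITE_OK_SIZE_BITS>>  %% trailing write-ok marker
+%%
+%% so each entry occupies TotalSize + ?FILE_PACKING_ADJUSTMENT bytes
+%% in the file.
+%%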
+append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) ->
+ BodySize = size(MsgBody),
+ MsgIdBin = term_to_binary(MsgId),
+ MsgIdBinSize = size(MsgIdBin),
+ TotalSize = BodySize + MsgIdBinSize,
+ case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdBin:MsgIdBinSize/binary,
+ MsgBody:BodySize/binary,
+ ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of
+ ok -> {ok, TotalSize};
+ KO -> KO
+ end.
+
+read_message_at_offset(FileHdl, Offset, TotalSize) ->
+ TotalSizeWriteOkBytes = TotalSize + 1,
+ case file:position(FileHdl, {bof, Offset}) of
+ {ok, Offset} ->
+ case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
+ {ok, <<TotalSize:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ Rest:TotalSizeWriteOkBytes/binary>>} ->
+ BodySize = TotalSize - MsgIdBinSize,
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest,
+ {ok, {MsgBody, BodySize}};
+ KO -> KO
+ end;
+ KO -> KO
+ end.
+
+scan_file_for_valid_messages(File) ->
+ {ok, Hdl} = file:open(File, [raw, binary, read]),
+ Valid = scan_file_for_valid_messages(Hdl, 0, []),
+    %% if something really bad has happened, the close could fail; ignore it
+ file:close(Hdl),
+ Valid.
+
+scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
+ case read_next_file_entry(FileHdl, Offset) of
+ {ok, eof} -> {ok, Acc};
+ {ok, {corrupted, NextOffset}} ->
+ scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
+ {ok, {ok, MsgId, TotalSize, NextOffset}} ->
+ scan_file_for_valid_messages(FileHdl, NextOffset,
+ [{MsgId, TotalSize, Offset}|Acc]);
+ _KO ->
+ %% bad message, but we may still have recovered some valid messages
+ {ok, Acc}
+ end.
+
+
+read_next_file_entry(FileHdl, Offset) ->
+ TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
+ case file:read(FileHdl, TwoIntegers) of
+ {ok,
+ <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
+ case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
+ {true, _} -> {ok, eof}; %% Nothing we can do other than stop
+ {false, true} ->
+ %% current message corrupted, try skipping past it
+ ExpectedAbsPos =
+ Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
+ case file:position(FileHdl, {cur, TotalSize + 1}) of
+ {ok, ExpectedAbsPos} ->
+ {ok, {corrupted, ExpectedAbsPos}};
+ {ok, _SomeOtherPos} ->
+ {ok, eof}; %% seek failed, so give up
+ KO -> KO
+ end;
+ {false, false} -> %% all good, let's continue
+ case file:read(FileHdl, MsgIdBinSize) of
+ {ok, <<MsgId:MsgIdBinSize/binary>>} ->
+ ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
+ case file:position(FileHdl,
+ {cur, TotalSize - MsgIdBinSize}
+ ) of
+ {ok, ExpectedAbsPos} ->
+ NextOffset = Offset + TotalSize +
+ ?FILE_PACKING_ADJUSTMENT,
+ case file:read(FileHdl, 1) of
+ {ok,
+ <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
+ {ok, {ok, binary_to_term(MsgId),
+ TotalSize, NextOffset}};
+ {ok, _SomeOtherData} ->
+ {ok, {corrupted, NextOffset}};
+ KO -> KO
+ end;
+ {ok, _SomeOtherPos} ->
+ %% seek failed, so give up
+ {ok, eof};
+ KO -> KO
+ end;
+ eof -> {ok, eof};
+ KO -> KO
+ end
+ end;
+ eof -> {ok, eof};
+ KO -> KO
+ end.
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2be00503..fe5acc83 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -42,6 +42,7 @@
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
+-define(SERIAL_FILENAME, rabbit_guid).
-record(state, {serial}).
@@ -59,17 +60,24 @@
%%----------------------------------------------------------------------------
start_link() ->
- %% The persister can get heavily loaded, and we don't want that to
- %% impact guid generation. We therefore keep the serial in a
- %% separate process rather than calling rabbit_persister:serial/0
- %% directly in the functions below.
gen_server:start_link({local, ?SERVER}, ?MODULE,
- [rabbit_persister:serial()], []).
+ [update_disk_serial()], []).
+
+update_disk_serial() ->
+ Filename = filename:join(mnesia:system_info(directory), ?SERIAL_FILENAME),
+ Serial = case file:read_file(Filename) of
+ {ok, Content} ->
+ binary_to_term(Content);
+ {error, _} ->
+ 0
+ end,
+ ok = file:write_file(Filename, term_to_binary(Serial + 1)),
+ Serial.
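+
+%% Illustrative behaviour (not part of this change): on the first
+%% three starts of a node, update_disk_serial/0 returns 0, 1 and 2,
+%% leaving 1, 2 and 3 on disk, so the serial strictly increases
+%% across restarts and pre-restart guids cannot be reissued.
+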
%% generate a guid that is monotonically increasing per process.
%%
%% The id is only unique within a single cluster and as long as the
-%% persistent message store hasn't been deleted.
+%% serial store hasn't been deleted.
guid() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
@@ -77,7 +85,7 @@ guid() ->
%% now() to move ahead of the system time), and b) it is really
%% slow since it takes a global lock and makes a system call.
%%
- %% rabbit_persister:serial/0, in combination with self/0 (which
+ %% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 72e16f0f..c965c693 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -52,6 +52,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
+-export([unfold/2]).
-import(mnesia).
-import(lists).
@@ -113,6 +114,7 @@
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
+-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-endif.
@@ -431,3 +433,11 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
+unfold(Fun, Init) ->
+ unfold(Fun, [], Init).
+
+unfold(Fun, Acc, Init) ->
+ case Fun(Init) of
+ {true, E, I} -> unfold(Fun, [E|Acc], I);
+ false -> {Acc, Init}
+ end.
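+
+%% For example (illustrative):
+%%   unfold(fun (0) -> false;
+%%              (N) -> {true, N, N - 1}
+%%          end, 5)
+%% returns {[1,2,3,4,5], 0} - the accumulated elements together with
+%% the final seed, which is why the spec gives {[B], A}.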
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
new file mode 100644
index 00000000..a950584a
--- /dev/null
+++ b/src/rabbit_mixed_queue.erl
@@ -0,0 +1,394 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_mixed_queue).
+
+-include("rabbit.hrl").
+
+-export([start_link/3]).
+
+-export([publish/2, publish_delivered/2, deliver/1, ack/2,
+ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
+ length/1, is_empty/1, delete_queue/1]).
+
+-export([to_disk_only_mode/1, to_mixed_mode/1]).
+
+-record(mqstate, { mode,
+ msg_buf,
+ next_write_seq,
+ queue,
+ is_durable
+ }
+ ).
+
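+%% In mixed mode, msg_buf holds entries of the form
+%% {SeqId, #basic_message{}, IsDelivered, OnDisk}, where OnDisk
+%% records whether a copy of the message also lives in
+%% rabbit_disk_queue, so that deliver/1 and the mode conversions know
+%% whether any disk state needs touching.
+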
+start_link(Queue, IsDurable, disk) ->
+ purge_non_persistent_messages(
+ #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
+ next_write_seq = 0, is_durable = IsDurable });
+start_link(Queue, IsDurable, mixed) ->
+ {ok, State} = start_link(Queue, IsDurable, disk),
+ to_mixed_mode(State).
+
+to_disk_only_mode(State = #mqstate { mode = disk }) ->
+ {ok, State};
+to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ next_write_seq = NextSeq }) ->
+ rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]),
+    %% We enqueue _everything_ here. This means that if a message is
+    %% already in the disk queue we must remove it and add it back in.
+    %% Fortunately, by using requeue, we avoid rewriting the message
+    %% on disk. Note that we also batch together runs of messages that
+    %% are already on disk, so as to minimise the calls to requeue.
+ Msgs = queue:to_list(MsgBuf),
+ {NextSeq1, Requeue} =
+ lists:foldl(
+ fun ({_Seq, Msg = #basic_message { guid = MsgId },
+ IsDelivered, OnDisk}, {NSeq, RQueueAcc}) ->
+ if OnDisk ->
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ {NSeq + 1,
+ [ {AckTag, {NSeq, IsDelivered}} | RQueueAcc ]};
+ true ->
+ ok = if [] == RQueueAcc -> ok;
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(RQueueAcc))
+ end,
+ ok = rabbit_disk_queue:publish_with_seq(
+ Q, MsgId, NSeq, msg_to_bin(Msg), false),
+ {NSeq + 1, []}
+ end
+ end, {NextSeq, []}, Msgs),
+ ok = if [] == Requeue -> ok;
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
+ end,
+ {ok, State #mqstate { mode = disk, msg_buf = queue:new(),
+ next_write_seq = NextSeq1 }}.
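+
+%% For example (illustrative): a buffer containing M1 M2 M3 M4, where
+%% M1, M2 and M4 are already on disk and M3 is in RAM only, results in
+%% requeue_with_seqs([M1, M2]), publish_with_seq(M3) and a final
+%% requeue_with_seqs([M4]) - one requeue call per run of on-disk
+%% messages.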
+
+to_mixed_mode(State = #mqstate { mode = mixed }) ->
+ {ok, State};
+to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) ->
+ rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
+    %% Load up a new queue with everything that's on disk. Note that
+    %% we do not remove non-persistent messages that happen to be on
+    %% disk. The foldl below relies on dump_queue returning entries in
+    %% ascending SeqId order; the guard on the fun head asserts this.
+ QList = rabbit_disk_queue:dump_queue(Q),
+ {MsgBuf1, NextSeq1} =
+ lists:foldl(
+ fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, {Buf, NSeq})
+ when SeqId >= NSeq ->
+ Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
+ {queue:in({SeqId, Msg, IsDelivered, true}, Buf), SeqId + 1}
+ end, {queue:new(), 0}, QList),
+ State1 = State #mqstate { mode = mixed, msg_buf = MsgBuf1,
+ next_write_seq = NextSeq1 },
+ rabbit_log:info("Queue length: ~p ~w ~w~n",
+ [Q, rabbit_mixed_queue:length(State),
+ rabbit_mixed_queue:length(State1)]),
+ {ok, State1}.
+
+purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable }) ->
+ %% iterate through the content on disk, ack anything which isn't
+ %% persistent, accumulate everything else that is persistent and
+ %% requeue it
+ NextSeq = rabbit_disk_queue:next_write_seq(Q),
+ {Acks, Requeue, NextSeq2} =
+ deliver_all_messages(Q, IsDurable, [], [], NextSeq),
+ ok = if Requeue == [] -> ok;
+ true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
+ end,
+ ok = if Acks == [] -> ok;
+ true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks))
+ end,
+ {ok, State #mqstate { next_write_seq = NextSeq2 }}.
+
+deliver_all_messages(Q, IsDurable, Acks, Requeue, NextSeq) ->
+ case rabbit_disk_queue:deliver(Q) of
+ empty -> {Acks, Requeue, NextSeq};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} ->
+ #basic_message { guid = MsgId, is_persistent = IsPersistent } =
+ bin_to_msg(MsgBin),
+ OnDisk = IsPersistent andalso IsDurable,
+ {Acks2, Requeue2, NextSeq2} =
+ if OnDisk -> {Acks,
+ [{AckTag, {NextSeq, IsDelivered}} | Requeue],
+ NextSeq + 1
+ };
+ true -> {[AckTag | Acks], Requeue, NextSeq}
+ end,
+ deliver_all_messages(Q, IsDurable, Acks2, Requeue2, NextSeq2)
+ end.
+
+msg_to_bin(Msg = #basic_message { content = Content }) ->
+ ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
+ term_to_binary(Msg #basic_message { content = ClearedContent }).
+
+bin_to_msg(MsgBin) ->
+ binary_to_term(MsgBin).
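+
+%% Note that msg_to_bin clears the decoded content before calling
+%% term_to_binary, so only the encoded wire form of the content is
+%% persisted; this keeps the stored binary small, and the properties
+%% can be decoded again when needed after bin_to_msg.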
+
+publish(Msg = #basic_message { guid = MsgId },
+ State = #mqstate { mode = disk, queue = Q }) ->
+ ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+ {ok, State};
+publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
+ State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
+ next_write_seq = NextSeq, msg_buf = MsgBuf }) ->
+ OnDisk = IsDurable andalso IsPersistent,
+ ok = if OnDisk ->
+ rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq,
+ msg_to_bin(Msg), false);
+ true -> ok
+ end,
+ {ok, State #mqstate { next_write_seq = NextSeq + 1,
+ msg_buf = queue:in({NextSeq, Msg, false, OnDisk},
+ MsgBuf)
+ }}.
+
+%% The assumption here is that the queue is already empty (this is
+%% only called via attempt_immediate_delivery). Also note that the seq
+%% id assigned by the disk queue may well differ from NextSeq (all we
+%% know is that NextSeq >= disk_queue_write_seq_for_queue(Q)), but
+%% this doesn't matter because the AckTag will still be correct
+%% (AckTags for non-persistent messages don't exist); next_write_seq
+%% is only used to calculate how many messages are in the queue.
+publish_delivered(Msg =
+ #basic_message { guid = MsgId, is_persistent = IsPersistent},
+ State = #mqstate { mode = Mode, is_durable = IsDurable,
+ next_write_seq = NextSeq, queue = Q })
+ when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
+ rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+ if IsDurable andalso IsPersistent ->
+ %% must call phantom_deliver otherwise the msg remains at
+ %% the head of the queue. This is synchronous, but
+ %% unavoidable as we need the AckTag
+ {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
+ {ok, AckTag, State};
+ true ->
+ %% in this case, we don't actually care about the ack, so
+ %% auto ack it (asynchronously).
+ ok = rabbit_disk_queue:auto_ack_next_message(Q),
+ {ok, noack, State #mqstate { next_write_seq = NextSeq + 1 }}
+ end;
+publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) ->
+ true = queue:is_empty(MsgBuf),
+ {ok, noack, State}.
+
+deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) ->
+ case rabbit_disk_queue:deliver(Q) of
+ empty -> {empty, State};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} ->
+ #basic_message { guid = MsgId, is_persistent = IsPersistent } =
+ Msg = bin_to_msg(MsgBin),
+ AckTag2 = if IsPersistent andalso IsDurable -> AckTag;
+ true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
+ noack
+ end,
+ {{Msg, IsDelivered, AckTag2, Remaining}, State}
+ end;
+
+deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
+ next_write_seq = NextWrite, msg_buf = MsgBuf }) ->
+ {Result, MsgBuf2} = queue:out(MsgBuf),
+ case Result of
+ empty ->
+ {empty, State};
+ {value, {Seq, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ IsDelivered, OnDisk}} ->
+ AckTag =
+ if OnDisk ->
+ if IsPersistent andalso IsDurable ->
+ {MsgId, IsDelivered, AckTag2, _PersistRem} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ AckTag2;
+ true ->
+ ok = rabbit_disk_queue:auto_ack_next_message(Q),
+ noack
+ end;
+ true -> noack
+ end,
+ {{Msg, IsDelivered, AckTag, (NextWrite - 1 - Seq)},
+ State #mqstate { msg_buf = MsgBuf2 }}
+ end.
+
+remove_noacks(Acks) ->
+ lists:filter(fun (A) -> A /= noack end, Acks).
+
+ack(Acks, State = #mqstate { queue = Q }) ->
+ case remove_noacks(Acks) of
+ [] -> {ok, State};
+ AckTags -> ok = rabbit_disk_queue:ack(Q, AckTags),
+ {ok, State}
+ end.
+
+tx_publish(Msg = #basic_message { guid = MsgId },
+ State = #mqstate { mode = disk }) ->
+ ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
+ {ok, State};
+tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
+ State = #mqstate { mode = mixed, is_durable = IsDurable })
+ when IsDurable andalso IsPersistent ->
+ ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
+ {ok, State};
+tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
+ %% this message will reappear in the tx_commit, so ignore for now
+ {ok, State}.
+
+only_msg_ids(Pubs) ->
+ lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
+
+tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) ->
+ RealAcks = remove_noacks(Acks),
+ ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
+ RealAcks)
+ end,
+ {ok, State};
+tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
+ msg_buf = MsgBuf,
+ next_write_seq = NextSeq,
+ is_durable = IsDurable
+ }) ->
+ {PersistentPubs, MsgBuf2, NextSeq2} =
+ lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
+ {Acc, MsgBuf3, NextSeq3}) ->
+ OnDisk = IsPersistent andalso IsDurable,
+ Acc2 =
+ if OnDisk ->
+ [{Msg #basic_message.guid, NextSeq3}
+ | Acc];
+ true -> Acc
+ end,
+ MsgBuf4 = queue:in({NextSeq3, Msg, false, OnDisk},
+ MsgBuf3),
+ {Acc2, MsgBuf4, NextSeq3 + 1}
+ end, {[], MsgBuf, NextSeq}, Publishes),
+ %% foldl reverses, so re-reverse PersistentPubs to match
+ %% requirements of rabbit_disk_queue (ascending SeqIds)
+ RealAcks = remove_noacks(Acks),
+ ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
+ true ->
+ rabbit_disk_queue:tx_commit_with_seqs(
+ Q, lists:reverse(PersistentPubs), RealAcks)
+ end,
+ {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
+
+only_persistent_msg_ids(Pubs) ->
+ lists:reverse(
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
+ if IsPersistent -> [Msg #basic_message.guid | Acc];
+ true -> Acc
+ end
+ end, [], Pubs)).
+
+tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
+ ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
+ {ok, State};
+tx_cancel(Publishes,
+ State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ ok =
+ if IsDurable ->
+ rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes));
+ true -> ok
+ end,
+ {ok, State}.
+
+%% MessagesWithAckTags is a list of {Msg, AckTag} pairs
+requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable }) ->
+    %% Here we may have messages with no ack tags, because they are
+    %% not persistent; nevertheless we want to requeue them, which
+    %% means publishing them as already delivered.
+ Requeue
+ = lists:foldl(
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ when IsPersistent andalso IsDurable ->
+ [AckTag | RQ];
+ ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) ->
+ ok = if RQ == [] -> ok;
+ true -> rabbit_disk_queue:requeue(
+ Q, lists:reverse(RQ))
+ end,
+ _AckTag2 = rabbit_disk_queue:publish(
+ Q, MsgId, msg_to_bin(Msg), true),
+ []
+ end, [], MessagesWithAckTags),
+ ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
+ {ok, State};
+requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
+ msg_buf = MsgBuf,
+ next_write_seq = NextSeq,
+ is_durable = IsDurable
+ }) ->
+ {PersistentPubs, MsgBuf2, NextSeq2} =
+ lists:foldl(
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
+ {Acc, MsgBuf3, NextSeq3}) ->
+ OnDisk = IsDurable andalso IsPersistent,
+ Acc2 =
+ if OnDisk -> [{AckTag, {NextSeq3, true}} | Acc];
+ true -> Acc
+ end,
+ MsgBuf4 = queue:in({NextSeq3, Msg, true, OnDisk}, MsgBuf3),
+ {Acc2, MsgBuf4, NextSeq3 + 1}
+ end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
+ ok = if [] == PersistentPubs -> ok;
+ true -> rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(PersistentPubs))
+ end,
+ {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
+
+purge(State = #mqstate { queue = Q, mode = disk }) ->
+ Count = rabbit_disk_queue:purge(Q),
+ {Count, State};
+purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) ->
+ rabbit_disk_queue:purge(Q),
+ Count = queue:len(MsgBuf),
+ {Count, State #mqstate { msg_buf = queue:new() }}.
+
+delete_queue(State = #mqstate { queue = Q, mode = disk }) ->
+ rabbit_disk_queue:delete_queue(Q),
+ {ok, State};
+delete_queue(State = #mqstate { queue = Q, mode = mixed }) ->
+ rabbit_disk_queue:delete_queue(Q),
+ {ok, State #mqstate { msg_buf = queue:new() }}.
+
+length(#mqstate { queue = Q, mode = disk }) ->
+ rabbit_disk_queue:length(Q);
+length(#mqstate { mode = mixed, msg_buf = MsgBuf }) ->
+ queue:len(MsgBuf).
+
+is_empty(State) ->
+ 0 == rabbit_mixed_queue:length(State).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 575ecb0a..77e309fe 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -144,11 +144,26 @@ table_definitions() ->
{disc_copies, [node()]}]},
{rabbit_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]}].
+ {attributes, record_info(fields, amqqueue)}]},
+ {rabbit_disk_queue,
+ [{record_name, dq_msg_loc},
+ {type, set},
+ {local_content, true},
+ {attributes, record_info(fields, dq_msg_loc)},
+ {disc_only_copies, [node()]}]}
+ ].
+
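+%% Tables flagged local_content (such as rabbit_disk_queue above)
+%% hold node-local data, so the cluster-wide schema integrity check
+%% and wait_for_tables below consider only the replicated tables.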
+replicated_table_definitions() ->
+ [{Tab, Attrs} || {Tab, Attrs} <- table_definitions(),
+ not lists:member({local_content, true}, Attrs)
+ ].
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
+replicated_table_names() ->
+ [Tab || {Tab, _} <- replicated_table_definitions()].
+
dir() -> mnesia:system_info(directory).
ensure_mnesia_dir() ->
@@ -173,7 +188,7 @@ ensure_mnesia_not_running() ->
check_schema_integrity() ->
%%TODO: more thorough checks
- case catch [mnesia:table_info(Tab, version) || Tab <- table_names()] of
+ case catch [mnesia:table_info(Tab, version) || Tab <- replicated_table_names()] of
{'EXIT', Reason} -> {error, Reason};
_ -> ok
end.
@@ -253,9 +268,10 @@ init_db(ClusterNodes) ->
WasDiskNode = mnesia:system_info(use_dir),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
- case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of
+ ExtraNodes = ClusterNodes -- [node()],
+ case mnesia:change_config(extra_db_nodes, ExtraNodes) of
{ok, []} ->
- if WasDiskNode and IsDiskNode ->
+ if WasDiskNode ->
case check_schema_integrity() of
ok ->
ok;
@@ -270,14 +286,8 @@ init_db(ClusterNodes) ->
ok = move_db(),
ok = create_schema()
end;
- WasDiskNode ->
- throw({error, {cannot_convert_disk_node_to_ram_node,
- ClusterNodes}});
- IsDiskNode ->
- ok = create_schema();
true ->
- throw({error, {unable_to_contact_cluster_nodes,
- ClusterNodes}})
+ ok = create_schema()
end;
{ok, [_|_]} ->
ok = wait_for_tables(),
@@ -337,15 +347,19 @@ create_tables() ->
ok.
create_local_table_copies(Type) ->
- ok = if Type /= ram -> create_local_table_copy(schema, disc_copies);
- true -> ok
- end,
+ ok = create_local_table_copy(schema, disc_copies),
lists:foreach(
fun({Tab, TabDef}) ->
HasDiscCopies =
- lists:keymember(disc_copies, 1, TabDef),
+ case lists:keysearch(disc_copies, 1, TabDef) of
+ false -> false;
+ {value, {disc_copies, List1}} -> lists:member(node(), List1)
+ end,
HasDiscOnlyCopies =
- lists:keymember(disc_only_copies, 1, TabDef),
+ case lists:keysearch(disc_only_copies, 1, TabDef) of
+ false -> false;
+ {value, {disc_only_copies, List2}} -> lists:member(node(), List2)
+ end,
StorageType =
case Type of
disc ->
@@ -367,9 +381,6 @@ create_local_table_copies(Type) ->
ok = create_local_table_copy(Tab, StorageType)
end,
table_definitions()),
- ok = if Type == ram -> create_local_table_copy(schema, ram_copies);
- true -> ok
- end,
ok.
create_local_table_copy(Tab, Type) ->
@@ -387,7 +398,7 @@ create_local_table_copy(Tab, Type) ->
wait_for_tables() ->
case check_schema_integrity() of
ok ->
- case mnesia:wait_for_tables(table_names(), 30000) of
+ case mnesia:wait_for_tables(replicated_table_names(), 30000) of
ok -> ok;
{timeout, BadTabs} ->
throw({error, {timeout_waiting_for_tables, BadTabs}});
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
deleted file mode 100644
index d0d60ddf..00000000
--- a/src/rabbit_persister.erl
+++ /dev/null
@@ -1,523 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_persister).
-
--behaviour(gen_server).
-
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([transaction/1, extend_transaction/2, dirty_work/1,
- commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
-
--include("rabbit.hrl").
-
--define(SERVER, ?MODULE).
-
--define(LOG_BUNDLE_DELAY, 5).
--define(COMPLETE_BUNDLE_DELAY, 2).
-
--define(HIBERNATE_AFTER, 10000).
-
--define(MAX_WRAP_ENTRIES, 500).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
-
--record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot}).
-
-%% two tables for efficient persistency
-%% one maps a key to a message
-%% the other maps a key to one or more queues.
-%% The aim is to reduce the overload of storing a message multiple times
-%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(qmsg() :: {amqqueue(), pkey()}).
--type(work_item() ::
- {publish, message(), qmsg()} |
- {deliver, qmsg()} |
- {ack, qmsg()}).
-
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok').
--spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: (txn()) -> 'ok').
--spec(rollback_transaction/1 :: (txn()) -> 'ok').
--spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-transaction(MessageList) ->
- ?LOGDEBUG("transaction ~p~n", [MessageList]),
- TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
-
-extend_transaction(TxnKey, MessageList) ->
- ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
- gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}).
-
-dirty_work(MessageList) ->
- ?LOGDEBUG("dirty_work ~p~n", [MessageList]),
- gen_server:cast(?SERVER, {dirty_work, MessageList}).
-
-commit_transaction(TxnKey) ->
- ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
-
-rollback_transaction(TxnKey) ->
- ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
- gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
-
-force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot, infinity).
-
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
-
-%%--------------------------------------------------------------------
-
-init(_Args) ->
- process_flag(trap_exit, true),
- FileName = base_filename(),
- ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
- messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
- LogHandle =
- case disk_log:open([{name, rabbit_persister},
- {head, current_snapshot(Snapshot)},
- {file, FileName}]) of
- {ok, LH} -> LH;
- {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} ->
- WarningFun = if
- Bad > 0 -> fun rabbit_log:warning/2;
- true -> fun rabbit_log:info/2
- end,
- WarningFun("Repaired persister log - ~p recovered, ~p bad~n",
- [Recovered, Bad]),
- LH
- end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
- case Res of
- ok ->
- ok = take_snapshot(LogHandle, NewSnapshot);
- {error, Reason} ->
- rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
- ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
- end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
- {ok, State}.
-
-handle_call({transaction, Key, MessageList}, From, State) ->
- NewState = internal_extend(Key, MessageList, State),
- do_noreply(internal_commit(From, Key, NewState));
-handle_call({commit_transaction, TxnKey}, From, State) ->
- do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State) ->
- do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast({rollback_transaction, TxnKey}, State) ->
- do_noreply(internal_rollback(TxnKey, State));
-handle_cast({dirty_work, MessageList}, State) ->
- do_noreply(internal_dirty_work(MessageList, State));
-handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
- do_noreply(internal_extend(TxnKey, MessageList, State));
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(timeout, State = #pstate{deadline = infinity}) ->
- State1 = flush(true, State),
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State1, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
-handle_info(timeout, State) ->
- do_noreply(flush(State));
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, State = #pstate{log_handle = LogHandle}) ->
- flush(State),
- disk_log:close(LogHandle),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, flush(State)}.
-
-%%--------------------------------------------------------------------
-
-internal_extend(Key, MessageList, State) ->
- log_work(fun (ML) -> {extend_transaction, Key, ML} end,
- MessageList, State).
-
-internal_dirty_work(MessageList, State) ->
- log_work(fun (ML) -> {dirty_work, ML} end,
- MessageList, State).
-
-internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {commit_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- complete(From, Unit, State#pstate{snapshot = NewSnapshot}).
-
-internal_rollback(Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {rollback_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-complete(From, Item, State = #pstate{deadline = ExistingDeadline,
- pending_logs = Logs,
- pending_replies = Waiting}) ->
- State#pstate{deadline = compute_deadline(
- ?COMPLETE_BUNDLE_DELAY, ExistingDeadline),
- pending_logs = [Item | Logs],
- pending_replies = [From | Waiting]}.
-
-%% This is made to limit disk usage by writing messages only once onto
-%% disk. We keep a table associating pkeys to messages, and provided
-%% the list of messages to output is left to right, we can guarantee
-%% that pkeys will be a backreference to a message in memory when a
-%% "tied" is met.
-log_work(CreateWorkUnit, MessageList,
- State = #pstate{
- snapshot = Snapshot = #psnapshot{
- messages = Messages}}) ->
- Unit = CreateWorkUnit(
- rabbit_misc:map_in_order(
- fun(M = {publish, Message, QK = {_QName, PKey}}) ->
- case ets:lookup(Messages, PKey) of
- [_] -> {tied, QK};
- [] -> ets:insert(Messages, {PKey, Message}),
- M
- end;
- (M) -> M
- end,
- MessageList)),
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
- Message) ->
- State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY,
- ExistingDeadline),
- pending_logs = [Message | Logs]}.
-
-base_filename() ->
- rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
-
-take_snapshot(LogHandle, OldFileName, Snapshot) ->
- ok = disk_log:sync(LogHandle),
- %% current_snapshot is the Head (ie. first thing logged)
- ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)).
-
-take_snapshot(LogHandle, Snapshot) ->
- OldFileName = lists:flatten(base_filename() ++ ".previous"),
- file:delete(OldFileName),
- rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-take_snapshot_and_save_old(LogHandle, Snapshot) ->
- {MegaSecs, Secs, MicroSecs} = erlang:now(),
- Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs,
- OldFileName = lists:flatten(io_lib:format("~s.saved.~p",
- [base_filename(), Timestamp])),
- rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
- log_handle = LH,
- snapshot = Snapshot})
- when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
-maybe_take_snapshot(_Force, State) ->
- State.
-
-later_ms(DeltaMilliSec) ->
- {MegaSec, Sec, MicroSec} = now(),
- %% Note: not normalised. Unimportant for this application.
- {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}.
-
-%% Result = B - A, more or less
-time_diff({B1, B2, B3}, {A1, A2, A3}) ->
- (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 .
-
-compute_deadline(TimerDelay, infinity) ->
- later_ms(TimerDelay);
-compute_deadline(_TimerDelay, ExistingDeadline) ->
- ExistingDeadline.
-
-compute_timeout(infinity) ->
- ?HIBERNATE_AFTER;
-compute_timeout(Deadline) ->
- DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
- if
- DeltaMilliSec =< 1 ->
- 0;
- true ->
- round(DeltaMilliSec)
- end.
-
-do_noreply(State = #pstate{deadline = Deadline}) ->
- {noreply, State, compute_timeout(Deadline)}.
-
-do_reply(Reply, State = #pstate{deadline = Deadline}) ->
- {reply, Reply, State, compute_timeout(Deadline)}.
-
-flush(State) -> flush(false, State).
-
-flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if PendingLogs /= [] ->
- disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- State#pstate{entry_count = State#pstate.entry_count + 1};
- true ->
- State
- end,
- State2 = maybe_take_snapshot(ForceSnapshot, State1),
- if Waiting /= [] ->
- ok = disk_log:sync(LogHandle),
- lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
- Waiting);
- true ->
- ok
- end,
- State2#pstate{deadline = infinity,
- pending_logs = [],
- pending_replies = []}.
-
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
- %% Avoid infinite growth of the table by removing messages not
- %% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
- {messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
- ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
- {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- term_to_binary(InnerSnapshot)}.
-
-prune_table(Tab, Keys) ->
- true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
- true = ets:safe_fixtable(Tab, false).
-
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
- true -> ok;
- false -> ets:delete(Tab, Key)
- end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
-
-internal_load_snapshot(LogHandle,
- Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
- case check_version(Loaded_Snapshot) of
- {ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
- true = ets:insert(Messages, Ms),
- true = ets:insert(Queues, Qs),
- Snapshot1 = replay(Items, LogHandle, K,
- Snapshot#psnapshot{
- serial = Serial,
- transactions = Ts}),
- Snapshot2 = requeue_messages(Snapshot1),
- %% uncompleted transactions are discarded - this is TRTTD
- %% since we only get into this code on node restart, so
- %% any uncompleted transactions will have been aborted.
- {ok, Snapshot2#psnapshot{transactions = dict:new()}};
- {error, Reason} -> {{error, Reason}, Snapshot}
- end.
-
-check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- StateBin}) ->
- {ok, StateBin};
-check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
- {error, {unsupported_persister_log_format, Vsn}};
-check_version(_Other) ->
- {error, unrecognised_persister_log_format}.
-
-requeue_messages(Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues),
- %% unstable parallel map, because order doesn't matter
- L = lists:append(
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- requeue(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- Snapshot.
-
-accumulate_requeues({{QName, PKey}, Delivered}, Acc) ->
- Requeue = {PKey, Delivered},
- dict:update(QName,
- fun (Requeues) -> [Requeue | Requeues] end,
- [Requeue],
- Acc).
-
-requeue(QName, Requeues, Messages) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)],
- rabbit_amqqueue:redeliver(
- QPid,
- %% Messages published by the same process receive
- %% persistence keys that are monotonically
- %% increasing. Since message ordering is defined on a
- %% per-channel basis, and channels are bound to specific
- %% processes, sorting the list does provide the correct
- %% ordering properties.
- [{Message, Delivered} || {_, Message, Delivered} <-
- lists:sort(RequeueMessages)]),
- RequeueMessages;
- {error, not_found} ->
- []
- end.
-
-replay([], LogHandle, K, Snapshot) ->
- case disk_log:chunk(LogHandle, K) of
- {K1, Items} ->
- replay(Items, LogHandle, K1, Snapshot);
- {K1, Items, Badbytes} ->
- rabbit_log:warning("~p bad bytes recovering persister log~n",
- [Badbytes]),
- replay(Items, LogHandle, K1, Snapshot);
- eof -> Snapshot
- end;
-replay([Item | Items], LogHandle, K, Snapshot) ->
- NewSnapshot = internal_integrate_messages(Item, Snapshot),
- replay(Items, LogHandle, K, NewSnapshot).
-
-internal_integrate_messages(Items, Snapshot) ->
- lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end,
- Snapshot, Items).
-
-internal_integrate1({extend_transaction, Key, MessageList},
- Snapshot = #psnapshot {transactions = Transactions}) ->
- NewTransactions =
- dict:update(Key,
- fun (MessageLists) -> [MessageList | MessageLists] end,
- [MessageList],
- Transactions),
- Snapshot#psnapshot{transactions = NewTransactions};
-internal_integrate1({rollback_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions}) ->
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
-internal_integrate1({commit_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
- case dict:find(Key, Transactions) of
- {ok, MessageLists} ->
- ?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
- error ->
- Snapshot
- end;
-internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
new file mode 100644
index 00000000..080607bb
--- /dev/null
+++ b/src/rabbit_queue_mode_manager.erl
@@ -0,0 +1,105 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_queue_mode_manager).
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([register/1, change_memory_usage/2]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, { mode,
+ queues
+ }).
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+register(Pid) ->
+ gen_server2:call(?SERVER, {register, Pid}).
+
+change_memory_usage(_Pid, Conserve) ->
+ gen_server2:cast(?SERVER, {change_memory_usage, Conserve}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ ok = rabbit_alarm:register(self(), {?MODULE, change_memory_usage, []}),
+ {ok, #state { mode = unlimited,
+ queues = []
+ }}.
+
+handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) ->
+ Result = case Mode of
+ unlimited -> mixed;
+ _ -> disk
+ end,
+ {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}.
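+
+%% Thus a queue registering while memory is unconstrained is told to
+%% start in mixed mode; once a memory alarm has fired, new queues are
+%% told to start in disk mode.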
+
+handle_cast({change_memory_usage, true}, State = #state { mode = disk_only }) ->
+ {noreply, State};
+handle_cast({change_memory_usage, true}, State = #state { mode = ram_disk }) ->
+ constrain_queues(true, State #state.queues),
+ {noreply, State #state { mode = disk_only }};
+handle_cast({change_memory_usage, true}, State = #state { mode = unlimited }) ->
+ ok = rabbit_disk_queue:to_disk_only_mode(),
+ {noreply, State #state { mode = ram_disk }};
+
+handle_cast({change_memory_usage, false}, State = #state { mode = unlimited }) ->
+ {noreply, State};
+handle_cast({change_memory_usage, false}, State = #state { mode = ram_disk }) ->
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ {noreply, State #state { mode = unlimited }};
+handle_cast({change_memory_usage, false}, State = #state { mode = disk_only }) ->
+ constrain_queues(false, State #state.queues),
+ {noreply, State #state { mode = ram_disk }}.
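+
+%% In summary (an illustrative reading of the clauses above, not
+%% additional behaviour): the manager walks a three-rung ladder, one
+%% rung per memory alarm or clearance:
+%%
+%%   unlimited  --alarm-->  ram_disk  --alarm-->  disk_only
+%%   unlimited  <--clear--  ram_disk  <--clear--  disk_only
+%%
+%% The unlimited <-> ram_disk transitions toggle the central
+%% rabbit_disk_queue between RAM-and-disk and disk-only operation;
+%% the ram_disk <-> disk_only transitions additionally constrain (or
+%% release) every registered queue.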
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+constrain_queues(Constrain, Qs) ->
+ lists:foreach(
+ fun (QPid) ->
+ ok = rabbit_amqqueue:constrain_memory(QPid, Constrain)
+ end, Qs).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8f0a3a89..a2a31a18 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -31,7 +31,9 @@
-module(rabbit_tests).
--export([all_tests/0, test_parsing/0]).
+-compile(export_all).
+
+-export([all_tests/0, test_parsing/0, test_disk_queue/0]).
-import(lists).
@@ -54,6 +56,7 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
+ passed = test_disk_queue(),
passed.
test_priority_queue() ->
@@ -261,7 +264,7 @@ test_log_management() ->
%% original log files are not writable
ok = make_files_non_writable([MainLog, SaslLog]),
{error, {{cannot_rotate_main_logs, _},
- {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+ {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
%% logging directed to tty (handlers were removed in last test)
ok = clean_logs([MainLog, SaslLog], Suffix),
@@ -280,7 +283,7 @@ test_log_management() ->
ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
ok = application:set_env(kernel, error_logger, {file, MainLog}),
ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
- {rabbit_sasl_report_file_h, SaslLog}]),
+ {rabbit_sasl_report_file_h, SaslLog}]),
passed.
test_log_management_during_startup() ->
@@ -404,19 +407,17 @@ test_cluster_management() ->
end,
ClusteringSequence),
- %% attempt to convert a disk node into a ram node
+ %% convert a disk node into a ram node
ok = control_action(reset, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- {error, {cannot_convert_disk_node_to_ram_node, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
- %% attempt to join a non-existing cluster as a ram node
+ %% join a non-existing cluster as a ram node
ok = control_action(reset, []),
- {error, {unable_to_contact_cluster_nodes, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
SecondaryNode = rabbit_misc:localnode(hare),
case net_adm:ping(SecondaryNode) of
@@ -432,11 +433,12 @@ test_cluster_management2(SecondaryNode) ->
NodeS = atom_to_list(node()),
SecondaryNodeS = atom_to_list(SecondaryNode),
- %% attempt to convert a disk node into a ram node
+ %% make a disk node
ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
- {error, {unable_to_join_cluster, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
+ %% make a ram node
+ ok = control_action(reset, []),
+ ok = control_action(cluster, [SecondaryNodeS]),
%% join cluster as a ram node
ok = control_action(reset, []),
@@ -449,21 +451,21 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- %% attempt to join non-existing cluster as a ram node
- {error, _} = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
-
+ %% join non-existing cluster as a ram node
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn ram node into disk node
+ ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- %% attempt to convert a disk node into a ram node
- {error, {cannot_convert_disk_node_to_ram_node, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ %% convert a disk node into a ram node
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn a disk node into a ram node
+ ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
@@ -684,3 +686,344 @@ delete_log_handlers(Handlers) ->
[[] = error_logger:delete_report_handler(Handler) ||
Handler <- Handlers],
ok.
+
+test_disk_queue() ->
+ rdq_stop(),
+ rdq_virgin(),
+ passed = rdq_stress_gc(10000),
+ passed = rdq_test_startup_with_queue_gaps(),
+ passed = rdq_test_redeliver(),
+ passed = rdq_test_purge(),
+ passed = rdq_test_dump_queue(),
+ passed = rdq_test_mixed_queue_modes(),
+ rdq_virgin(),
+ ok = control_action(stop_app, []),
+ ok = control_action(start_app, []),
+ passed.
+
+benchmark_disk_queue() ->
+ rdq_stop(),
+    % unicode chars are only supported properly from R13 onwards,
+    % hence "mu s" rather than "µs" in the header below
+ io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
+ [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
+ timer:sleep(1000) end || % 1000 milliseconds
+ MsgSize <- [512, 8192, 32768, 131072],
+ Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)],
+ MsgCount <- [1024, 4096, 16384]
+ ],
+ rdq_virgin(),
+ ok = control_action(stop_app, []),
+ ok = control_action(start_app, []),
+ passed.
+
+rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
+ Startup = rdq_virgin(),
+ rdq_start(),
+ QCount = length(Qs),
+ Msg = <<0:(8*MsgSizeBytes)>>,
+ List = lists:seq(1, MsgCount),
+ {Publish, ok} =
+ timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg)
+ || N <- List, _ <- Qs] end,
+ fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, [])
+ || Q <- Qs] end
+ ]]),
+ {Deliver, ok} =
+ timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [begin SeqIds =
+ [begin
+ Remaining = MsgCount - N,
+ {N, Msg, MsgSizeBytes, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(Q),
+ SeqId
+ end || N <- List],
+ ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
+ end || Q <- Qs]
+ end]]),
+ io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",
+ [MsgCount, MsgSizeBytes, QCount, float(Startup),
+ float(Publish), (Publish / (MsgCount * QCount)),
+ (Publish / (MsgCount * QCount * MsgSizeBytes)),
+ float(Deliver), (Deliver / (MsgCount * QCount)),
+ (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
+ rdq_stop().
+
+% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have
+% several files, and then keep punching holes in a reasonably sensible way.
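+% with rdq_stress_gc(10000) as called from test_disk_queue and 256KB
+% messages, that is ~2.5GB of payload, i.e. roughly 250 such files.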
+rdq_stress_gc(MsgCount) ->
+ rdq_virgin(),
+ rdq_start(),
+ MsgSizeBytes = 256*1024,
+ Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
+ List = lists:seq(1, MsgCount),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
+ rabbit_disk_queue:tx_commit(q, List, []),
+ StartChunk = round(MsgCount / 20), % 5%
+ AckList =
+ lists:foldl(
+ fun (E, Acc) ->
+ case lists:member(E, Acc) of
+ true -> Acc;
+ false -> [E|Acc]
+ end
+ end, [], lists:flatten(
+ lists:reverse(
+ [ lists:seq(N, MsgCount, N)
+ || N <- lists:seq(1, round(MsgCount / 2), 1)
+ ]))),
+ {Start, End} = lists:split(StartChunk, AckList),
+ AckList2 = End ++ Start,
+ MsgIdToSeqDict =
+ lists:foldl(
+ fun (MsgId, Acc) ->
+ Remaining = MsgCount - MsgId,
+ {MsgId, Msg, MsgSizeBytes, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
+ dict:store(MsgId, SeqId, Acc)
+ end, dict:new(), List),
+ %% we really do want to ack each of this individually
+    %% we really do want to ack each of these individually
+ rabbit_disk_queue:ack(q, [SeqId])
+ end || MsgId <- AckList2],
+ rabbit_disk_queue:tx_commit(q, [], []),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_startup_with_queue_gaps() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), SeqId
+ end || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ %% ack every other message we have delivered (starting at the _first_)
+ lists:foldl(fun (SeqId2, true) ->
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ false;
+ (_SeqId2, false) ->
+ true
+ end, true, Seqs),
+ rabbit_disk_queue:tx_commit(q, [], []),
+ io:format("Acked every other message delivered done~n", []),
+ rdq_stop(),
+ rdq_start(),
+ io:format("Startup (with shuffle) done~n", []),
+    %% should have shuffled up. So we should now get lists:seq(2,Half,2)
+    %% (i.e. 2,4,...,500) already delivered
+ Seqs2 = [begin
+ Remaining = round(Total - ((Half + N)/2)),
+ {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(2,Half,2)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ io:format("Reread non-acked messages done~n", []),
+ %% and now fetch the rest
+ Seqs3 = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1 + Half,Total)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs3),
+ io:format("Read second half done~n", []),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_redeliver() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ %% now requeue every other message (starting at the _first_)
+ %% and ack the other ones
+ lists:foldl(fun (SeqId2, true) ->
+ rabbit_disk_queue:requeue(q, [SeqId2]),
+ false;
+ (SeqId2, false) ->
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ true
+ end, true, Seqs),
+ rabbit_disk_queue:tx_commit(q, [], []),
+ io:format("Redeliver and acking done~n", []),
+ %% we should now get the 2nd half in order, followed by every-other-from-the-first-half
+ Seqs2 = [begin
+ Remaining = round(Total - N + (Half/2)),
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1+Half, Total)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ Seqs3 = [begin
+ Remaining = round((Half - N) / 2) - 1,
+ {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1, Half, 2)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs3),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_purge() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q),
+ SeqId
+ end || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ rabbit_disk_queue:purge(q),
+ io:format("Purge done~n", []),
+ rabbit_disk_queue:tx_commit(q, [], Seqs),
+ io:format("Ack first half done~n", []),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_dump_queue() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ QList = [{N, Msg, 256, false, {N, (N-1)}, (N-1)} || N <- All],
+ QList = rabbit_disk_queue:dump_queue(q),
+ rdq_stop(),
+ io:format("dump ok undelivered~n", []),
+ rdq_start(),
+ lists:foreach(
+ fun (N) ->
+ Remaining = Total - N,
+ {N, Msg, 256, false, _SeqId, Remaining} = rabbit_disk_queue:deliver(q)
+ end, All),
+ [] = rabbit_disk_queue:dump_queue(q),
+ rdq_stop(),
+ io:format("dump ok post delivery~n", []),
+ rdq_start(),
+ QList2 = [{N, Msg, 256, true, {N, (N-1)}, (N-1)} || N <- All],
+ QList2 = rabbit_disk_queue:dump_queue(q),
+ io:format("dump ok post delivery + restart~n", []),
+ rdq_stop(),
+ passed.
+
+rdq_test_mixed_queue_modes() ->
+ rdq_virgin(),
+ rdq_start(),
+ Payload = <<0:(8*256)>>,
+ {ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed),
+ MS2 = lists:foldl(fun (_N, MS1) ->
+ Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
+ MS1a
+ end, MS, lists:seq(1,10)),
+ MS4 = lists:foldl(fun (_N, MS3) ->
+ Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload))
+ #basic_message { is_persistent = true },
+ {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3),
+ MS3a
+ end, MS2, lists:seq(1,10)),
+ MS6 = lists:foldl(fun (_N, MS5) ->
+ Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5),
+ MS5a
+ end, MS4, lists:seq(1,10)),
+ 30 = rabbit_mixed_queue:length(MS6),
+ io:format("Published a mixture of messages~n"),
+ {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),
+ 30 = rabbit_mixed_queue:length(MS7),
+ io:format("Converted to disk only mode~n"),
+ {ok, MS8} = rabbit_mixed_queue:to_mixed_mode(MS7),
+ 30 = rabbit_mixed_queue:length(MS8),
+ io:format("Converted to mixed mode~n"),
+ MS10 =
+ lists:foldl(
+ fun (N, MS9) ->
+ Rem = 30 - N,
+ {{#basic_message { is_persistent = false },
+ false, _AckTag, Rem},
+ MS9a} = rabbit_mixed_queue:deliver(MS9),
+ MS9a
+ end, MS8, lists:seq(1,10)),
+ 20 = rabbit_mixed_queue:length(MS10),
+ io:format("Delivered initial non persistent messages~n"),
+ {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10),
+ 20 = rabbit_mixed_queue:length(MS11),
+ io:format("Converted to disk only mode~n"),
+ rdq_stop(),
+ rdq_start(),
+ {ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed),
+ 10 = rabbit_mixed_queue:length(MS12),
+ io:format("Recovered queue~n"),
+ {MS14, AckTags} =
+ lists:foldl(
+ fun (N, {MS13, AcksAcc}) ->
+ Rem = 10 - N,
+ {{#basic_message { is_persistent = true },
+ false, AckTag, Rem},
+ MS13a} = rabbit_mixed_queue:deliver(MS13),
+ {MS13a, [AckTag | AcksAcc]}
+ end, {MS12, []}, lists:seq(1,10)),
+ 0 = rabbit_mixed_queue:length(MS14),
+ {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
+ io:format("Delivered and acked all messages~n"),
+ {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15),
+ 0 = rabbit_mixed_queue:length(MS16),
+ io:format("Converted to disk only mode~n"),
+ rdq_stop(),
+ rdq_start(),
+ {ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed),
+ 0 = rabbit_mixed_queue:length(MS17),
+ io:format("Recovered queue~n"),
+ rdq_stop(),
+ passed.
+
+rdq_time_commands(Funcs) ->
+ lists:foreach(fun (F) -> F() end, Funcs).
+
+rdq_virgin() ->
+ {Micros, {ok, _}} =
+ timer:tc(rabbit_disk_queue, start_link, []),
+ ok = rabbit_disk_queue:stop_and_obliterate(),
+ timer:sleep(1000),
+ Micros.
+
+rdq_start() ->
+ {ok, _} = rabbit_disk_queue:start_link(),
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ ok.
+
+rdq_stop() ->
+ rabbit_disk_queue:stop(),
+ timer:sleep(1000).