diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-11 23:15:59 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-11 23:15:59 +0100 |
commit | 3a4049025841e83a8ea6166f01eb64bf8558c845 (patch) | |
tree | 718058c22af605b3390835184806df09808629b8 | |
parent | c54549506c2086b09bf9831455d410c86a4baf65 (diff) | |
parent | 7ff061f10ee33832fba9864313a0191b20bea7b0 (diff) | |
download | rabbitmq-server-3a4049025841e83a8ea6166f01eb64bf8558c845.tar.gz |
merging in from default
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | include/rabbit.hrl | 7 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 67 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 437 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
-rw-r--r-- | src/rabbit_db_queue.erl | 454 | ||||
-rw-r--r-- | src/rabbit_db_queue_schema.sql | 22 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 1755 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 22 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 394 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 51 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 523 | ||||
-rw-r--r-- | src/rabbit_queue_mode_manager.erl | 105 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 387 |
18 files changed, 3419 insertions, 845 deletions
@@ -69,7 +69,7 @@ clean: cleandb rm -f docs/*.[0-9].gz cleandb: stop-node - erl -mnesia dir '"$(RABBITMQ_MNESIA_DIR)"' -noshell -eval 'lists:foreach(fun file:delete/1, filelib:wildcard(mnesia:system_info(directory) ++ "/*")), halt().' + erl -mnesia dir '"$(RABBITMQ_MNESIA_DIR)"' -noshell -eval 'lists:foreach(fun file:delete/1, filelib:wildcard(mnesia:system_info(directory) ++ "/*") ++ filelib:wildcard(filename:join(mnesia:system_info(directory), "rabbit_disk_queue/*"))), halt().' ############ various tasks to interact with RabbitMQ ################### diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 784c21b3..a2840931 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,9 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}). + +-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id, next_seq_id}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -134,7 +136,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: bool()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: bool(), diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 8502d60a..0aa09bd8 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -98,6 +98,7 @@ exec erl \ -os_mon memsup_system_only true \ -os_mon system_memory_high_watermark 0.95 \ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ + -mnesia dump_log_write_threshold 10000 \ ${RABBITMQ_CLUSTER_CONFIG_OPTION} \ ${RABBITMQ_SERVER_START_ARGS} \ "$@" diff --git a/src/rabbit.erl b/src/rabbit.erl index 1ddb5151..44e4dc7f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -138,6 +138,8 @@ start(normal, []) -> {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), + + ok = start_child(rabbit_queue_mode_manager), ok = rabbit_binary_generator: check_empty_content_body_frame_size(), @@ -145,15 +147,20 @@ start(normal, []) -> ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) end}, + {"disk queue", + fun () -> + ok = start_child(rabbit_disk_queue), + ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME + end}, {"recovery", fun () -> ok = maybe_insert_default_data(), ok = rabbit_exchange:recover(), - ok = rabbit_amqqueue:recover() - end}, - {"persister", - fun () -> - ok = start_child(rabbit_persister) + {ok, DurableQueues} = rabbit_amqqueue:recover(), + DurableQueueNames = + sets:from_list(lists:map( + fun(Q) -> Q #amqqueue.name end, DurableQueues)), + ok = rabbit_disk_queue:delete_non_durable_queues(DurableQueueNames) end}, {"guid generator", fun () -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 198e2782..01d40aa1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,6 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([constrain_memory/2]). -import(mnesia). -import(gen_server2). @@ -103,6 +104,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(constrain_memory/2 :: (pid(), bool()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -121,37 +123,39 @@ start() -> ok. recover() -> - ok = recover_durable_queues(), - ok. + {ok, DurableQueues} = recover_durable_queues(), + {ok, DurableQueues}. recover_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) - end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. + DurableQueues = + lists:foldl( + fun (RecoveredQ, Acc) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> [Q|Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc + end + end, [], + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end)), + {ok, DurableQueues}. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -305,10 +309,13 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 10, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 10, {unblock, ChPid}). + +constrain_memory(QPid, Constrain) -> + gen_server2:pcast(QPid, 10, {constrain, Constrain}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf0ef44f..ff0cc56b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -51,8 +51,8 @@ owner, exclusive_consumer, has_had_consumers, + mixed_state, next_msg_id, - message_buffer, active_consumers, blocked_consumers}). @@ -92,23 +92,27 @@ start_link(Q) -> %%---------------------------------------------------------------------------- -init(Q) -> +init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + {ok, Mode} = rabbit_queue_mode_manager:register(self()), + {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, Mode), {ok, #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, + mixed_state = MS, next_msg_id = 1, - message_buffer = queue:new(), active_consumers = queue:new(), blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), - lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, - all_tx()), - ok = purge_message_buffer(QName, State#q.message_buffer), + NewState = + lists:foldl(fun (Txn, State1) -> + rollback_transaction(Txn, State1) + end, State, all_tx()), + rabbit_mixed_queue:delete_queue(NewState #q.mixed_state), ok = rabbit_amqqueue:internal_delete(QName). code_change(_OldVsn, State, _Extra) -> @@ -165,12 +169,11 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_immediately(Message, Delivered, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> - ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), +deliver_queue(Fun, FunAcc0, + State = #q{q = #amqqueue{name = QName}, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers, + next_msg_id = NextId}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -178,13 +181,18 @@ deliver_immediately(Message, Delivered, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + IsMsgReady = Fun(is_message_ready, FunAcc0, State), + case (IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> + {{Msg, IsDelivered, AckTag, Remaining}, FunAcc1, State2} = + Fun(AckRequired, FunAcc0, State), + ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), + {QName, self(), NextId, IsDelivered, Msg}), NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); + true -> dict:store(NextId, {Msg, AckTag}, UAM); false -> UAM end, NewC = C#cr{unsent_message_count = Count + 1, @@ -202,54 +210,100 @@ deliver_immediately(Message, Delivered, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1}}; - false -> + State3 = State2 #q { active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers, + next_msg_id = NextId + 1 + }, + if Remaining == 0 -> {FunAcc1, State3}; + true -> deliver_queue(Fun, FunAcc1, State3) + end; + %% if IsMsgReady then we've hit the limiter + false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, BlockedConsumers), - deliver_immediately( - Message, Delivered, + deliver_queue( + Fun, FunAcc0, State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + blocked_consumers = NewBlockedConsumers}); + false -> + %% no message was ready, so we don't need to block anyone + {FunAcc0, State} end; {empty, _} -> - {not_offered, State} + {FunAcc0, State} end. -attempt_delivery(none, _ChPid, Message, State) -> - case deliver_immediately(Message, false, State) of - {offered, false, State1} -> - {true, State1}; - {offered, true, State1} -> - persist_message(none, qname(State), Message), - persist_delivery(qname(State), Message, false), - {true, State1}; - {not_offered, State1} -> - {false, State1} - end; -attempt_delivery(Txn, ChPid, Message, State) -> - persist_message(Txn, qname(State), Message), - record_pending_message(Txn, ChPid, Message), - {true, State}. - -deliver_or_enqueue(Txn, ChPid, Message, State) -> - case attempt_delivery(Txn, ChPid, Message, State) of +deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) -> + 0 /= rabbit_mixed_queue:length(MS); +deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) -> + {Res, MS2} = rabbit_mixed_queue:deliver(MS), + MS3 = case {Res, AckRequired} of + {_, true} -> MS2; + {empty, _} -> MS2; + {{_Msg, _IsDelivered, AckTag, _Remaining}, false} -> + {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2), + MS4 + end, + {Res, Acc, State #q { mixed_state = MS3 }}. + +run_message_queue(State) -> + {undefined, State2} = deliver_queue(fun deliver_from_queue/3, undefined, State), + State2. + +attempt_immediate_delivery(none, _ChPid, Msg, State) -> + Fun = + fun (is_message_ready, false, _State) -> + true; + (AckRequired, false, State2) -> + {AckTag, State3} = + if AckRequired -> + {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered( + Msg, State2 #q.mixed_state), + {AckTag2, State2 #q { mixed_state = MS }}; + true -> + {noack, State2} + end, + {{Msg, false, AckTag, 0}, true, State3} + end, + deliver_queue(Fun, false, State); +attempt_immediate_delivery(Txn, ChPid, Msg, State) -> + {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state), + record_pending_message(Txn, ChPid, Msg), + {true, State #q { mixed_state = MS }}. + +deliver_or_enqueue(Txn, ChPid, Msg, State) -> + case attempt_immediate_delivery(Txn, ChPid, Msg, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> - persist_message(Txn, qname(State), Message), - NewMB = queue:in({Message, false}, NewState#q.message_buffer), - {false, NewState#q{message_buffer = NewMB}} + %% Txn is none and no unblocked channels with consumers + {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state), + {false, NewState #q { mixed_state = MS }} end. -deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), - State). +%% all these messages have already been delivered at least once and +%% not ack'd, but need to be either redelivered or requeued +deliver_or_requeue_n([], State) -> + run_message_queue(State); +deliver_or_requeue_n(MsgsWithAcks, State) -> + {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = + deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State), + {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), NewState #q.mixed_state), + case OutstandingMsgs of + [] -> run_message_queue(NewState #q { mixed_state = MS }); + _ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), + NewState #q { mixed_state = MS2 } + end. + +deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) -> + -1 < Len; +deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; +deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, AckTag, Len}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -283,7 +337,7 @@ possibly_unblock(State, ChPid, Update) -> move_consumers(ChPid, State#q.blocked_consumers, State#q.active_consumers), - run_poke_burst( + run_message_queue( State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedeConsumers}) end @@ -300,27 +354,27 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> unacked_messages = UAM} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), - case Txn of - none -> ok; - _ -> ok = rollback_work(Txn, qname(State)), - erase_tx(Txn) - end, - NewState = - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], - State#q{ + State1 = + case Txn of + none -> State; + _ -> rollback_transaction(Txn, State) + end, + State2 = + deliver_or_requeue_n( + [MsgWithAck || + {_MsgId, MsgWithAck} <- dict:to_list(UAM)], + State1 # q { exclusive_consumer = case Holder of {ChPid, _} -> none; Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), + ChPid, State1#q.active_consumers), blocked_consumers = remove_consumers( - ChPid, State#q.blocked_consumers)}), - case should_auto_delete(NewState) of - false -> noreply(NewState); - true -> {stop, normal, NewState} + ChPid, State1#q.blocked_consumers)}), + case should_auto_delete(State2) of + false -> noreply(State2); + true -> {stop, normal, State2} end end. @@ -343,26 +397,6 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(MessageBuffer, State). - -run_poke_burst(MessageBuffer, State) -> - case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> - case deliver_immediately(Message, Delivered, State) of - {offered, true, NewState} -> - persist_delivery(qname(State), Message, Delivered), - run_poke_burst(BufferTail, NewState); - {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), - run_poke_burst(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} - end; - {empty, _} -> - State#q{message_buffer = MessageBuffer} - end. - is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso queue:is_empty(State#q.blocked_consumers). @@ -371,62 +405,6 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> - ok; -persist_message(Txn, QName, Message) -> - M = Message#basic_message{ - %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore - content = rabbit_binary_parser:clear_decoded_content( - Message#basic_message.content)}, - persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). - -persist_delivery(_QName, _Message, - true) -> - ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, - _Delivered) -> - ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, - _Delivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). - -persist_acks(Txn, QName, Messages) -> - persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). - -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> - ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> - %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). - -commit_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName). - -rollback_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName). - -%% optimisation: don't do unnecessary work -%% it would be nice if this was handled by the persister -do_if_persistent(F, Txn, QName) -> - case is_tx_persistent(Txn) of - false -> ok; - true -> ok = F({Txn, QName}) - end. - lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx{ch_pid = none, @@ -448,19 +426,12 @@ all_tx_record() -> all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. -mark_tx_persistent(Txn) -> - Tx = lookup_tx(Txn), - store_tx(Txn, Tx#tx{is_persistent = true}). - -is_tx_persistent(Txn) -> - #tx{is_persistent = Res} = lookup_tx(Txn), - Res. - -record_pending_message(Txn, ChPid, Message) -> - Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), +record_pending_message(Txn, ChPid, Message = #basic_message { is_persistent = IsPersistent }) -> + Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx #tx { pending_messages = [Message | Pending], + is_persistent = IsPersistentTxn orelse IsPersistent + }). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), @@ -468,38 +439,43 @@ record_pending_acks(Txn, ChPid, MsgIds) -> store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, - deliver_or_enqueue_n(lists:reverse(PendingMessages), State). +commit_transaction(Txn, State) -> + #tx { ch_pid = ChPid, + pending_messages = PendingMessages, + pending_acks = PendingAcks + } = lookup_tx(Txn), + PendingMessagesOrdered = lists:reverse(PendingMessages), + PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)), + {ok, MS} = + case lookup_ch(ChPid) of + not_found -> + rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, [], State #q.mixed_state); + C = #cr { unacked_messages = UAM } -> + {MsgWithAcks, Remaining} = + collect_messages(PendingAcksOrdered, UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), + rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, + lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), + State #q.mixed_state) + end, + State #q { mixed_state = MS }. + +rollback_transaction(Txn, State) -> + #tx { pending_messages = PendingMessages + } = lookup_tx(Txn), + {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state), + erase_tx(Txn), + State #q { mixed_state = MS }. +%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C +%% err, A = C `intersect` D , via projection through the dict that is C collect_messages(MsgIds, UAM) -> lists:mapfoldl( fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, UAM, MsgIds). -purge_message_buffer(QName, MessageBuffer) -> - Messages = - [[Message || {Message, _Delivered} <- - queue:to_list(MessageBuffer)] | - lists:map( - fun (#cr{unacked_messages = UAM}) -> - [Message || {_MessageId, Message} <- dict:to_list(UAM)] - end, - all_ch_record())], - %% the simplest, though certainly not the most obvious or - %% efficient, way to purge messages from the persister is to - %% artifically ack them. - persist_acks(none, QName, lists:append(Messages)). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -508,8 +484,8 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); -i(messages_ready, #q{message_buffer = MessageBuffer}) -> - queue:len(MessageBuffer); +i(messages_ready, #q { mixed_state = MS }) -> + rabbit_mixed_queue:length(MS); i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); @@ -558,7 +534,7 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), + {Delivered, NewState} = attempt_immediate_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({deliver, Txn, Message, ChPid}, _From, State) -> @@ -567,12 +543,11 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> - ok = commit_work(Txn, qname(State)), + NewState = commit_transaction(Txn, State), %% optimisation: we reply straight away so the sender can continue gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), erase_tx(Txn), - noreply(NewState); + noreply(run_message_queue(NewState)); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -582,25 +557,26 @@ handle_call({notify_down, ChPid}, From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId, - message_buffer = MessageBuffer}) -> - case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> + mixed_state = MS + }) -> + case rabbit_mixed_queue:deliver(MS) of + {empty, MS2} -> reply(empty, State #q { mixed_state = MS2 }); + {{Msg, IsDelivered, AckTag, Remaining}, MS2} -> AckRequired = not(NoAck), - case AckRequired of - true -> - persist_delivery(QName, Message, Delivered), - C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, Message, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}); - false -> - persist_auto_ack(QName, Message) - end, - Msg = {QName, self(), NextId, Delivered, Message}, - reply({ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, - next_msg_id = NextId + 1}); - {empty, _} -> - reply(empty, State) + {ok, MS3} = + if AckRequired -> + C = #cr{unacked_messages = UAM} = ch_record(ChPid), + NewUAM = dict:store(NextId, {Msg, AckTag}, UAM), + store_ch_record(C#cr{unacked_messages = NewUAM}), + {ok, MS2}; + true -> + rabbit_mixed_queue:ack([AckTag], MS2) + end, + Message = {QName, self(), NextId, IsDelivered, Msg}, + reply({ok, Remaining, Message}, + State #q { next_msg_id = NextId + 1, + mixed_state = MS3 + }) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -640,7 +616,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, add_consumer( ChPid, Consumer, State1#q.blocked_consumers)}; - false -> run_poke_burst( + false -> run_message_queue( State1#q{ active_consumers = add_consumer( @@ -682,14 +658,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - message_buffer = MessageBuffer, + mixed_state = MS, active_consumers = ActiveConsumers}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, - State); + Length = rabbit_mixed_queue:length(MS), + reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{message_buffer = MessageBuffer}) -> - IsEmpty = queue:is_empty(MessageBuffer), + State = #q { mixed_state = MS }) -> + Length = rabbit_mixed_queue:length(MS), + IsEmpty = Length == 0, IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> @@ -697,13 +674,13 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, queue:len(MessageBuffer)}, State} + {stop, normal, {ok, Length}, State} end; -handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> - ok = purge_message_buffer(qname(State), MessageBuffer), - reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); +handle_call(purge, _From, State) -> + {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state), + reply({ok, Count}, + State #q { mixed_state = MS }); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -737,24 +714,21 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM} -> - {Acked, Remaining} = collect_messages(MsgIds, UAM), - persist_acks(Txn, qname(State), Acked), + {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), case Txn of none -> - store_ch_record(C#cr{unacked_messages = Remaining}); + Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), + {ok, MS} = rabbit_mixed_queue:ack(Acks, State #q.mixed_state), + store_ch_record(C#cr{unacked_messages = Remaining}), + noreply(State #q { mixed_state = MS }); _ -> - record_pending_acks(Txn, ChPid, MsgIds) - end, - noreply(State) + record_pending_acks(Txn, ChPid, MsgIds), + noreply(State) + end end; handle_cast({rollback, Txn}, State) -> - ok = rollback_work(Txn, qname(State)), - erase_tx(Txn), - noreply(State); - -handle_cast({redeliver, Messages}, State) -> - noreply(deliver_or_enqueue_n(Messages, State)); + noreply(rollback_transaction(Txn, State)); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of @@ -763,10 +737,9 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [ChPid]), noreply(State); C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), + {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) + noreply(deliver_or_requeue_n(MsgWithAcks, State)) end; handle_cast({unblock, ChPid}, State) -> @@ -795,7 +768,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) -> + {ok, MS2} = if Constrain -> rabbit_mixed_queue:to_disk_only_mode(MS); + true -> rabbit_mixed_queue:to_mixed_mode(MS) + end, + noreply(State #q { mixed_state = MS2 }). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 761b3863..0673bdd8 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -72,4 +72,5 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, content = Content, - persistent_key = none}. + guid = rabbit_guid:guid(), + is_persistent = false}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3089bb62..ed715097 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -315,14 +315,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent)}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl new file mode 100644 index 00000000..897a4a6f --- /dev/null +++ b/src/rabbit_db_queue.erl @@ -0,0 +1,454 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% So, assuming you're on some debian linux type system, +%% apt-get install postgresql odbc-postgresql unixodbc unixodbc-bin +%% sudo odbcinst -i -d -f /usr/share/psqlodbc/odbcinst.ini.template + +%% Now set up in postgresql a user and a database that user can +%% access. For example, the database could be called rabbit_db_queue +%% and the username could be rabbit and the password could be rabbit. + +%% sudo ODBCConfig +%% set up a system wide dsn with the above settings in it. +%% now drop into the erlang shell, and you should not get an error after: + +%% > odbc:start(). +%% < ok. +%% > odbc:connect("DSN=rabbit_db_queue", []). +%% < {ok,<0.325.0>} +%% ( replace rabbit_db_queue with the name of your DSN that you configured ) + +%% the connection string (eg "DSN=rabbit_db_queue") is what you pass +%% to start_link. Don't just pass the DSN name. + +-module(rabbit_db_queue). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, + tx_commit/3, tx_cancel/1, requeue/2, purge/1]). + +-export([stop/0, stop_and_obliterate/0]). + +-include("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +%% ---- SPECS ---- + +-ifdef(use_specs). + +-type(seq_id() :: non_neg_integer()). + +-spec(start_link/1 :: (non_neg_integer()) -> + {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). +-spec(deliver/1 :: (queue_name()) -> + {'empty' | {msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}}}). +-spec(phantom_deliver/1 :: (queue_name()) -> + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). +-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). +-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). +-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok'). +-spec(purge/1 :: (queue_name()) -> non_neg_integer()). +-spec(stop/0 :: () -> 'ok'). +-spec(stop_and_obliterate/0 :: () -> 'ok'). + +-endif. + +%% ---- PUBLIC API ---- + +start_link(DSN) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, + [DSN], []). + +publish(Q, MsgId, Msg) when is_binary(Msg) -> + gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). + +deliver(Q) -> + gen_server:call(?SERVER, {deliver, Q}, infinity). + +phantom_deliver(Q) -> + gen_server:call(?SERVER, {phantom_deliver, Q}). + +ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}). + +tx_publish(MsgId, Msg) when is_binary(Msg) -> + gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}). + +tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> + gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). + +tx_cancel(MsgIds) when is_list(MsgIds) -> + gen_server:cast(?SERVER, {tx_cancel, MsgIds}). + +requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). + +purge(Q) -> + gen_server:call(?SERVER, {purge, Q}). + +stop() -> + gen_server:call(?SERVER, stop, infinity). + +stop_and_obliterate() -> + gen_server:call(?SERVER, stop_vaporise, infinity). + +%% ---- GEN-SERVER INTERNAL API ---- +-record(dbstate, { db_conn }). + +init([DSN]) -> + process_flag(trap_exit, true), + odbc:start(), + {ok, Conn} = odbc:connect(DSN, [{auto_commit, off}, {tuple_row, on}, + {scrollable_cursors, off}, {trace_driver, off}]), + State = #dbstate { db_conn = Conn }, + compact_already_delivered(State), + {ok, State}. + +handle_call({deliver, Q}, _From, State) -> + {ok, Result, State1} = internal_deliver(Q, true, State), + {reply, Result, State1}; +handle_call({phantom_deliver, Q}, _From, State) -> + {ok, Result, State1} = internal_deliver(Q, false, State), + {reply, Result, State1}; +handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> + {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), + {reply, ok, State1}; +handle_call({purge, Q}, _From, State) -> + {ok, Count, State1} = internal_purge(Q, State), + {reply, Count, State1}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; %% gen_server now calls terminate +handle_call(stop_vaporise, _From, State = #dbstate { db_conn = Conn }) -> + odbc:sql_query(Conn, "delete from ledger"), + odbc:sql_query(Conn, "delete from sequence"), + odbc:sql_query(Conn, "delete from message"), + odbc:commit(Conn, commit), + {stop, normal, ok, State}. + %% gen_server now calls terminate, which then calls shutdown + +handle_cast({publish, Q, MsgId, MsgBody}, State) -> + {ok, State1} = internal_publish(Q, MsgId, MsgBody, State), + {noreply, State1}; +handle_cast({ack, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_ack(Q, MsgSeqIds, State), + {noreply, State1}; +handle_cast({tx_publish, MsgId, MsgBody}, State) -> + {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), + {noreply, State1}; +handle_cast({tx_cancel, MsgIds}, State) -> + {ok, State1} = internal_tx_cancel(MsgIds, State), + {noreply, State1}; +handle_cast({requeue, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_requeue(Q, MsgSeqIds, State), + {noreply, State1}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + shutdown(State). + +shutdown(State = #dbstate { db_conn = Conn }) -> + odbc:disconnect(Conn), + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ---- UTILITY FUNCTIONS ---- + +binary_to_escaped_string(Bin) when is_binary(Bin) -> + "E'" ++ lists:flatten(lists:reverse(binary_to_escaped_string(Bin, []))) ++ "'". + +binary_to_escaped_string(<<>>, Acc) -> + Acc; +binary_to_escaped_string(<<Byte:8, Rest/binary>>, Acc) -> + binary_to_escaped_string(Rest, [escape_byte(Byte) | Acc]). + +escape_byte(39) -> + "\\\\047"; +escape_byte(92) -> + "\\\\134"; +escape_byte(B) when B > 31 andalso B < 127 -> + B; +escape_byte(B) -> + case io_lib:format("~.8B", [B]) of + O1 = [[_]] -> + "\\\\00" ++ O1; + O2 = [[_,_]] -> + "\\\\0" ++ O2; + O3 = [[_,_,_]] -> + "\\\\" ++ O3 + end. + +escaped_string_to_binary(Str) when is_list(Str) -> + list_to_binary(lists:reverse(escaped_string_to_binary(Str, []))). + +escaped_string_to_binary([], Acc) -> + Acc; +escaped_string_to_binary([$\\,$\\|Rest], Acc) -> + escaped_string_to_binary(Rest, [$\\ | Acc]); +escaped_string_to_binary([$\\,A,B,C|Rest], Acc) -> + escaped_string_to_binary(Rest, [(list_to_integer([A])*64) + + (list_to_integer([B])*8) + + list_to_integer([C]) + | Acc]); +escaped_string_to_binary([C|Rest], Acc) -> + escaped_string_to_binary(Rest, [C|Acc]). + +hex_string_to_binary(Str) when is_list(Str) -> + list_to_binary(lists:reverse(hex_string_to_binary(Str, []))). + +hex_string_to_binary([], Acc) -> + Acc; +hex_string_to_binary([A,B|Rest], Acc) -> + {ok, [N], []} = io_lib:fread("~16u", [A,B]), + hex_string_to_binary(Rest, [N | Acc]). + +%% ---- INTERNAL RAW FUNCTIONS ---- + +internal_deliver(Q, ReadMsg, State = #dbstate { db_conn = Conn }) -> + QStr = binary_to_escaped_string(term_to_binary(Q)), + case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of + {selected, _, []} -> + odbc:commit(Conn, commit), + {ok, empty, State}; + {selected, _, [{ReadSeqId}]} -> + case odbc:sql_query(Conn, "select is_delivered, msg_id from ledger where queue = " ++ QStr ++ + " and seq_id = " ++ integer_to_list(ReadSeqId)) of + {selected, _, []} -> + {ok, empty, State}; + {selected, _, [{IsDeliveredStr, MsgIdStr}]} -> + IsDelivered = IsDeliveredStr /= "0", + if IsDelivered -> ok; + true -> odbc:sql_query(Conn, "update ledger set is_delivered = true where queue = " ++ + QStr ++ " and seq_id = " ++ integer_to_list(ReadSeqId)) + end, + MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)), + %% yeah, this is really necessary. sigh + MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)), + odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++ + " where queue = " ++ QStr), + if ReadMsg -> + {selected, _, [{MsgBodyStr}]} = + odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2), + odbc:commit(Conn, commit), + MsgBody = hex_string_to_binary(MsgBodyStr), + BodySize = size(MsgBody), + {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State}; + true -> + odbc:commit(Conn, commit), + {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State} + end + end + end. + +internal_ack(Q, MsgSeqIds, State) -> + remove_messages(Q, MsgSeqIds, true, State). + +%% Q is only needed if LedgerDelete /= false +%% called from tx_cancel with LedgerDelete = false +%% called from internal_tx_cancel with LedgerDelete = true +%% called from ack with LedgerDelete = true +remove_messages(Q, MsgSeqIds, LedgerDelete, State = #dbstate { db_conn = Conn }) -> + QStr = binary_to_escaped_string(term_to_binary(Q)), + lists:foreach( + fun ({MsgId, SeqId}) -> + MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), + {selected, _, [{RefCount}]} = + odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ + MsgIdStr), + case RefCount of + 1 -> odbc:sql_query(Conn, "delete from message where msg_id = " ++ + MsgIdStr); + _ -> odbc:sql_query(Conn, "update message set ref_count = " ++ + integer_to_list(RefCount - 1) ++ " where msg_id = " ++ + MsgIdStr) + end, + if LedgerDelete -> + odbc:sql_query(Conn, "delete from ledger where queue = " ++ + QStr ++ " and seq_id = " ++ integer_to_list(SeqId)); + true -> ok + end + end, MsgSeqIds), + odbc:commit(Conn, commit), + {ok, State}. + +internal_tx_publish(MsgId, MsgBody, State = #dbstate { db_conn = Conn }) -> + MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), + MsgStr = binary_to_escaped_string(MsgBody), + case odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ MsgIdStr) of + {selected, _, []} -> + odbc:sql_query(Conn, "insert into message (msg_id, msg, ref_count) values (" ++ + MsgIdStr ++ ", " ++ MsgStr ++ ", 1)"); + {selected, _, [{RefCount}]} -> + odbc:sql_query(Conn, "update message set ref_count = " ++ + integer_to_list(RefCount + 1) ++ " where msg_id = " ++ MsgIdStr) + end, + odbc:commit(Conn, commit), + {ok, State}. + +internal_tx_commit(Q, PubMsgIds, AckSeqIds, State = #dbstate { db_conn = Conn }) -> + QStr = binary_to_escaped_string(term_to_binary(Q)), + {InsertOrUpdate, NextWrite} = + case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of + {selected, _, []} -> {insert, 0}; + {selected, _, [{NextWrite2}]} -> {update, NextWrite2} + end, + NextWrite3 = + lists:foldl(fun (MsgId, WriteSeqInteger) -> + MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), + odbc:sql_query(Conn, + "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++ + QStr ++ ", " ++ integer_to_list(WriteSeqInteger) ++ ", false, " ++ + MsgIdStr ++ ")"), + WriteSeqInteger + 1 + end, NextWrite, PubMsgIds), + case InsertOrUpdate of + update -> odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(NextWrite3) ++ + " where queue = " ++ QStr); + insert -> odbc:sql_query(Conn, "insert into sequence (queue, next_read, next_write) values (" ++ + QStr ++ ", 0, " ++ integer_to_list(NextWrite3) ++ ")") + end, + odbc:commit(Conn, commit), + remove_messages(Q, AckSeqIds, true, State), + {ok, State}. + +internal_publish(Q, MsgId, MsgBody, State = #dbstate { db_conn = Conn }) -> + {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), + MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), + QStr = binary_to_escaped_string(term_to_binary(Q)), + NextWrite = + case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of + {selected, _, []} -> + odbc:sql_query(Conn, + "insert into sequence (queue, next_read, next_write) values (" ++ + QStr ++ ", 0, 1)"), + 0; + {selected, _, [{NextWrite2}]} -> + odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(1 + NextWrite2) ++ + " where queue = " ++ QStr), + NextWrite2 + end, + odbc:sql_query(Conn, "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++ + QStr ++ ", " ++ integer_to_list(NextWrite) ++ ", false, " ++ MsgIdStr ++ ")"), + odbc:commit(Conn, commit), + {ok, State1}. + +internal_tx_cancel(MsgIds, State) -> + MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), + remove_messages(undefined, MsgSeqIds, false, State). + +internal_requeue(Q, MsgSeqIds, State = #dbstate { db_conn = Conn }) -> + QStr = binary_to_escaped_string(term_to_binary(Q)), + {selected, _, [{WriteSeqId}]} = + odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr), + WriteSeqId2 = + lists:foldl( + fun ({_MsgId, SeqId}, NextWriteSeqId) -> + odbc:sql_query(Conn, "update ledger set seq_id = " ++ integer_to_list(NextWriteSeqId) ++ + " where seq_id = " ++ integer_to_list(SeqId) ++ " and queue = " ++ QStr), + NextWriteSeqId + 1 + end, WriteSeqId, MsgSeqIds), + odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(WriteSeqId2) ++ + " where queue = " ++ QStr), + odbc:commit(Conn, commit), + {ok, State}. + + +compact_already_delivered(#dbstate { db_conn = Conn }) -> + {selected, _, Seqs} = odbc:sql_query(Conn, "select queue, next_read from sequence"), + lists:foreach( + fun ({QHexStr, ReadSeqId}) -> + Q = binary_to_term(hex_string_to_binary(QHexStr)), + QStr = binary_to_escaped_string(term_to_binary(Q)), + case odbc:sql_query(Conn, "select min(seq_id) from ledger where queue = " + ++ QStr) of + {selected, _, []} -> ok; + {selected, _, [{null}]} -> ok; %% AGH! + {selected, _, [{Min}]} -> + Gap = shuffle_up(Conn, QStr, Min - 1, ReadSeqId - 1, 0), + odbc:sql_query(Conn, "update sequence set next_read = " ++ + integer_to_list(Min + Gap) ++ + " where queue = " ++ QStr) + end + end, Seqs), + odbc:commit(Conn, commit). + +shuffle_up(_Conn, _QStr, SeqId, SeqId, Gap) -> + Gap; +shuffle_up(Conn, QStr, BaseSeqId, SeqId, Gap) -> + GapInc = + case odbc:sql_query(Conn, "select count(1) from ledger where queue = " ++ + QStr ++ " and seq_id = " ++ integer_to_list(SeqId)) of + {selected, _, [{"0"}]} -> + 1; + {selected, _, [{"1"}]} -> + if Gap =:= 0 -> ok; + true -> odbc:sql_query(Conn, "update ledger set seq_id = " ++ + integer_to_list(SeqId + Gap) ++ " where seq_id = " ++ + integer_to_list(SeqId) ++ " and queue = " ++ QStr) + end, + 0 + end, + shuffle_up(Conn, QStr, BaseSeqId, SeqId - 1, Gap + GapInc). + +internal_purge(Q, State = #dbstate { db_conn = Conn }) -> + QStr = binary_to_escaped_string(term_to_binary(Q)), + case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of + {selected, _, []} -> + odbc:commit(Conn, commit), + {ok, 0, State}; + {selected, _, [{ReadSeqId}]} -> + odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr), + {selected, _, MsgSeqIds} = + odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++ + QStr ++ " and seq_id >= " ++ ReadSeqId), + MsgSeqIds2 = lists:map( + fun ({MsgIdStr, SeqIdStr}) -> + { binary_to_term(hex_string_to_binary(MsgIdStr)), + list_to_integer(SeqIdStr) } + end, MsgSeqIds), + {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State), + {ok, length(MsgSeqIds2), State2} + end. diff --git a/src/rabbit_db_queue_schema.sql b/src/rabbit_db_queue_schema.sql new file mode 100644 index 00000000..f5c49e8d --- /dev/null +++ b/src/rabbit_db_queue_schema.sql @@ -0,0 +1,22 @@ +create table message ( + msg_id bytea PRIMARY KEY, + msg bytea, + ref_count integer NOT NULL +); +create index message_msg_id_index on message (msg_id); + +create table sequence ( + queue bytea PRIMARY KEY, + next_read integer NOT NULL, + next_write integer NOT NULL +); +create index sequence_queue_index on sequence (queue); + +create table ledger ( + queue bytea NOT NULL, + seq_id integer NOT NULL, + is_delivered boolean NOT NULL, + msg_id bytea NOT NULL +); +create index ledger_queue_seq_id_index on ledger (queue, seq_id); + diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl new file mode 100644 index 00000000..2b6f7b00 --- /dev/null +++ b/src/rabbit_disk_queue.erl @@ -0,0 +1,1755 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_disk_queue). + +-behaviour(gen_server2). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([publish/4, publish_with_seq/5, deliver/1, phantom_deliver/1, ack/2, + tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, + requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, + dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1 + ]). + +-export([length/1, is_empty/1, next_write_seq/1]). + +-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). + +-include("rabbit.hrl"). + +-define(WRITE_OK_SIZE_BITS, 8). +-define(WRITE_OK, 255). +-define(INTEGER_SIZE_BYTES, 8). +-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). +-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). +-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). +-define(FILE_EXTENSION_DETS, ".dets"). +-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). + +-define(SERVER, ?MODULE). + +-define(MAX_READ_FILE_HANDLES, 256). +-define(FILE_SIZE_LIMIT, (256*1024*1024)). + +-record(dqstate, {msg_location_dets, %% where are messages? + msg_location_ets, %% as above, but for ets version + operation_mode, %% ram_disk | disk_only + file_summary, %% what's in the files? + sequences, %% next read and write for each q + current_file_num, %% current file name as number + current_file_name, %% current file name + current_file_handle, %% current file handle + current_offset, %% current offset within current file + current_dirty, %% has the current file been written to since the last fsync? + file_size_limit, %% how big can our files get? + read_file_handles, %% file handles for reading (LRU) + read_file_handles_limit %% how many file handles can we open? + }). + +%% The components: +%% +%% MsgLocation: this is a dets table which contains: +%% {MsgId, RefCount, File, Offset, TotalSize} +%% FileSummary: this is an ets table which contains: +%% {File, ValidTotalSize, ContiguousTop, Left, Right} +%% Sequences: this is an ets table which contains: +%% {Q, ReadSeqId, WriteSeqId, QueueLength} +%% rabbit_disk_queue: this is an mnesia table which contains: +%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, +%% is_delivered = IsDelivered, +%% msg_id = MsgId, +%% next_seq_id = SeqId +%% } +%% + +%% The basic idea is that messages are appended to the current file up +%% until that file becomes too big (> file_size_limit). At that point, +%% the file is closed and a new file is created on the _right_ of the +%% old file which is used for new messages. Files are named +%% numerically ascending, thus the file with the lowest name is the +%% eldest file. +%% +%% We need to keep track of which messages are in which files (this is +%% the MsgLocation table); how much useful data is in each file and +%% which files are on the left and right of each other. This is the +%% purpose of the FileSummary table. +%% +%% As messages are removed from files, holes appear in these +%% files. The field ValidTotalSize contains the total amount of useful +%% data left in the file, whilst ContiguousTop contains the amount of +%% valid data right at the start of each file. These are needed for +%% garbage collection. +%% +%% On publish, we write the message to disk, record the changes to +%% FileSummary and MsgLocation, and, should this be either a plain +%% publish, or followed by a tx_commit, we record the message in the +%% mnesia table. Sequences exists to enforce ordering of messages as +%% they are published within a queue. +%% +%% On delivery, we read the next message to be read from disk +%% (according to the ReadSeqId for the given queue) and record in the +%% mnesia table that the message has been delivered. +%% +%% On ack we remove the relevant entry from MsgLocation, update +%% FileSummary and delete from the mnesia table. +%% +%% In order to avoid extra mnesia searching, we return the SeqId +%% during delivery which must be returned in ack - it is not possible +%% to ack from MsgId alone. + +%% As messages are ack'd, holes develop in the files. When we discover +%% that either a file is now empty or that it can be combined with the +%% useful data in either its left or right file, we compact the two +%% files together. This keeps disk utilisation high and aids +%% performance. +%% +%% Given the compaction between two files, the left file is considered +%% the ultimate destination for the good data in the right file. If +%% necessary, the good data in the left file which is fragmented +%% throughout the file is written out to a temporary file, then read +%% back in to form a contiguous chunk of good data at the start of the +%% left file. Thus the left file is garbage collected and +%% compacted. Then the good data from the right file is copied onto +%% the end of the left file. MsgLocation and FileSummary tables are +%% updated. +%% +%% On startup, we scan the files we discover, dealing with the +%% possibilites of a crash have occured during a compaction (this +%% consists of tidyup - the compaction is deliberately designed such +%% that data is duplicated on disk rather than risking it being lost), +%% and rebuild the dets and ets tables (MsgLocation, FileSummary, +%% Sequences) from what we find. We ensure that the messages we have +%% discovered on disk match exactly with the messages recorded in the +%% mnesia table. + +%% MsgLocation is deliberately a dets table, and the mnesia table is +%% set to be a disk_only_table in order to ensure that we are not RAM +%% constrained. However, for performance reasons, it is possible to +%% call to_ram_disk_mode/0 which will alter the mnesia table to +%% disc_copies and convert MsgLocation to an ets table. This results +%% in a massive performance improvement, at the expense of greater RAM +%% usage. The idea is that when memory gets tight, we switch to +%% disk_only mode but otherwise try to run in ram_disk mode. + +%% So, with this design, messages move to the left. Eventually, they +%% should end up in a contiguous block on the left and are then never +%% rewritten. But this isn't quite the case. If in a file there is one +%% message that is being ignored, for some reason, and messages in the +%% file to the right and in the current block are being read all the +%% time then it will repeatedly be the case that the good data from +%% both files can be combined and will be written out to a new +%% file. Whenever this happens, our shunned message will be rewritten. +%% +%% So, provided that we combine messages in the right order, +%% (i.e. left file, bottom to top, right file, bottom to top), +%% eventually our shunned message will end up at the bottom of the +%% left file. The compaction/combining algorithm is smart enough to +%% read in good data from the left file that is scattered throughout +%% (i.e. C and D in the below diagram), then truncate the file to just +%% above B (i.e. truncate to the limit of the good contiguous region +%% at the start of the file), then write C and D on top and then write +%% E, F and G from the right file on top. Thus contiguous blocks of +%% good data at the bottom of files are not rewritten (yes, this is +%% the data the size of which is tracked by the ContiguousTop +%% variable. Judicious use of a mirror is required). +%% +%% +-------+ +-------+ +-------+ +%% | X | | G | | G | +%% +-------+ +-------+ +-------+ +%% | D | | X | | F | +%% +-------+ +-------+ +-------+ +%% | X | | X | | E | +%% +-------+ +-------+ +-------+ +%% | C | | F | ===> | D | +%% +-------+ +-------+ +-------+ +%% | X | | X | | C | +%% +-------+ +-------+ +-------+ +%% | B | | X | | B | +%% +-------+ +-------+ +-------+ +%% | A | | E | | A | +%% +-------+ +-------+ +-------+ +%% left right left +%% +%% From this reasoning, we do have a bound on the number of times the +%% message is rewritten. From when it is inserted, there can be no +%% files inserted between it and the head of the queue, and the worst +%% case is that everytime it is rewritten, it moves one position lower +%% in the file (for it to stay at the same position requires that +%% there are no holes beneath it, which means truncate would be used +%% and so it would not be rewritten at all). Thus this seems to +%% suggest the limit is the number of messages ahead of it in the +%% queue, though it's likely that that's pessimistic, given the +%% requirements for compaction/combination of files. +%% +%% The other property is that we have is the bound on the lowest +%% utilisation, which should be 50% - worst case is that all files are +%% fractionally over half full and can't be combined (equivalent is +%% alternating full files and files with only one tiny message in +%% them). + +%% ---- SPECS ---- + +-ifdef(use_specs). + +-type(seq_id() :: non_neg_integer()). +-type(seq_id_or_next() :: { seq_id() | 'next' }). + +-spec(start_link/0 :: () -> + {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok'). +-spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(), bool()) -> 'ok'). +-spec(deliver/1 :: (queue_name()) -> + {'empty' | {msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}, non_neg_integer()}}). +-spec(phantom_deliver/1 :: (queue_name()) -> + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}). +-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). +-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id_or_next()}], + [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok'). +-spec(purge/1 :: (queue_name()) -> non_neg_integer()). +-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}, seq_id()}]). +-spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). +-spec(stop/0 :: () -> 'ok'). +-spec(stop_and_obliterate/0 :: () -> 'ok'). +-spec(to_ram_disk_mode/0 :: () -> 'ok'). +-spec(to_disk_only_mode/0 :: () -> 'ok'). +-spec(length/1 :: (queue_name()) -> non_neg_integer()). +-spec(next_write_seq/1 :: (queue_name()) -> non_neg_integer()). +-spec(is_empty/1 :: (queue_name()) -> bool()). + +-endif. + +%% ---- PUBLIC API ---- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, + [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []). + +publish(Q, MsgId, Msg, false) when is_binary(Msg) -> + gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}); +publish(Q, MsgId, Msg, true) when is_binary(Msg) -> + gen_server2:call(?SERVER, {publish, Q, MsgId, Msg}, infinity). + +publish_with_seq(Q, MsgId, SeqId, Msg, false) when is_binary(Msg) -> + gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}); +publish_with_seq(Q, MsgId, SeqId, Msg, true) when is_binary(Msg) -> + gen_server2:call(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}, + infinity). + +deliver(Q) -> + gen_server2:call(?SERVER, {deliver, Q}, infinity). + +phantom_deliver(Q) -> + gen_server2:call(?SERVER, {phantom_deliver, Q}, infinity). + +ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}). + +auto_ack_next_message(Q) -> + gen_server2:cast(?SERVER, {auto_ack_next_message, Q}). + +tx_publish(MsgId, Msg) when is_binary(Msg) -> + gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}). + +tx_commit(Q, PubMsgIds, AckSeqIds) + when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> + gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). + +tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds) + when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) -> + gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, + infinity). + +tx_cancel(MsgIds) when is_list(MsgIds) -> + gen_server2:cast(?SERVER, {tx_cancel, MsgIds}). + +requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}). + +requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) -> + gen_server2:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}). + +purge(Q) -> + gen_server2:call(?SERVER, {purge, Q}, infinity). + +delete_queue(Q) -> + gen_server2:cast(?SERVER, {delete_queue, Q}). + +dump_queue(Q) -> + gen_server2:call(?SERVER, {dump_queue, Q}, infinity). + +delete_non_durable_queues(DurableQueues) -> + gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity). + +stop() -> + gen_server2:call(?SERVER, stop, infinity). + +stop_and_obliterate() -> + gen_server2:call(?SERVER, stop_vaporise, infinity). + +to_disk_only_mode() -> + gen_server2:pcall(?SERVER, 10, to_disk_only_mode, infinity). + +to_ram_disk_mode() -> + gen_server2:pcall(?SERVER, 10, to_ram_disk_mode, infinity). + +length(Q) -> + gen_server2:call(?SERVER, {length, Q}, infinity). + +next_write_seq(Q) -> + gen_server2:call(?SERVER, {next_write_seq, Q}, infinity). + +is_empty(Q) -> + 0 == rabbit_disk_queue:length(Q). + +%% ---- GEN-SERVER INTERNAL API ---- + +init([FileSizeLimit, ReadFileHandlesLimit]) -> + %% If the gen_server is part of a supervision tree and is ordered + %% by its supervisor to terminate, terminate will be called with + %% Reason=shutdown if the following conditions apply: + %% * the gen_server has been set to trap exit signals, and + %% * the shutdown strategy as defined in the supervisor's + %% child specification is an integer timeout value, not + %% brutal_kill. + %% Otherwise, the gen_server will be immediately terminated. + process_flag(trap_exit, true), + Node = node(), + ok = + case mnesia:change_table_copy_type(rabbit_disk_queue, Node, + disc_only_copies) of + {atomic, ok} -> ok; + {aborted, {already_exists, rabbit_disk_queue, Node, + disc_only_copies}} -> ok; + E -> E + end, + ok = filelib:ensure_dir(form_filename("nothing")), + file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)), + {ok, MsgLocationDets} = + dets:open_file(?MSG_LOC_NAME, + [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)}, + {min_no_slots, 1024*1024}, + %% man says this should be <= 32M. But it works... + {max_no_slots, 1024*1024*1024}, + {type, set} + ]), + + %% it would be better to have this as private, but dets:from_ets/2 + %% seems to blow up if it is set private + MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), + + InitName = "0" ++ ?FILE_EXTENSION, + State = + #dqstate { msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + operation_mode = disk_only, + file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, + [set, private]), + sequences = ets:new(?SEQUENCE_ETS_NAME, + [set, private]), + current_file_num = 0, + current_file_name = InitName, + current_file_handle = undefined, + current_offset = 0, + current_dirty = false, + file_size_limit = FileSizeLimit, + read_file_handles = {dict:new(), gb_trees:empty()}, + read_file_handles_limit = ReadFileHandlesLimit + }, + {ok, State1 = #dqstate { current_file_name = CurrentName, + current_offset = Offset } } = + load_from_disk(State), + Path = form_filename(CurrentName), + Exists = case file:read_file_info(Path) of + {error,enoent} -> false; + {ok, _} -> true + end, + %% read is only needed so that we can seek + {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), + if Exists -> {ok, Offset} = file:position(FileHdl, {bof, Offset}); + true -> %% new file, so preallocate + ok = preallocate(FileHdl, FileSizeLimit, Offset) + end, + {ok, State1 #dqstate { current_file_handle = FileHdl }}. + +handle_call({publish, Q, MsgId, MsgBody}, _From, State) -> + {ok, MsgSeqId, State1} = + internal_publish(Q, MsgId, next, MsgBody, true, State), + {reply, MsgSeqId, State1}; +handle_call({publish_with_seq, Q, MsgId, SeqId, MsgBody}, _From, State) -> + {ok, MsgSeqId, State1} = + internal_publish(Q, MsgId, SeqId, MsgBody, true, State), + {reply, MsgSeqId, State1}; +handle_call({deliver, Q}, _From, State) -> + {ok, Result, State1} = internal_deliver(Q, true, false, State), + {reply, Result, State1}; +handle_call({phantom_deliver, Q}, _From, State) -> + {ok, Result, State1} = internal_deliver(Q, false, false, State), + {reply, Result, State1}; +handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> + PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}), + {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State), + {reply, ok, State1}; +handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) -> + {ok, State1} = internal_tx_commit(Q, PubSeqMsgIds, AckSeqIds, State), + {reply, ok, State1}; +handle_call({purge, Q}, _From, State) -> + {ok, Count, State1} = internal_purge(Q, State), + {reply, Count, State1}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; %% gen_server now calls terminate +handle_call(stop_vaporise, _From, State) -> + State1 = #dqstate { file_summary = FileSummary, + sequences = Sequences } = + shutdown(State), %% tidy up file handles early + {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), + true = ets:delete(FileSummary), + true = ets:delete(Sequences), + lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), + {stop, normal, ok, + State1 #dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}}; + %% gen_server now calls terminate, which then calls shutdown +handle_call(to_disk_only_mode, _From, + State = #dqstate { operation_mode = disk_only }) -> + {reply, ok, State}; +handle_call(to_disk_only_mode, _From, + State = #dqstate { operation_mode = ram_disk, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + rabbit_log:info("Converting disk queue to disk only mode~n", []), + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), + disc_only_copies), + ok = dets:from_ets(MsgLocationDets, MsgLocationEts), + true = ets:delete_all_objects(MsgLocationEts), + {reply, ok, State #dqstate { operation_mode = disk_only }}; +handle_call(to_ram_disk_mode, _From, + State = #dqstate { operation_mode = ram_disk }) -> + {reply, ok, State}; +handle_call(to_ram_disk_mode, _From, + State = #dqstate { operation_mode = disk_only, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + rabbit_log:info("Converting disk queue to ram disk mode~n", []), + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), + disc_copies), + true = ets:from_dets(MsgLocationEts, MsgLocationDets), + ok = dets:delete_all_objects(MsgLocationDets), + {reply, ok, State #dqstate { operation_mode = ram_disk }}; +handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> + {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q), + {reply, Length, State}; +handle_call({next_write_seq, Q}, _From, State = #dqstate { sequences = Sequences }) -> + {_ReadSeqId, WriteSeqId, _Length} = sequence_lookup(Sequences, Q), + {reply, WriteSeqId, State}; +handle_call({dump_queue, Q}, _From, State) -> + {Result, State1} = internal_dump_queue(Q, State), + {reply, Result, State1}; +handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> + {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), + {reply, ok, State1}. + +handle_cast({publish, Q, MsgId, MsgBody}, State) -> + {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State), + {noreply, State1}; +handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) -> + {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, false, State), + {noreply, State1}; +handle_cast({ack, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_ack(Q, MsgSeqIds, State), + {noreply, State1}; +handle_cast({auto_ack_next_message, Q}, State) -> + {ok, State1} = internal_auto_ack(Q, State), + {noreply, State1}; +handle_cast({tx_publish, MsgId, MsgBody}, State) -> + {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), + {noreply, State1}; +handle_cast({tx_cancel, MsgIds}, State) -> + {ok, State1} = internal_tx_cancel(MsgIds, State), + {noreply, State1}; +handle_cast({requeue, Q, MsgSeqIds}, State) -> + MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, {next, true}}), + {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), + {noreply, State1}; +handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> + {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), + {noreply, State1}; +handle_cast({delete_queue, Q}, State) -> + {ok, State1} = internal_delete_queue(Q, State), + {noreply, State1}. + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + shutdown(State). + +shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + current_file_handle = FileHdl, + read_file_handles = {ReadHdls, _ReadHdlsAge} + }) -> + %% deliberately ignoring return codes here + dets:close(MsgLocationDets), + file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)), + true = ets:delete_all_objects(MsgLocationEts), + if FileHdl =:= undefined -> ok; + true -> file:sync(FileHdl), + file:close(FileHdl) + end, + dict:fold(fun (_File, Hdl, _Acc) -> + file:close(Hdl) + end, ok, ReadHdls), + State #dqstate { current_file_handle = undefined, + current_dirty = false, + read_file_handles = {dict:new(), gb_trees:empty()}}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ---- UTILITY FUNCTIONS ---- + +form_filename(Name) -> + filename:join(base_directory(), Name). + +base_directory() -> + filename:join(mnesia:system_info(directory), "rabbit_disk_queue/"). + +zip_with_tail(List1, List2) when length(List1) =:= length(List2) -> + lists:zip(List1, List2); +zip_with_tail(List = [_|Tail], {last, E}) -> + zip_with_tail(List, Tail ++ [E]); +zip_with_tail(List, {duplicate, E}) -> + zip_with_tail(List, lists:duplicate(erlang:length(List), E)). + +dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, + Key) -> + dets:lookup(MsgLocationDets, Key); +dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, + Key) -> + ets:lookup(MsgLocationEts, Key). + +dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, + Key) -> + ok = dets:delete(MsgLocationDets, Key); +dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, + Key) -> + true = ets:delete(MsgLocationEts, Key), + ok. + +dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, + Obj) -> + ok = dets:insert(MsgLocationDets, Obj); +dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, + Obj) -> + true = ets:insert(MsgLocationEts, Obj), + ok. + +dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, + Obj) -> + true = dets:insert_new(MsgLocationDets, Obj); +dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, + Obj) -> + true = ets:insert_new(MsgLocationEts, Obj). + +dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, + Obj) -> + dets:match_object(MsgLocationDets, Obj); +dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, + Obj) -> + ets:match_object(MsgLocationEts, Obj). + +find_next_seq_id(CurrentSeq, next) -> + CurrentSeq + 1; +find_next_seq_id(CurrentSeq, NextSeqId) + when NextSeqId > CurrentSeq -> + NextSeqId. + +determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) -> + CurrentReadWrite; +determine_next_read_id(CurrentRead, _CurrentWrite, next) -> + CurrentRead; +determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite) + when NextWrite > CurrentReadWrite -> + NextWrite; +determine_next_read_id(CurrentRead, CurrentWrite, NextWrite) + when NextWrite >= CurrentWrite -> + CurrentRead. + +get_read_handle(File, State = + #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, + read_file_handles_limit = ReadFileHandlesLimit, + current_file_name = CurName, + current_file_handle = CurHdl, + current_dirty = IsDirty + }) -> + IsDirty2 = if CurName == File andalso IsDirty -> + file:sync(CurHdl), + false; + true -> IsDirty + end, + Now = now(), + {FileHdl, ReadHdls1, ReadHdlsAge1} = + case dict:find(File, ReadHdls) of + error -> + {ok, Hdl} = file:open(form_filename(File), + [read, raw, binary, + read_ahead]), + case dict:size(ReadHdls) < ReadFileHandlesLimit of + true -> + {Hdl, ReadHdls, ReadHdlsAge}; + _False -> + {Then, OldFile, ReadHdlsAge2} = + gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, Then}} = + dict:find(OldFile, ReadHdls), + ok = file:close(OldHdl), + {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} + end; + {ok, {Hdl, Then}} -> + {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} + end, + ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1), + ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), + {FileHdl, State #dqstate { read_file_handles = {ReadHdls3, ReadHdlsAge3}, + current_dirty = IsDirty2 + }}. + +adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) -> + ExpectedSeqId; +adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) -> + SuppliedSeqId; +adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) -> + ExpectedSeqId; +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) + when SuppliedSeqId > ExpectedSeqId -> + [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), + ok = mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), + SuppliedSeqId; +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) + when SuppliedSeqId > ExpectedSeqId -> + [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock), + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }, + Lock), + SuppliedSeqId. + +sequence_lookup(Sequences, Q) -> + case ets:lookup(Sequences, Q) of + [] -> + {0, 0, 0}; + [{Q, ReadSeqId, WriteSeqId, Length}] -> + {ReadSeqId, WriteSeqId, Length} + end. + +%% ---- INTERNAL RAW FUNCTIONS ---- + +internal_deliver(Q, ReadMsg, FakeDeliver, State = #dqstate { sequences = Sequences }) -> + case ets:lookup(Sequences, Q) of + [] -> {ok, empty, State}; + [{Q, SeqId, SeqId, 0}] -> {ok, empty, State}; + [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 -> + Remaining = Length - 1, + {ok, Result, NextReadSeqId, State1} = + internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State), + true = ets:insert(Sequences, + {Q, NextReadSeqId, WriteSeqId, Remaining}), + {ok, + case Result of + {MsgId, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}; + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, + Remaining} + end, State1} + end. + +internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> + [Obj = + #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, + next_seq_id = NextReadSeqId}] = + mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), + [{MsgId, _RefCount, File, Offset, TotalSize}] = + dets_ets_lookup(State, MsgId), + ok = + if FakeDeliver orelse Delivered -> ok; + true -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + if ReadMsg -> + {FileHdl, State1} = get_read_handle(File, State), + {ok, {MsgBody, BodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + NextReadSeqId, State1}; + true -> + {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State} + end. + +internal_auto_ack(Q, State) -> + case internal_deliver(Q, false, true, State) of + {ok, empty, State1} -> {ok, State1}; + {ok, {_MsgId, _Delivered, MsgSeqId, _Remaining}, State1} -> + remove_messages(Q, [MsgSeqId], true, State1) + end. + +internal_ack(Q, MsgSeqIds, State) -> + remove_messages(Q, MsgSeqIds, true, State). + +%% Q is only needed if MnesiaDelete /= false +%% called from ack with MnesiaDelete = true +%% called from tx_commit with MnesiaDelete = txn +%% called from tx_cancel with MnesiaDelete = false +%% called from purge with MnesiaDelete = txn +%% called from delete_queue with MnesiaDelete = txn +remove_messages(Q, MsgSeqIds, MnesiaDelete, + State = #dqstate { file_summary = FileSummary, + current_file_name = CurName + }) -> + Files = + lists:foldl( + fun ({MsgId, SeqId}, Files2) -> + [{MsgId, RefCount, File, Offset, TotalSize}] = + dets_ets_lookup(State, MsgId), + Files3 = + if 1 =:= RefCount -> + ok = dets_ets_delete(State, MsgId), + [{File, ValidTotalSize, ContiguousTop, + Left, Right}] = ets:lookup(FileSummary, File), + ContiguousTop1 = + lists:min([ContiguousTop, Offset]), + true = + ets:insert(FileSummary, + {File, (ValidTotalSize-TotalSize- + ?FILE_PACKING_ADJUSTMENT), + ContiguousTop1, Left, Right}), + if CurName =:= File -> Files2; + true -> sets:add_element(File, Files2) + end; + 1 < RefCount -> + ok = dets_ets_insert( + State, {MsgId, RefCount - 1, + File, Offset, TotalSize}), + Files2 + end, + ok = if MnesiaDelete -> + mnesia:dirty_delete(rabbit_disk_queue, + {Q, SeqId}); + MnesiaDelete =:= txn -> + mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write); + true -> ok + end, + Files3 + end, sets:new(), MsgSeqIds), + State2 = compact(Files, State), + {ok, State2}. + +internal_tx_publish(MsgId, MsgBody, + State = #dqstate { current_file_handle = CurHdl, + current_file_name = CurName, + current_offset = CurOffset, + file_summary = FileSummary + }) -> + case dets_ets_lookup(State, MsgId) of + [] -> + %% New message, lots to do + {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), + true = dets_ets_insert_new(State, {MsgId, 1, CurName, + CurOffset, TotalSize}), + [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = + ets:lookup(FileSummary, CurName), + ValidTotalSize1 = ValidTotalSize + TotalSize + + ?FILE_PACKING_ADJUSTMENT, + ContiguousTop1 = if CurOffset =:= ContiguousTop -> + %% can't be any holes in this file + ValidTotalSize1; + true -> ContiguousTop + end, + true = ets:insert(FileSummary, {CurName, ValidTotalSize1, + ContiguousTop1, Left, undefined}), + NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + maybe_roll_to_new_file( + NextOffset, State #dqstate {current_offset = NextOffset, + current_dirty = true}); + [{MsgId, RefCount, File, Offset, TotalSize}] -> + %% We already know about it, just update counter + ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, + Offset, TotalSize}), + {ok, State} + end. + +%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) +internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, + State = #dqstate { current_file_handle = CurHdl, + current_file_name = CurName, + current_dirty = IsDirty, + sequences = Sequences + }) -> + {PubList, PubAcc, ReadSeqId, Length} = + case PubMsgSeqIds of + [] -> {[], undefined, undefined, undefined}; + [{_, FirstSeqIdTo}|_] -> + {InitReadSeqId, InitWriteSeqId, InitLength} = + sequence_lookup(Sequences, Q), + InitReadSeqId2 = determine_next_read_id( + InitReadSeqId, InitWriteSeqId, FirstSeqIdTo), + { zip_with_tail(PubMsgSeqIds, {last, {next, next}}), + InitWriteSeqId, InitReadSeqId2, InitLength} + end, + {atomic, {Sync, WriteSeqId, State2}} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + %% must deal with publishes first, if we didn't + %% then we could end up acking a message before + %% it's been published, which is clearly + %% nonsense. I.e. in commit, do not do things in an + %% order which _could_not_ have happened. + {Sync2, WriteSeqId3} = + lists:foldl( + fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, + {Acc, ExpectedSeqId}) -> + [{MsgId, _RefCount, File, _Offset, + _TotalSize}] = dets_ets_lookup(State, MsgId), + SeqId2 = adjust_last_msg_seq_id( + Q, ExpectedSeqId, SeqId, write), + NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId), + ok = mnesia:write( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = + {Q, SeqId2}, + msg_id = MsgId, + is_delivered = false, + next_seq_id = NextSeqId2 + }, + write), + {Acc orelse (CurName =:= File), NextSeqId2} + end, {false, PubAcc}, PubList), + + {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), + {Sync2, WriteSeqId3, State3} + end), + true = if PubList =:= [] -> true; + true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, + Length + erlang:length(PubList)}) + end, + IsDirty2 = if IsDirty andalso Sync -> + ok = file:sync(CurHdl), + false; + true -> IsDirty + end, + {ok, State2 #dqstate { current_dirty = IsDirty2 }}. + +%% SeqId can be 'next' +internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) -> + {ok, State1 = #dqstate { sequences = Sequences }} = + internal_tx_publish(MsgId, MsgBody, State), + {ReadSeqId, WriteSeqId, Length} = + sequence_lookup(Sequences, Q), + ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId), + WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty), + WriteSeqId3Next = WriteSeqId3 + 1, + ok = mnesia:dirty_write(rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, + msg_id = MsgId, + next_seq_id = WriteSeqId3Next, + is_delivered = IsDelivered}), + true = ets:insert(Sequences, {Q, ReadSeqId3, WriteSeqId3Next, Length + 1}), + {ok, {MsgId, WriteSeqId3}, State1}. + +internal_tx_cancel(MsgIds, State) -> + %% we don't need seq ids because we're not touching mnesia, + %% because seqids were never assigned + MsgSeqIds = zip_with_tail(MsgIds, {duplicate, undefined}), + remove_messages(undefined, MsgSeqIds, false, State). + +internal_requeue(_Q, [], State) -> + {ok, State}; +internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_], + State = #dqstate { sequences = Sequences }) -> + %% We know that every seq_id in here is less than the ReadSeqId + %% you'll get if you look up this queue in Sequences (i.e. they've + %% already been delivered). We also know that the rows for these + %% messages are still in rabbit_disk_queue (i.e. they've not been + %% ack'd). + + %% Now, it would be nice if we could adjust the sequence ids in + %% rabbit_disk_queue (mnesia) to create a contiguous block and + %% then drop the ReadSeqId for the queue by the corresponding + %% amount. However, this is not safe because there may be other + %% sequence ids which have been sent out as part of deliveries + %% which are not being requeued. As such, moving things about in + %% rabbit_disk_queue _under_ the current ReadSeqId would result in + %% such sequence ids referring to the wrong messages. + + %% Therefore, the only solution is to take these messages, and to + %% reenqueue them at the top of the queue. Usefully, this only + %% affects the Sequences and rabbit_disk_queue structures - there + %% is no need to physically move the messages about on disk, so + %% MsgLocation and FileSummary stay put (which makes further sense + %% as they have no concept of sequence id anyway). + + {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q), + ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), + MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}), + {atomic, {WriteSeqId2, Q}} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl(fun requeue_message/2, {WriteSeqId, Q}, + MsgSeqIdsZipped) + end), + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2, + Length + erlang:length(MsgSeqIds)}), + {ok, State}. + +requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}}, + {_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}}, + {ExpectedSeqIdTo, Q}) -> + SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), + NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo), + [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId, + next_seq_id = NextSeqIdOrig }] = + mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), + if SeqIdTo2 == SeqIdOrig andalso NextSeqIdTo2 == NextSeqIdOrig -> ok; + true -> + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2}, + next_seq_id = NextSeqIdTo2, + is_delivered = NewIsDelivered + }, + write), + ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write) + end, + {NextSeqIdTo2, Q}. + +internal_purge(Q, State = #dqstate { sequences = Sequences }) -> + case ets:lookup(Sequences, Q) of + [] -> {ok, 0, State}; + [{Q, ReadSeqId, WriteSeqId, _Length}] -> + {atomic, {ok, State2}} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + {MsgSeqIds, WriteSeqId} = + rabbit_misc:unfold( + fun (SeqId) when SeqId == WriteSeqId -> false; + (SeqId) -> + [#dq_msg_loc { msg_id = MsgId, + next_seq_id = NextSeqId } + ] = mnesia:read(rabbit_disk_queue, + {Q, SeqId}, write), + {true, {MsgId, SeqId}, NextSeqId} + end, ReadSeqId), + remove_messages(Q, MsgSeqIds, txn, State) + end), + true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}), + {ok, WriteSeqId - ReadSeqId, State2} + end. + +internal_delete_queue(Q, State) -> + {ok, _Count, State1 = #dqstate { sequences = Sequences }} = + internal_purge(Q, State), %% remove everything undelivered + true = ets:delete(Sequences, Q), + {atomic, {ok, State2}} = + mnesia:transaction( + fun() -> %% now remove everything already delivered + ok = mnesia:write_lock_table(rabbit_disk_queue), + Objs = + mnesia:match_object( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, '_'}, + msg_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + write), + MsgSeqIds = + lists:map( + fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId }) -> + {MsgId, SeqId} end, Objs), + remove_messages(Q, MsgSeqIds, txn, State1) + end), + {ok, State2}. + +internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> + case ets:lookup(Sequences, Q) of + [] -> {[], State}; + [{Q, ReadSeq, WriteSeq, _Length}] -> + {QList, {WriteSeq, State3}} = + rabbit_misc:unfold( + fun ({SeqId, _State1}) when SeqId == WriteSeq -> + false; + ({SeqId, State1}) -> + {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}}, + NextReadSeqId, State2} = + internal_read_message(Q, SeqId, true, true, + State1), + {true, + {MsgId, Msg, Size, Delivered, {MsgId, SeqId}, SeqId}, + {NextReadSeqId, State2}} + end, {ReadSeq, State}), + {lists:reverse(QList), State3} + end. + +internal_delete_non_durable_queues( + DurableQueues, State = #dqstate { sequences = Sequences }) -> + ets:foldl( + fun ({Q, _Read, _Write, _Length}, {ok, State1}) -> + case sets:is_element(Q, DurableQueues) of + true -> {ok, State1}; + false -> internal_delete_queue(Q, State1) + end + end, {ok, State}, Sequences). + +%% ---- ROLLING OVER THE APPEND FILE ---- + +maybe_roll_to_new_file(Offset, + State = #dqstate { file_size_limit = FileSizeLimit, + current_file_name = CurName, + current_file_handle = CurHdl, + current_file_num = CurNum, + current_dirty = IsDirty, + file_summary = FileSummary + } + ) when Offset >= FileSizeLimit -> + ok = if IsDirty -> file:sync(CurHdl); + true -> ok + end, + ok = file:close(CurHdl), + NextNum = CurNum + 1, + NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, + {ok, NextHdl} = file:open(form_filename(NextName), + [write, raw, binary, delayed_write]), + ok = preallocate(NextHdl, FileSizeLimit, 0), + true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right + true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), + State1 = State #dqstate { current_file_name = NextName, + current_file_handle = NextHdl, + current_file_num = NextNum, + current_offset = 0, + current_dirty = false + }, + {ok, compact(sets:from_list([CurName]), State1)}; +maybe_roll_to_new_file(_, State) -> + {ok, State}. + +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), + ok = file:truncate(Hdl), + {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), + ok. + +%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- + +compact(FilesSet, State) -> + %% smallest number, hence eldest, hence left-most, first + Files = lists:sort(sets:to_list(FilesSet)), + %% foldl reverses, so now youngest/right-most first + RemainingFiles = lists:foldl(fun (File, Acc) -> + delete_empty_files(File, Acc, State) + end, [], Files), + lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). + +combine_file(File, State = #dqstate { file_summary = FileSummary, + current_file_name = CurName + }) -> + %% the file we're looking at may no longer exist as it may have + %% been deleted within the current GC run + case ets:lookup(FileSummary, File) of + [] -> State; + [FileObj = {File, _ValidData, _ContiguousTop, Left, Right}] -> + GoRight = + fun() -> + case Right of + undefined -> State; + _ when not (CurName == Right) -> + [RightObj] = ets:lookup(FileSummary, Right), + {_, State1} = + adjust_meta_and_combine(FileObj, RightObj, + State), + State1; + _ -> State + end + end, + case Left of + undefined -> + GoRight(); + _ -> [LeftObj] = ets:lookup(FileSummary, Left), + case adjust_meta_and_combine(LeftObj, FileObj, State) of + {true, State1} -> State1; + {false, State} -> GoRight() + end + end + end. + +adjust_meta_and_combine( + LeftObj = {LeftFile, LeftValidData, _LeftContigTop, LeftLeft, RightFile}, + RightObj = {RightFile, RightValidData, _RightContigTop, LeftFile, RightRight}, + State = #dqstate { file_size_limit = FileSizeLimit, + file_summary = FileSummary + }) -> + TotalValidData = LeftValidData + RightValidData, + if FileSizeLimit >= TotalValidData -> + State1 = combine_files(RightObj, LeftObj, State), + %% this could fail if RightRight is undefined + %% left is the 4th field + ets:update_element(FileSummary, RightRight, {4, LeftFile}), + true = ets:insert(FileSummary, {LeftFile, + TotalValidData, TotalValidData, + LeftLeft, + RightRight}), + true = ets:delete(FileSummary, RightFile), + {true, State1}; + true -> {false, State} + end. + +sort_msg_locations_by_offset(Asc, List) -> + Comp = if Asc -> fun erlang:'<'/2; + true -> fun erlang:'>'/2 + end, + lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + Comp(OffA, OffB) + end, List). + +truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> + {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), + ok = file:truncate(FileHdl), + ok = preallocate(FileHdl, Highpoint, Lowpoint). + +combine_files({Source, SourceValid, _SourceContiguousTop, + _SourceLeft, _SourceRight}, + {Destination, DestinationValid, DestinationContiguousTop, + _DestinationLeft, _DestinationRight}, + State1) -> + State = close_file(Source, close_file(Destination, State1)), + {ok, SourceHdl} = + file:open(form_filename(Source), + [read, write, raw, binary, read_ahead, delayed_write]), + {ok, DestinationHdl} = + file:open(form_filename(Destination), + [read, write, raw, binary, read_ahead, delayed_write]), + ExpectedSize = SourceValid + DestinationValid, + %% if DestinationValid =:= DestinationContiguousTop then we don't + %% need a tmp file + %% if they're not equal, then we need to write out everything past + %% the DestinationContiguousTop to a tmp file then truncate, + %% copy back in, and then copy over from Source + %% otherwise we just truncate straight away and copy over from Source + if DestinationContiguousTop =:= DestinationValid -> + ok = truncate_and_extend_file(DestinationHdl, + DestinationValid, ExpectedSize); + true -> + Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = + file:open(form_filename(Tmp), + [read, write, raw, binary, + read_ahead, delayed_write]), + Worklist = + lists:dropwhile( + fun ({_, _, _, Offset, _}) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == + %% DestinationContiguousTop because if it + %% was then DestinationContiguousTop would + %% have been extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I suspect + %% that the list should be naturally sorted + %% as we require, however, we need to + %% enforce it anyway + end, sort_msg_locations_by_offset( + true, dets_ets_match_object(State, + {'_', '_', Destination, + '_', '_'}))), + ok = copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination, State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage from + %% Destination, and MsgLocationDets has been updated to + %% reflect compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file:position(TmpHdl, {bof, 0}), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file:sync(DestinationHdl), + ok = file:close(TmpHdl), + ok = file:delete(form_filename(Tmp)) + end, + SourceWorkList = + sort_msg_locations_by_offset( + true, dets_ets_match_object(State, + {'_', '_', Source, + '_', '_'})), + ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination, State), + %% tidy up + ok = file:sync(DestinationHdl), + ok = file:close(SourceHdl), + ok = file:close(DestinationHdl), + ok = file:delete(form_filename(Source)), + State. + +copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, + Destination, State) -> + {FinalOffset, BlockStart2, BlockEnd2} = + lists:foldl( + fun ({MsgId, RefCount, _Source, Offset, TotalSize}, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the DestinationFile. + %% Offset, BlockStart and BlockEnd are in the SourceFile + Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + %% update MsgLocationDets to reflect change of file and offset + ok = dets_ets_insert(State, {MsgId, RefCount, Destination, + CurOffset, TotalSize}), + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + %% extend the current block because the next + %% msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + %% found a gap, so actually do the work for + %% the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file:position(SourceHdl, {bof, BlockStart}), + {ok, BSize} = + file:copy(SourceHdl, DestinationHdl, BSize), + {NextOffset, Offset, Offset + Size} + end + end, {InitOffset, undefined, undefined}, WorkList), + %% do the last remaining block + BSize2 = BlockEnd2 - BlockStart2, + {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}), + {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2), + ok. + +close_file(File, State = #dqstate { read_file_handles = + {ReadHdls, ReadHdlsAge} }) -> + case dict:find(File, ReadHdls) of + error -> + State; + {ok, {Hdl, Then}} -> + ok = file:close(Hdl), + State #dqstate { read_file_handles = + { dict:erase(File, ReadHdls), + gb_trees:delete(Then, ReadHdlsAge) } } + end. + +delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> + [{File, ValidData, _ContiguousTop, Left, Right}] = + ets:lookup(FileSummary, File), + case ValidData of + %% we should NEVER find the current file in here hence right + %% should always be a file, not undefined + 0 -> + case {Left, Right} of + {undefined, _} when not (is_atom(Right)) -> + %% the eldest file is empty. YAY! + %% left is the 4th field + true = + ets:update_element(FileSummary, Right, {4, undefined}); + {_, _} when not (is_atom(Right)) -> + %% left is the 4th field + true = ets:update_element(FileSummary, Right, {4, Left}), + %% right is the 5th field + true = ets:update_element(FileSummary, Left, {5, Right}) + end, + true = ets:delete(FileSummary, File), + ok = file:delete(form_filename(File)), + Acc; + _ -> [File|Acc] + end. + +%% ---- DISK RECOVERY ---- + +add_index() -> + case mnesia:add_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; + E -> E + end. + +del_index() -> + case mnesia:del_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + %% hmm, something weird must be going on, but it's probably + %% not the end of the world + {aborted, {no_exists, rabbit_disk_queue,_}} -> ok; + E2 -> E2 + end. + +load_from_disk(State) -> + %% sorted so that smallest number is first. which also means + %% eldest file (left-most) first + ok = add_index(), + {Files, TmpFiles} = get_disk_queue_files(), + ok = recover_crashed_compactions(Files, TmpFiles), + %% There should be no more tmp files now, so go ahead and load the + %% whole lot + State1 = load_messages(undefined, Files, State), + %% Finally, check there is nothing in mnesia which we haven't + %% loaded + {atomic, true} = mnesia:transaction( + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl( + fun (#dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = {Q, SeqId} }, + true) -> + case erlang:length(dets_ets_lookup( + State1, MsgId)) of + 0 -> ok == mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write); + 1 -> true + end + end, + true, rabbit_disk_queue) + end), + State2 = extract_sequence_numbers(State1), + ok = del_index(), + {ok, State2}. + +extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> + {atomic, true} = mnesia:transaction( + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl( + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> + NextWrite = SeqId + 1, + case ets:lookup(Sequences, Q) of + [] -> true = + ets:insert_new(Sequences, + {Q, SeqId, NextWrite, -1}); + [Orig = {Q, Read, Write, Length}] -> + Repl = {Q, lists:min([Read, SeqId]), + %% Length is wrong here, but + %% it doesn't matter because + %% we'll pull out the gaps in + %% remove_gaps_in_sequences in + %% then do a straight + %% subtraction to get the + %% right length + lists:max([Write, NextWrite]), Length}, + if Orig /= Repl -> + true = ets:insert(Sequences, Repl); + true -> true + end + end + end, true, rabbit_disk_queue) + end), + remove_gaps_in_sequences(State), + State. + +remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> + %% read the comments at internal_requeue. + + %% Because we are at startup, we know that no sequence ids have + %% been issued (or at least, they were, but have been + %% forgotten). Therefore, we can nicely shuffle up and not + %% worry. Note that I'm choosing to shuffle up, but alternatively + %% we could shuffle downwards. However, I think there's greater + %% likelihood of gaps being at the bottom rather than the top of + %% the queue, so shuffling up should be the better bet. + {atomic, _} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foreach( + fun ({Q, ReadSeqId, WriteSeqId, _Length}) -> + Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), + ReadSeqId2 = ReadSeqId + Gap, + Length = WriteSeqId - ReadSeqId2, + true = + ets:insert(Sequences, + {Q, ReadSeqId2, WriteSeqId, Length}) + end, ets:match_object(Sequences, '_')) + end). + +shuffle_up(_Q, SeqId, SeqId, Gap) -> + Gap; +shuffle_up(Q, BaseSeqId, SeqId, Gap) -> + GapInc = + case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of + [] -> 1; + [Obj] -> + if Gap =:= 0 -> ok; + true -> mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { + queue_and_seq_id = {Q, SeqId + Gap }, + next_seq_id = SeqId + Gap + 1 + }, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) + end, + 0 + end, + shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc). + +load_messages(undefined, [], + State = #dqstate { file_summary = FileSummary, + current_file_name = CurName }) -> + true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), + State; +load_messages(Left, [], State) -> + Num = list_to_integer(filename:rootname(Left)), + Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of + [] -> 0; + L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = + sort_msg_locations_by_offset(false, L), + MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT + end, + State #dqstate { current_file_num = Num, current_file_name = Left, + current_offset = Offset }; +load_messages(Left, [File|Files], + State = #dqstate { file_summary = FileSummary }) -> + %% [{MsgId, TotalSize, FileOffset}] + {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), + {ValidMessagesRev, ValidTotalSize} = lists:foldl( + fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + case erlang:length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + msg_id)) of + 0 -> {VMAcc, VTSAcc}; + RefCount -> + true = + dets_ets_insert_new(State, {MsgId, RefCount, File, + Offset, TotalSize}), + {[{MsgId, TotalSize, Offset}|VMAcc], + VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT + } + end + end, {[], 0}, Messages), + %% foldl reverses lists and find_contiguous_block_prefix needs + %% elems in the same order as from scan_file_for_valid_messages + {ContiguousTop, _} = find_contiguous_block_prefix( + lists:reverse(ValidMessagesRev)), + Right = case Files of + [] -> undefined; + [F|_] -> F + end, + true = ets:insert_new(FileSummary, + {File, ValidTotalSize, ContiguousTop, Left, Right}), + load_messages(File, Files, State). + +%% ---- DISK RECOVERY OF FAILED COMPACTION ---- + +recover_crashed_compactions(Files, TmpFiles) -> + lists:foreach(fun (TmpFile) -> + ok = recover_crashed_compactions1(Files, TmpFile) end, + TmpFiles), + ok. + +verify_messages_in_mnesia(MsgIds) -> + lists:foreach( + fun (MsgId) -> + true = 0 < erlang:length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, + msg_id)) + end, MsgIds). + +recover_crashed_compactions1(Files, TmpFile) -> + GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end, + NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, + true = lists:member(NonTmpRelatedFile, Files), + %% [{MsgId, TotalSize, FileOffset}] + {ok, UncorruptedMessagesTmp} = + scan_file_for_valid_messages(form_filename(TmpFile)), + MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), + %% all of these messages should appear in the mnesia table, + %% otherwise they wouldn't have been copied out + verify_messages_in_mnesia(MsgIdsTmp), + {ok, UncorruptedMessages} = + scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + MsgIds = lists:map(GrabMsgId, UncorruptedMessages), + %% 1) It's possible that everything in the tmp file is also in the + %% main file such that the main file is (prefix ++ + %% tmpfile). This means that compaction failed immediately + %% prior to the final step of deleting the tmp file. Plan: just + %% delete the tmp file + %% 2) It's possible that everything in the tmp file is also in the + %% main file but with holes throughout (or just somthing like + %% main = (prefix ++ hole ++ tmpfile)). This means that + %% compaction wrote out the tmp file successfully and then + %% failed. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 3) It's possible that everything in the tmp file is also in the + %% main file but such that the main file does not end with tmp + %% file (and there are valid messages in the suffix; main = + %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This + %% means that compaction failed as we were writing out the tmp + %% file. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 4) It's possible that there are messages in the tmp file which + %% are not in the main file. This means that writing out the + %% tmp file succeeded, but then we failed as we were copying + %% them back over to the main file, after truncating the main + %% file. As the main file has already been truncated, it should + %% consist only of valid messages. Plan: Truncate the main file + %% back to before any of the files in the tmp file and copy + %% them over again + case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of + true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file + %% note this also catches the case when the tmp file + %% is empty + ok = file:delete(TmpFile); + _False -> + %% we're in case 4 above. Check that everything in the + %% main file is a valid message in mnesia + verify_messages_in_mnesia(MsgIds), + %% The main file should be contiguous + {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), + %% we should have that none of the messages in the prefix + %% are in the tmp file + true = lists:all(fun (MsgId) -> + not (lists:member(MsgId, MsgIdsTmp)) + end, MsgIds), + {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), + [write, raw, binary, delayed_write]), + {ok, Top} = file:position(MainHdl, Top), + %% wipe out any rubbish at the end of the file + ok = file:truncate(MainHdl), + %% there really could be rubbish at the end of the file - + %% we could have failed after the extending truncate. + %% Remember the head of the list will be the highest entry + %% in the file + [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, + TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, + ExpectedAbsPos = Top + TmpSize, + {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), + %% and now extend the main file as big as necessary in a + %% single move if we run out of disk space, this truncate + %% could fail, but we still aren't risking losing data + ok = file:truncate(MainHdl), + {ok, TmpHdl} = file:open(form_filename(TmpFile), + [read, raw, binary, read_ahead]), + {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), + ok = file:close(MainHdl), + ok = file:close(TmpHdl), + ok = file:delete(TmpFile), + + {ok, MainMessages} = + scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + MsgIdsMain = lists:map(GrabMsgId, MainMessages), + %% check that everything in MsgIds is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIds), + %% check that everything in MsgIdsTmp is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIdsTmp) + end, + ok. + +%% this assumes that the messages are ordered such that the highest +%% address is at the head of the list. This matches what +%% scan_file_for_valid_messages produces +find_contiguous_block_prefix([]) -> {0, []}; +find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) -> + case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of + {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + lists:reverse(Acc)}; + Res -> Res + end. +find_contiguous_block_prefix([], 0, Acc) -> + {ok, Acc}; +find_contiguous_block_prefix([], _N, _Acc) -> + {0, []}; +find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], + ExpectedOffset, Acc) + when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT -> + find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]); +find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) -> + find_contiguous_block_prefix(List). + +file_name_sort(A, B) -> + ANum = list_to_integer(filename:rootname(A)), + BNum = list_to_integer(filename:rootname(B)), + ANum < BNum. + +get_disk_queue_files() -> + DQFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION, base_directory()), + DQFilesSorted = lists:sort(fun file_name_sort/2, DQFiles), + DQTFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, base_directory()), + DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles), + {DQFilesSorted, DQTFilesSorted}. + +%% ---- RAW READING AND WRITING OF FILES ---- + +append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) -> + BodySize = size(MsgBody), + MsgIdBin = term_to_binary(MsgId), + MsgIdBinSize = size(MsgIdBin), + TotalSize = BodySize + MsgIdBinSize, + case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdBin:MsgIdBinSize/binary, + MsgBody:BodySize/binary, + ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of + ok -> {ok, TotalSize}; + KO -> KO + end. + +read_message_at_offset(FileHdl, Offset, TotalSize) -> + TotalSizeWriteOkBytes = TotalSize + 1, + case file:position(FileHdl, {bof, Offset}) of + {ok, Offset} -> + case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of + {ok, <<TotalSize:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + Rest:TotalSizeWriteOkBytes/binary>>} -> + BodySize = TotalSize - MsgIdBinSize, + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest, + {ok, {MsgBody, BodySize}}; + KO -> KO + end; + KO -> KO + end. + +scan_file_for_valid_messages(File) -> + {ok, Hdl} = file:open(File, [raw, binary, read]), + Valid = scan_file_for_valid_messages(Hdl, 0, []), + %% if something really bad's happened, the close could fail, but ignore + file:close(Hdl), + Valid. + +scan_file_for_valid_messages(FileHdl, Offset, Acc) -> + case read_next_file_entry(FileHdl, Offset) of + {ok, eof} -> {ok, Acc}; + {ok, {corrupted, NextOffset}} -> + scan_file_for_valid_messages(FileHdl, NextOffset, Acc); + {ok, {ok, MsgId, TotalSize, NextOffset}} -> + scan_file_for_valid_messages(FileHdl, NextOffset, + [{MsgId, TotalSize, Offset}|Acc]); + _KO -> + %% bad message, but we may still have recovered some valid messages + {ok, Acc} + end. + + +read_next_file_entry(FileHdl, Offset) -> + TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, + case file:read(FileHdl, TwoIntegers) of + {ok, + <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> + case {TotalSize =:= 0, MsgIdBinSize =:= 0} of + {true, _} -> {ok, eof}; %% Nothing we can do other than stop + {false, true} -> + %% current message corrupted, try skipping past it + ExpectedAbsPos = + Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, + case file:position(FileHdl, {cur, TotalSize + 1}) of + {ok, ExpectedAbsPos} -> + {ok, {corrupted, ExpectedAbsPos}}; + {ok, _SomeOtherPos} -> + {ok, eof}; %% seek failed, so give up + KO -> KO + end; + {false, false} -> %% all good, let's continue + case file:read(FileHdl, MsgIdBinSize) of + {ok, <<MsgId:MsgIdBinSize/binary>>} -> + ExpectedAbsPos = Offset + TwoIntegers + TotalSize, + case file:position(FileHdl, + {cur, TotalSize - MsgIdBinSize} + ) of + {ok, ExpectedAbsPos} -> + NextOffset = Offset + TotalSize + + ?FILE_PACKING_ADJUSTMENT, + case file:read(FileHdl, 1) of + {ok, + <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} -> + {ok, {ok, binary_to_term(MsgId), + TotalSize, NextOffset}}; + {ok, _SomeOtherData} -> + {ok, {corrupted, NextOffset}}; + KO -> KO + end; + {ok, _SomeOtherPos} -> + %% seek failed, so give up + {ok, eof}; + KO -> KO + end; + eof -> {ok, eof}; + KO -> KO + end + end; + eof -> {ok, eof}; + KO -> KO + end. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be00503..fe5acc83 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, rabbit_guid). -record(state, {serial}). @@ -59,17 +60,24 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(mnesia:system_info(directory), ?SERIAL_FILENAME), + Serial = case file:read_file(Filename) of + {ok, Content} -> + binary_to_term(Content); + {error, _} -> + 0 + end, + ok = file:write_file(Filename, term_to_binary(Serial + 1)), + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +85,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 72e16f0f..c965c693 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -52,6 +52,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2]). -import(mnesia). -import(lists). @@ -113,6 +114,7 @@ -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> [B]). -endif. @@ -431,3 +433,11 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl new file mode 100644 index 00000000..a950584a --- /dev/null +++ b/src/rabbit_mixed_queue.erl @@ -0,0 +1,394 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_mixed_queue). + +-include("rabbit.hrl"). + +-export([start_link/3]). + +-export([publish/2, publish_delivered/2, deliver/1, ack/2, + tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, + length/1, is_empty/1, delete_queue/1]). + +-export([to_disk_only_mode/1, to_mixed_mode/1]). + +-record(mqstate, { mode, + msg_buf, + next_write_seq, + queue, + is_durable + } + ). + +start_link(Queue, IsDurable, disk) -> + purge_non_persistent_messages( + #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, + next_write_seq = 0, is_durable = IsDurable }); +start_link(Queue, IsDurable, mixed) -> + {ok, State} = start_link(Queue, IsDurable, disk), + to_mixed_mode(State). + +to_disk_only_mode(State = #mqstate { mode = disk }) -> + {ok, State}; +to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + next_write_seq = NextSeq }) -> + rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), + %% We enqueue _everything_ here. This means that should a message + %% already be in the disk queue we must remove it and add it back + %% in. Fortunately, by using requeue, we avoid rewriting the + %% message on disk. + %% Note we also batch together messages on disk so that we minimise + %% the calls to requeue. + Msgs = queue:to_list(MsgBuf), + {NextSeq1, Requeue} = + lists:foldl( + fun ({_Seq, Msg = #basic_message { guid = MsgId }, + IsDelivered, OnDisk}, {NSeq, RQueueAcc}) -> + if OnDisk -> + {MsgId, IsDelivered, AckTag, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + {NSeq + 1, + [ {AckTag, {NSeq, IsDelivered}} | RQueueAcc ]}; + true -> + ok = if [] == RQueueAcc -> ok; + true -> + rabbit_disk_queue:requeue_with_seqs( + Q, lists:reverse(RQueueAcc)) + end, + ok = rabbit_disk_queue:publish_with_seq( + Q, MsgId, NSeq, msg_to_bin(Msg), false), + {NSeq + 1, []} + end + end, {NextSeq, []}, Msgs), + ok = if [] == Requeue -> ok; + true -> + rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) + end, + {ok, State #mqstate { mode = disk, msg_buf = queue:new(), + next_write_seq = NextSeq1 }}. + +to_mixed_mode(State = #mqstate { mode = mixed }) -> + {ok, State}; +to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) -> + rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), + %% load up a new queue with everything that's on disk. + %% don't remove non-persistent messages that happen to be on disk + QList = rabbit_disk_queue:dump_queue(Q), + {MsgBuf1, NextSeq1} = + lists:foldl( + fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, {Buf, NSeq}) + when SeqId >= NSeq -> + Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), + {queue:in({SeqId, Msg, IsDelivered, true}, Buf), SeqId + 1} + end, {queue:new(), 0}, QList), + State1 = State #mqstate { mode = mixed, msg_buf = MsgBuf1, + next_write_seq = NextSeq1 }, + rabbit_log:info("Queue length: ~p ~w ~w~n", + [Q, rabbit_mixed_queue:length(State), + rabbit_mixed_queue:length(State1)]), + {ok, State1}. + +purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable }) -> + %% iterate through the content on disk, ack anything which isn't + %% persistent, accumulate everything else that is persistent and + %% requeue it + NextSeq = rabbit_disk_queue:next_write_seq(Q), + {Acks, Requeue, NextSeq2} = + deliver_all_messages(Q, IsDurable, [], [], NextSeq), + ok = if Requeue == [] -> ok; + true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) + end, + ok = if Acks == [] -> ok; + true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks)) + end, + {ok, State #mqstate { next_write_seq = NextSeq2 }}. + +deliver_all_messages(Q, IsDurable, Acks, Requeue, NextSeq) -> + case rabbit_disk_queue:deliver(Q) of + empty -> {Acks, Requeue, NextSeq}; + {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} -> + #basic_message { guid = MsgId, is_persistent = IsPersistent } = + bin_to_msg(MsgBin), + OnDisk = IsPersistent andalso IsDurable, + {Acks2, Requeue2, NextSeq2} = + if OnDisk -> {Acks, + [{AckTag, {NextSeq, IsDelivered}} | Requeue], + NextSeq + 1 + }; + true -> {[AckTag | Acks], Requeue, NextSeq} + end, + deliver_all_messages(Q, IsDurable, Acks2, Requeue2, NextSeq2) + end. + +msg_to_bin(Msg = #basic_message { content = Content }) -> + ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), + term_to_binary(Msg #basic_message { content = ClearedContent }). + +bin_to_msg(MsgBin) -> + binary_to_term(MsgBin). + +publish(Msg = #basic_message { guid = MsgId }, + State = #mqstate { mode = disk, queue = Q }) -> + ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), + {ok, State}; +publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, + State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, + next_write_seq = NextSeq, msg_buf = MsgBuf }) -> + OnDisk = IsDurable andalso IsPersistent, + ok = if OnDisk -> + rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, + msg_to_bin(Msg), false); + true -> ok + end, + {ok, State #mqstate { next_write_seq = NextSeq + 1, + msg_buf = queue:in({NextSeq, Msg, false, OnDisk}, + MsgBuf) + }}. + +%% Assumption here is that the queue is empty already (only called via +%% attempt_immediate_delivery). Also note that the seq id assigned by +%% the disk queue could well not be the same as the NextSeq (true = +%% NextSeq >= disk_queue_write_seq_for_queue(Q)) , but this doesn't +%% matter because the AckTag will still be correct (AckTags for +%% non-persistent messages don't exist). (next_write_seq is actually +%% only used to calculate how many messages are in the queue). +publish_delivered(Msg = + #basic_message { guid = MsgId, is_persistent = IsPersistent}, + State = #mqstate { mode = Mode, is_durable = IsDurable, + next_write_seq = NextSeq, queue = Q }) + when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> + rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), + if IsDurable andalso IsPersistent -> + %% must call phantom_deliver otherwise the msg remains at + %% the head of the queue. This is synchronous, but + %% unavoidable as we need the AckTag + {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), + {ok, AckTag, State}; + true -> + %% in this case, we don't actually care about the ack, so + %% auto ack it (asynchronously). + ok = rabbit_disk_queue:auto_ack_next_message(Q), + {ok, noack, State #mqstate { next_write_seq = NextSeq + 1 }} + end; +publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) -> + true = queue:is_empty(MsgBuf), + {ok, noack, State}. + +deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) -> + case rabbit_disk_queue:deliver(Q) of + empty -> {empty, State}; + {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} -> + #basic_message { guid = MsgId, is_persistent = IsPersistent } = + Msg = bin_to_msg(MsgBin), + AckTag2 = if IsPersistent andalso IsDurable -> AckTag; + true -> ok = rabbit_disk_queue:ack(Q, [AckTag]), + noack + end, + {{Msg, IsDelivered, AckTag2, Remaining}, State} + end; + +deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, + next_write_seq = NextWrite, msg_buf = MsgBuf }) -> + {Result, MsgBuf2} = queue:out(MsgBuf), + case Result of + empty -> + {empty, State}; + {value, {Seq, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + IsDelivered, OnDisk}} -> + AckTag = + if OnDisk -> + if IsPersistent andalso IsDurable -> + {MsgId, IsDelivered, AckTag2, _PersistRem} = + rabbit_disk_queue:phantom_deliver(Q), + AckTag2; + true -> + ok = rabbit_disk_queue:auto_ack_next_message(Q), + noack + end; + true -> noack + end, + {{Msg, IsDelivered, AckTag, (NextWrite - 1 - Seq)}, + State #mqstate { msg_buf = MsgBuf2 }} + end. + +remove_noacks(Acks) -> + lists:filter(fun (A) -> A /= noack end, Acks). + +ack(Acks, State = #mqstate { queue = Q }) -> + case remove_noacks(Acks) of + [] -> {ok, State}; + AckTags -> ok = rabbit_disk_queue:ack(Q, AckTags), + {ok, State} + end. + +tx_publish(Msg = #basic_message { guid = MsgId }, + State = #mqstate { mode = disk }) -> + ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), + {ok, State}; +tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, + State = #mqstate { mode = mixed, is_durable = IsDurable }) + when IsDurable andalso IsPersistent -> + ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), + {ok, State}; +tx_publish(_Msg, State = #mqstate { mode = mixed }) -> + %% this message will reappear in the tx_commit, so ignore for now + {ok, State}. + +only_msg_ids(Pubs) -> + lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). + +tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) -> + RealAcks = remove_noacks(Acks), + ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; + true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), + RealAcks) + end, + {ok, State}; +tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, + msg_buf = MsgBuf, + next_write_seq = NextSeq, + is_durable = IsDurable + }) -> + {PersistentPubs, MsgBuf2, NextSeq2} = + lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, + {Acc, MsgBuf3, NextSeq3}) -> + OnDisk = IsPersistent andalso IsDurable, + Acc2 = + if OnDisk -> + [{Msg #basic_message.guid, NextSeq3} + | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({NextSeq3, Msg, false, OnDisk}, + MsgBuf3), + {Acc2, MsgBuf4, NextSeq3 + 1} + end, {[], MsgBuf, NextSeq}, Publishes), + %% foldl reverses, so re-reverse PersistentPubs to match + %% requirements of rabbit_disk_queue (ascending SeqIds) + RealAcks = remove_noacks(Acks), + ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok; + true -> + rabbit_disk_queue:tx_commit_with_seqs( + Q, lists:reverse(PersistentPubs), RealAcks) + end, + {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. + +only_persistent_msg_ids(Pubs) -> + lists:reverse( + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> + if IsPersistent -> [Msg #basic_message.guid | Acc]; + true -> Acc + end + end, [], Pubs)). + +tx_cancel(Publishes, State = #mqstate { mode = disk }) -> + ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)), + {ok, State}; +tx_cancel(Publishes, + State = #mqstate { mode = mixed, is_durable = IsDurable }) -> + ok = + if IsDurable -> + rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)); + true -> ok + end, + {ok, State}. + +%% [{Msg, AckTag}] +requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable }) -> + %% here, we may have messages with no ack tags, because of the + %% fact they are not persistent, but nevertheless we want to + %% requeue them. This means publishing them delivered. + Requeue + = lists:foldl( + fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) + when IsPersistent andalso IsDurable -> + [AckTag | RQ]; + ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) -> + ok = if RQ == [] -> ok; + true -> rabbit_disk_queue:requeue( + Q, lists:reverse(RQ)) + end, + _AckTag2 = rabbit_disk_queue:publish( + Q, MsgId, msg_to_bin(Msg), true), + [] + end, [], MessagesWithAckTags), + ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), + {ok, State}; +requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, + msg_buf = MsgBuf, + next_write_seq = NextSeq, + is_durable = IsDurable + }) -> + {PersistentPubs, MsgBuf2, NextSeq2} = + lists:foldl( + fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, + {Acc, MsgBuf3, NextSeq3}) -> + OnDisk = IsDurable andalso IsPersistent, + Acc2 = + if OnDisk -> [{AckTag, {NextSeq3, true}} | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({NextSeq3, Msg, true, OnDisk}, MsgBuf3), + {Acc2, MsgBuf4, NextSeq3 + 1} + end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), + ok = if [] == PersistentPubs -> ok; + true -> rabbit_disk_queue:requeue_with_seqs( + Q, lists:reverse(PersistentPubs)) + end, + {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. + +purge(State = #mqstate { queue = Q, mode = disk }) -> + Count = rabbit_disk_queue:purge(Q), + {Count, State}; +purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) -> + rabbit_disk_queue:purge(Q), + Count = queue:len(MsgBuf), + {Count, State #mqstate { msg_buf = queue:new() }}. + +delete_queue(State = #mqstate { queue = Q, mode = disk }) -> + rabbit_disk_queue:delete_queue(Q), + {ok, State}; +delete_queue(State = #mqstate { queue = Q, mode = mixed }) -> + rabbit_disk_queue:delete_queue(Q), + {ok, State #mqstate { msg_buf = queue:new() }}. + +length(#mqstate { queue = Q, mode = disk }) -> + rabbit_disk_queue:length(Q); +length(#mqstate { mode = mixed, msg_buf = MsgBuf }) -> + queue:len(MsgBuf). + +is_empty(State) -> + 0 == rabbit_mixed_queue:length(State). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0a..77e309fe 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -144,11 +144,26 @@ table_definitions() -> {disc_copies, [node()]}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + {attributes, record_info(fields, amqqueue)}]}, + {rabbit_disk_queue, + [{record_name, dq_msg_loc}, + {type, set}, + {local_content, true}, + {attributes, record_info(fields, dq_msg_loc)}, + {disc_only_copies, [node()]}]} + ]. + +replicated_table_definitions() -> + [{Tab, Attrs} || {Tab, Attrs} <- table_definitions(), + not lists:member({local_content, true}, Attrs) + ]. table_names() -> [Tab || {Tab, _} <- table_definitions()]. +replicated_table_names() -> + [Tab || {Tab, _} <- replicated_table_definitions()]. + dir() -> mnesia:system_info(directory). ensure_mnesia_dir() -> @@ -173,7 +188,7 @@ ensure_mnesia_not_running() -> check_schema_integrity() -> %%TODO: more thorough checks - case catch [mnesia:table_info(Tab, version) || Tab <- table_names()] of + case catch [mnesia:table_info(Tab, version) || Tab <- replicated_table_names()] of {'EXIT', Reason} -> {error, Reason}; _ -> ok end. @@ -253,9 +268,10 @@ init_db(ClusterNodes) -> WasDiskNode = mnesia:system_info(use_dir), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), - case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of + ExtraNodes = ClusterNodes -- [node()], + case mnesia:change_config(extra_db_nodes, ExtraNodes) of {ok, []} -> - if WasDiskNode and IsDiskNode -> + if WasDiskNode -> case check_schema_integrity() of ok -> ok; @@ -270,14 +286,8 @@ init_db(ClusterNodes) -> ok = move_db(), ok = create_schema() end; - WasDiskNode -> - throw({error, {cannot_convert_disk_node_to_ram_node, - ClusterNodes}}); - IsDiskNode -> - ok = create_schema(); true -> - throw({error, {unable_to_contact_cluster_nodes, - ClusterNodes}}) + ok = create_schema() end; {ok, [_|_]} -> ok = wait_for_tables(), @@ -337,15 +347,19 @@ create_tables() -> ok. create_local_table_copies(Type) -> - ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); - true -> ok - end, + ok = create_local_table_copy(schema, disc_copies), lists:foreach( fun({Tab, TabDef}) -> HasDiscCopies = - lists:keymember(disc_copies, 1, TabDef), + case lists:keysearch(disc_copies, 1, TabDef) of + false -> false; + {value, {disc_copies, List1}} -> lists:member(node(), List1) + end, HasDiscOnlyCopies = - lists:keymember(disc_only_copies, 1, TabDef), + case lists:keysearch(disc_only_copies, 1, TabDef) of + false -> false; + {value, {disc_only_copies, List2}} -> lists:member(node(), List2) + end, StorageType = case Type of disc -> @@ -367,9 +381,6 @@ create_local_table_copies(Type) -> ok = create_local_table_copy(Tab, StorageType) end, table_definitions()), - ok = if Type == ram -> create_local_table_copy(schema, ram_copies); - true -> ok - end, ok. create_local_table_copy(Tab, Type) -> @@ -387,7 +398,7 @@ create_local_table_copy(Tab, Type) -> wait_for_tables() -> case check_schema_integrity() of ok -> - case mnesia:wait_for_tables(table_names(), 30000) of + case mnesia:wait_for_tables(replicated_table_names(), 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl deleted file mode 100644 index d0d60ddf..00000000 --- a/src/rabbit_persister.erl +++ /dev/null @@ -1,523 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_persister). - --behaviour(gen_server). - --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([transaction/1, extend_transaction/2, dirty_work/1, - commit_transaction/1, rollback_transaction/1, - force_snapshot/0, serial/0]). - --include("rabbit.hrl"). - --define(SERVER, ?MODULE). - --define(LOG_BUNDLE_DELAY, 5). --define(COMPLETE_BUNDLE_DELAY, 2). - --define(HIBERNATE_AFTER, 10000). - --define(MAX_WRAP_ENTRIES, 500). - --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). - --record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, - snapshot}). - -%% two tables for efficient persistency -%% one maps a key to a message -%% the other maps a key to one or more queues. -%% The aim is to reduce the overload of storing a message multiple times -%% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(qmsg() :: {amqqueue(), pkey()}). --type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). - --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok'). --spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: (txn()) -> 'ok'). --spec(rollback_transaction/1 :: (txn()) -> 'ok'). --spec(force_snapshot/0 :: () -> 'ok'). --spec(serial/0 :: () -> non_neg_integer()). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -transaction(MessageList) -> - ?LOGDEBUG("transaction ~p~n", [MessageList]), - TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). - -extend_transaction(TxnKey, MessageList) -> - ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), - gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}). - -dirty_work(MessageList) -> - ?LOGDEBUG("dirty_work ~p~n", [MessageList]), - gen_server:cast(?SERVER, {dirty_work, MessageList}). - -commit_transaction(TxnKey) -> - ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). - -rollback_transaction(TxnKey) -> - ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), - gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). - -force_snapshot() -> - gen_server:call(?SERVER, force_snapshot, infinity). - -serial() -> - gen_server:call(?SERVER, serial, infinity). - -%%-------------------------------------------------------------------- - -init(_Args) -> - process_flag(trap_exit, true), - FileName = base_filename(), - ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{serial = 0, - transactions = dict:new(), - messages = ets:new(messages, []), - queues = ets:new(queues, [])}, - LogHandle = - case disk_log:open([{name, rabbit_persister}, - {head, current_snapshot(Snapshot)}, - {file, FileName}]) of - {ok, LH} -> LH; - {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} -> - WarningFun = if - Bad > 0 -> fun rabbit_log:warning/2; - true -> fun rabbit_log:info/2 - end, - WarningFun("Repaired persister log - ~p recovered, ~p bad~n", - [Recovered, Bad]), - LH - end, - {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot), - NewSnapshot = LoadedSnapshot#psnapshot{ - serial = LoadedSnapshot#psnapshot.serial + 1}, - case Res of - ok -> - ok = take_snapshot(LogHandle, NewSnapshot); - {error, Reason} -> - rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), - ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) - end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, - {ok, State}. - -handle_call({transaction, Key, MessageList}, From, State) -> - NewState = internal_extend(Key, MessageList, State), - do_noreply(internal_commit(From, Key, NewState)); -handle_call({commit_transaction, TxnKey}, From, State) -> - do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State) -> - do_reply(ok, flush(true, State)); -handle_call(serial, _From, - State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> - do_reply(Serial, State); -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({rollback_transaction, TxnKey}, State) -> - do_noreply(internal_rollback(TxnKey, State)); -handle_cast({dirty_work, MessageList}, State) -> - do_noreply(internal_dirty_work(MessageList, State)); -handle_cast({extend_transaction, TxnKey, MessageList}, State) -> - do_noreply(internal_extend(TxnKey, MessageList, State)); -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(timeout, State = #pstate{deadline = infinity}) -> - State1 = flush(true, State), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State1, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); -handle_info(timeout, State) -> - do_noreply(flush(State)); -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State = #pstate{log_handle = LogHandle}) -> - flush(State), - disk_log:close(LogHandle), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, flush(State)}. - -%%-------------------------------------------------------------------- - -internal_extend(Key, MessageList, State) -> - log_work(fun (ML) -> {extend_transaction, Key, ML} end, - MessageList, State). - -internal_dirty_work(MessageList, State) -> - log_work(fun (ML) -> {dirty_work, ML} end, - MessageList, State). - -internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {commit_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - complete(From, Unit, State#pstate{snapshot = NewSnapshot}). - -internal_rollback(Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {rollback_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -complete(From, Item, State = #pstate{deadline = ExistingDeadline, - pending_logs = Logs, - pending_replies = Waiting}) -> - State#pstate{deadline = compute_deadline( - ?COMPLETE_BUNDLE_DELAY, ExistingDeadline), - pending_logs = [Item | Logs], - pending_replies = [From | Waiting]}. - -%% This is made to limit disk usage by writing messages only once onto -%% disk. We keep a table associating pkeys to messages, and provided -%% the list of messages to output is left to right, we can guarantee -%% that pkeys will be a backreference to a message in memory when a -%% "tied" is met. -log_work(CreateWorkUnit, MessageList, - State = #pstate{ - snapshot = Snapshot = #psnapshot{ - messages = Messages}}) -> - Unit = CreateWorkUnit( - rabbit_misc:map_in_order( - fun(M = {publish, Message, QK = {_QName, PKey}}) -> - case ets:lookup(Messages, PKey) of - [_] -> {tied, QK}; - [] -> ets:insert(Messages, {PKey, Message}), - M - end; - (M) -> M - end, - MessageList)), - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, - Message) -> - State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, - ExistingDeadline), - pending_logs = [Message | Logs]}. - -base_filename() -> - rabbit_mnesia:dir() ++ "/rabbit_persister.LOG". - -take_snapshot(LogHandle, OldFileName, Snapshot) -> - ok = disk_log:sync(LogHandle), - %% current_snapshot is the Head (ie. first thing logged) - ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)). - -take_snapshot(LogHandle, Snapshot) -> - OldFileName = lists:flatten(base_filename() ++ ".previous"), - file:delete(OldFileName), - rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -take_snapshot_and_save_old(LogHandle, Snapshot) -> - {MegaSecs, Secs, MicroSecs} = erlang:now(), - Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs, - OldFileName = lists:flatten(io_lib:format("~s.saved.~p", - [base_filename(), Timestamp])), - rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, - log_handle = LH, - snapshot = Snapshot}) - when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> - ok = take_snapshot(LH, Snapshot), - State#pstate{entry_count = 0}; -maybe_take_snapshot(_Force, State) -> - State. - -later_ms(DeltaMilliSec) -> - {MegaSec, Sec, MicroSec} = now(), - %% Note: not normalised. Unimportant for this application. - {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}. - -%% Result = B - A, more or less -time_diff({B1, B2, B3}, {A1, A2, A3}) -> - (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 . - -compute_deadline(TimerDelay, infinity) -> - later_ms(TimerDelay); -compute_deadline(_TimerDelay, ExistingDeadline) -> - ExistingDeadline. - -compute_timeout(infinity) -> - ?HIBERNATE_AFTER; -compute_timeout(Deadline) -> - DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, - if - DeltaMilliSec =< 1 -> - 0; - true -> - round(DeltaMilliSec) - end. - -do_noreply(State = #pstate{deadline = Deadline}) -> - {noreply, State, compute_timeout(Deadline)}. - -do_reply(Reply, State = #pstate{deadline = Deadline}) -> - {reply, Reply, State, compute_timeout(Deadline)}. - -flush(State) -> flush(false, State). - -flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if PendingLogs /= [] -> - disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - State#pstate{entry_count = State#pstate.entry_count + 1}; - true -> - State - end, - State2 = maybe_take_snapshot(ForceSnapshot, State1), - if Waiting /= [] -> - ok = disk_log:sync(LogHandle), - lists:foreach(fun (From) -> gen_server:reply(From, ok) end, - Waiting); - true -> - ok - end, - State2#pstate{deadline = infinity, - pending_logs = [], - pending_replies = []}. - -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> - %% Avoid infinite growth of the table by removing messages not - %% bound to a queue anymore - prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues)), - InnerSnapshot = {{serial, Serial}, - {txns, Ts}, - {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, - ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), - {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - term_to_binary(InnerSnapshot)}. - -prune_table(Tab, Keys) -> - true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Keys, ets:first(Tab)), - true = ets:safe_fixtable(Tab, false). - -prune_table(_Tab, _Keys, '$end_of_table') -> ok; -prune_table(Tab, Keys, Key) -> - case sets:is_element(Key, Keys) of - true -> ok; - false -> ets:delete(Tab, Key) - end, - prune_table(Tab, Keys, ets:next(Tab, Key)). - -internal_load_snapshot(LogHandle, - Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), - case check_version(Loaded_Snapshot) of - {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), - true = ets:insert(Messages, Ms), - true = ets:insert(Queues, Qs), - Snapshot1 = replay(Items, LogHandle, K, - Snapshot#psnapshot{ - serial = Serial, - transactions = Ts}), - Snapshot2 = requeue_messages(Snapshot1), - %% uncompleted transactions are discarded - this is TRTTD - %% since we only get into this code on node restart, so - %% any uncompleted transactions will have been aborted. - {ok, Snapshot2#psnapshot{transactions = dict:new()}}; - {error, Reason} -> {{error, Reason}, Snapshot} - end. - -check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - StateBin}) -> - {ok, StateBin}; -check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> - {error, {unsupported_persister_log_format, Vsn}}; -check_version(_Other) -> - {error, unrecognised_persister_log_format}. - -requeue_messages(Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), - %% unstable parallel map, because order doesn't matter - L = lists:append( - rabbit_misc:upmap( - %% we do as much work as possible in spawned worker - %% processes, but we need to make sure the ets:inserts are - %% performed in self() - fun ({QName, Requeues}) -> - requeue(QName, Requeues, Messages) - end, dict:to_list(Work))), - NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], - ets:delete_all_objects(Messages), - ets:delete_all_objects(Queues), - true = ets:insert(Messages, NewMessages), - true = ets:insert(Queues, NewQueues), - %% contains the mutated messages and queues tables - Snapshot. - -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - -requeue(QName, Requeues, Messages) -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, - {_, Message} <- ets:lookup(Messages, PKey)], - rabbit_amqqueue:redeliver( - QPid, - %% Messages published by the same process receive - %% persistence keys that are monotonically - %% increasing. Since message ordering is defined on a - %% per-channel basis, and channels are bound to specific - %% processes, sorting the list does provide the correct - %% ordering properties. - [{Message, Delivered} || {_, Message, Delivered} <- - lists:sort(RequeueMessages)]), - RequeueMessages; - {error, not_found} -> - [] - end. - -replay([], LogHandle, K, Snapshot) -> - case disk_log:chunk(LogHandle, K) of - {K1, Items} -> - replay(Items, LogHandle, K1, Snapshot); - {K1, Items, Badbytes} -> - rabbit_log:warning("~p bad bytes recovering persister log~n", - [Badbytes]), - replay(Items, LogHandle, K1, Snapshot); - eof -> Snapshot - end; -replay([Item | Items], LogHandle, K, Snapshot) -> - NewSnapshot = internal_integrate_messages(Item, Snapshot), - replay(Items, LogHandle, K, NewSnapshot). - -internal_integrate_messages(Items, Snapshot) -> - lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end, - Snapshot, Items). - -internal_integrate1({extend_transaction, Key, MessageList}, - Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; -internal_integrate1({rollback_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions}) -> - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; -internal_integrate1({commit_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> - case dict:find(Key, Transactions) of - {ok, MessageLists} -> - ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; - error -> - Snapshot - end; -internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. - -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl new file mode 100644 index 00000000..080607bb --- /dev/null +++ b/src/rabbit_queue_mode_manager.erl @@ -0,0 +1,105 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_queue_mode_manager). + +-behaviour(gen_server2). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/1, change_memory_usage/2]). + +-define(SERVER, ?MODULE). + +-record(state, { mode, + queues + }). + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +register(Pid) -> + gen_server2:call(?SERVER, {register, Pid}). + +change_memory_usage(_Pid, Conserve) -> + gen_server2:cast(?SERVER, {change_memory_usage, Conserve}). + +init([]) -> + process_flag(trap_exit, true), + ok = rabbit_alarm:register(self(), {?MODULE, change_memory_usage, []}), + {ok, #state { mode = unlimited, + queues = [] + }}. + +handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) -> + Result = case Mode of + unlimited -> mixed; + _ -> disk + end, + {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}. + +handle_cast({change_memory_usage, true}, State = #state { mode = disk_only }) -> + {noreply, State}; +handle_cast({change_memory_usage, true}, State = #state { mode = ram_disk }) -> + constrain_queues(true, State #state.queues), + {noreply, State #state { mode = disk_only }}; +handle_cast({change_memory_usage, true}, State = #state { mode = unlimited }) -> + ok = rabbit_disk_queue:to_disk_only_mode(), + {noreply, State #state { mode = ram_disk }}; + +handle_cast({change_memory_usage, false}, State = #state { mode = unlimited }) -> + {noreply, State}; +handle_cast({change_memory_usage, false}, State = #state { mode = ram_disk }) -> + ok = rabbit_disk_queue:to_ram_disk_mode(), + {noreply, State #state { mode = unlimited }}; +handle_cast({change_memory_usage, false}, State = #state { mode = disk_only }) -> + constrain_queues(false, State #state.queues), + {noreply, State #state { mode = ram_disk }}. + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +constrain_queues(Constrain, Qs) -> + lists:foreach( + fun (QPid) -> + ok = rabbit_amqqueue:constrain_memory(QPid, Constrain) + end, Qs). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8f0a3a89..a2a31a18 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -31,7 +31,9 @@ -module(rabbit_tests). --export([all_tests/0, test_parsing/0]). +-compile(export_all). + +-export([all_tests/0, test_parsing/0, test_disk_queue/0]). -import(lists). @@ -54,6 +56,7 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), + passed = test_disk_queue(), passed. test_priority_queue() -> @@ -261,7 +264,7 @@ test_log_management() -> %% original log files are not writable ok = make_files_non_writable([MainLog, SaslLog]), {error, {{cannot_rotate_main_logs, _}, - {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), + {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), %% logging directed to tty (handlers were removed in last test) ok = clean_logs([MainLog, SaslLog], Suffix), @@ -280,7 +283,7 @@ test_log_management() -> ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}), ok = application:set_env(kernel, error_logger, {file, MainLog}), ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}, - {rabbit_sasl_report_file_h, SaslLog}]), + {rabbit_sasl_report_file_h, SaslLog}]), passed. test_log_management_during_startup() -> @@ -404,19 +407,17 @@ test_cluster_management() -> end, ClusteringSequence), - %% attempt to convert a disk node into a ram node + %% convert a disk node into a ram node ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), - %% attempt to join a non-existing cluster as a ram node + %% join a non-existing cluster as a ram node ok = control_action(reset, []), - {error, {unable_to_contact_cluster_nodes, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:localnode(hare), case net_adm:ping(SecondaryNode) of @@ -432,11 +433,12 @@ test_cluster_management2(SecondaryNode) -> NodeS = atom_to_list(node()), SecondaryNodeS = atom_to_list(SecondaryNode), - %% attempt to convert a disk node into a ram node + %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), - {error, {unable_to_join_cluster, _, _}} = - control_action(cluster, [SecondaryNodeS]), + %% make a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), %% join cluster as a ram node ok = control_action(reset, []), @@ -449,21 +451,21 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to join non-existing cluster as a ram node - {error, _} = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), - + %% join non-existing cluster as a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn ram node into disk node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to convert a disk node into a ram node - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + %% convert a disk node into a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -684,3 +686,344 @@ delete_log_handlers(Handlers) -> [[] = error_logger:delete_report_handler(Handler) || Handler <- Handlers], ok. + +test_disk_queue() -> + rdq_stop(), + rdq_virgin(), + passed = rdq_stress_gc(10000), + passed = rdq_test_startup_with_queue_gaps(), + passed = rdq_test_redeliver(), + passed = rdq_test_purge(), + passed = rdq_test_dump_queue(), + passed = rdq_test_mixed_queue_modes(), + rdq_virgin(), + ok = control_action(stop_app, []), + ok = control_action(start_app, []), + passed. + +benchmark_disk_queue() -> + rdq_stop(), + % unicode chars are supported properly from r13 onwards + io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []), + [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), + timer:sleep(1000) end || % 1000 milliseconds + MsgSize <- [512, 8192, 32768, 131072], + Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)], + MsgCount <- [1024, 4096, 16384] + ], + rdq_virgin(), + ok = control_action(stop_app, []), + ok = control_action(start_app, []), + passed. + +rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> + Startup = rdq_virgin(), + rdq_start(), + QCount = length(Qs), + Msg = <<0:(8*MsgSizeBytes)>>, + List = lists:seq(1, MsgCount), + {Publish, ok} = + timer:tc(?MODULE, rdq_time_commands, + [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) + || N <- List, _ <- Qs] end, + fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, []) + || Q <- Qs] end + ]]), + {Deliver, ok} = + timer:tc(?MODULE, rdq_time_commands, + [[fun() -> [begin SeqIds = + [begin + Remaining = MsgCount - N, + {N, Msg, MsgSizeBytes, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(Q), + SeqId + end || N <- List], + ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) + end || Q <- Qs] + end]]), + io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n", + [MsgCount, MsgSizeBytes, QCount, float(Startup), + float(Publish), (Publish / (MsgCount * QCount)), + (Publish / (MsgCount * QCount * MsgSizeBytes)), + float(Deliver), (Deliver / (MsgCount * QCount)), + (Deliver / (MsgCount * QCount * MsgSizeBytes))]), + rdq_stop(). + +% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have +% several files, and then keep punching holes in a reasonably sensible way. +rdq_stress_gc(MsgCount) -> + rdq_virgin(), + rdq_start(), + MsgSizeBytes = 256*1024, + Msg = <<0:(8*MsgSizeBytes)>>, % 256KB + List = lists:seq(1, MsgCount), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], + rabbit_disk_queue:tx_commit(q, List, []), + StartChunk = round(MsgCount / 20), % 5% + AckList = + lists:foldl( + fun (E, Acc) -> + case lists:member(E, Acc) of + true -> Acc; + false -> [E|Acc] + end + end, [], lists:flatten( + lists:reverse( + [ lists:seq(N, MsgCount, N) + || N <- lists:seq(1, round(MsgCount / 2), 1) + ]))), + {Start, End} = lists:split(StartChunk, AckList), + AckList2 = End ++ Start, + MsgIdToSeqDict = + lists:foldl( + fun (MsgId, Acc) -> + Remaining = MsgCount - MsgId, + {MsgId, Msg, MsgSizeBytes, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), + dict:store(MsgId, SeqId, Acc) + end, dict:new(), List), + %% we really do want to ack each of this individually + [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), + rabbit_disk_queue:ack(q, [SeqId]) + end || MsgId <- AckList2], + rabbit_disk_queue:tx_commit(q, [], []), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + +rdq_test_startup_with_queue_gaps() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), SeqId + end || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% ack every other message we have delivered (starting at the _first_) + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:ack(q, [SeqId2]), + false; + (_SeqId2, false) -> + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Acked every other message delivered done~n", []), + rdq_stop(), + rdq_start(), + io:format("Startup (with shuffle) done~n", []), + %% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered + Seqs2 = [begin + Remaining = round(Total - ((Half + N)/2)), + {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(2,Half,2)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + io:format("Reread non-acked messages done~n", []), + %% and now fetch the rest + Seqs3 = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1 + Half,Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + io:format("Read second half done~n", []), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + +rdq_test_redeliver() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% now requeue every other message (starting at the _first_) + %% and ack the other ones + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:requeue(q, [SeqId2]), + false; + (SeqId2, false) -> + rabbit_disk_queue:ack(q, [SeqId2]), + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Redeliver and acking done~n", []), + %% we should now get the 2nd half in order, followed by every-other-from-the-first-half + Seqs2 = [begin + Remaining = round(Total - N + (Half/2)), + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1+Half, Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + Seqs3 = [begin + Remaining = round((Half - N) / 2) - 1, + {N, Msg, 256, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1, Half, 2)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + +rdq_test_purge() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + SeqId + end || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + rabbit_disk_queue:purge(q), + io:format("Purge done~n", []), + rabbit_disk_queue:tx_commit(q, [], Seqs), + io:format("Ack first half done~n", []), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + +rdq_test_dump_queue() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + QList = [{N, Msg, 256, false, {N, (N-1)}, (N-1)} || N <- All], + QList = rabbit_disk_queue:dump_queue(q), + rdq_stop(), + io:format("dump ok undelivered~n", []), + rdq_start(), + lists:foreach( + fun (N) -> + Remaining = Total - N, + {N, Msg, 256, false, _SeqId, Remaining} = rabbit_disk_queue:deliver(q) + end, All), + [] = rabbit_disk_queue:dump_queue(q), + rdq_stop(), + io:format("dump ok post delivery~n", []), + rdq_start(), + QList2 = [{N, Msg, 256, true, {N, (N-1)}, (N-1)} || N <- All], + QList2 = rabbit_disk_queue:dump_queue(q), + io:format("dump ok post delivery + restart~n", []), + rdq_stop(), + passed. + +rdq_test_mixed_queue_modes() -> + rdq_virgin(), + rdq_start(), + Payload = <<0:(8*256)>>, + {ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed), + MS2 = lists:foldl(fun (_N, MS1) -> + Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), + MS1a + end, MS, lists:seq(1,10)), + MS4 = lists:foldl(fun (_N, MS3) -> + Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload)) + #basic_message { is_persistent = true }, + {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3), + MS3a + end, MS2, lists:seq(1,10)), + MS6 = lists:foldl(fun (_N, MS5) -> + Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5), + MS5a + end, MS4, lists:seq(1,10)), + 30 = rabbit_mixed_queue:length(MS6), + io:format("Published a mixture of messages~n"), + {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6), + 30 = rabbit_mixed_queue:length(MS7), + io:format("Converted to disk only mode~n"), + {ok, MS8} = rabbit_mixed_queue:to_mixed_mode(MS7), + 30 = rabbit_mixed_queue:length(MS8), + io:format("Converted to mixed mode~n"), + MS10 = + lists:foldl( + fun (N, MS9) -> + Rem = 30 - N, + {{#basic_message { is_persistent = false }, + false, _AckTag, Rem}, + MS9a} = rabbit_mixed_queue:deliver(MS9), + MS9a + end, MS8, lists:seq(1,10)), + 20 = rabbit_mixed_queue:length(MS10), + io:format("Delivered initial non persistent messages~n"), + {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10), + 20 = rabbit_mixed_queue:length(MS11), + io:format("Converted to disk only mode~n"), + rdq_stop(), + rdq_start(), + {ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed), + 10 = rabbit_mixed_queue:length(MS12), + io:format("Recovered queue~n"), + {MS14, AckTags} = + lists:foldl( + fun (N, {MS13, AcksAcc}) -> + Rem = 10 - N, + {{#basic_message { is_persistent = true }, + false, AckTag, Rem}, + MS13a} = rabbit_mixed_queue:deliver(MS13), + {MS13a, [AckTag | AcksAcc]} + end, {MS12, []}, lists:seq(1,10)), + 0 = rabbit_mixed_queue:length(MS14), + {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), + io:format("Delivered and acked all messages~n"), + {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15), + 0 = rabbit_mixed_queue:length(MS16), + io:format("Converted to disk only mode~n"), + rdq_stop(), + rdq_start(), + {ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed), + 0 = rabbit_mixed_queue:length(MS17), + io:format("Recovered queue~n"), + rdq_stop(), + passed. + +rdq_time_commands(Funcs) -> + lists:foreach(fun (F) -> F() end, Funcs). + +rdq_virgin() -> + {Micros, {ok, _}} = + timer:tc(rabbit_disk_queue, start_link, []), + ok = rabbit_disk_queue:stop_and_obliterate(), + timer:sleep(1000), + Micros. + +rdq_start() -> + {ok, _} = rabbit_disk_queue:start_link(), + ok = rabbit_disk_queue:to_ram_disk_mode(), + ok. + +rdq_stop() -> + rabbit_disk_queue:stop(), + timer:sleep(1000). |