diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-05-25 11:08:44 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-05-25 11:08:44 +0100 |
commit | 2f16487b09e30bd5b3c3b32f69684648b64de2bf (patch) | |
tree | ffb9c2926a8581b88878574d17565cb803c4152c | |
parent | 1d2e94174aaf0339315b2a1fe8d701d877776a95 (diff) | |
download | rabbitmq-server-bug22752.tar.gz |
refactor: more sensible and consistent arg & field orderbug22752
-rw-r--r-- | src/rabbit_invariable_queue.erl | 95 |
1 files changed, 46 insertions, 49 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 6e1d33f9..a7ca20c8 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -43,7 +43,7 @@ -include("rabbit.hrl"). --record(iv_state, { queue, qname, len, pending_ack, durable }). +-record(iv_state, { queue, qname, durable, len, pending_ack }). -record(tx, { pending_messages, pending_acks, is_persistent }). -ifdef(use_specs). @@ -66,20 +66,23 @@ init(QName, IsDurable, Recover) -> true -> rabbit_persister:queue_content(QName); false -> [] end), - #iv_state { queue = Q, qname = QName, len = queue:len(Q), - pending_ack = dict:new(), durable = IsDurable }. + #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = queue:len(Q), + pending_ack = dict:new() }. terminate(State) -> State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. -delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA, - durable = IsDurable }) -> - ok = persist_acks(IsDurable, none, QName, dict:fetch_keys(PA), PA), +delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), {_PLen, State1} = purge(State), terminate(State1). -purge(State = #iv_state { len = Len, queue = Q, qname = QName, - durable = IsDurable }) -> +purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( @@ -87,47 +90,48 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName, Acc; ({Msg = #basic_message { guid = Guid }, IsDelivered}, {AckTagsN, PAN}) -> - ok = persist_delivery(IsDurable, QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} end, {[], dict:new()}, Q), - ok = persist_acks(IsDurable, none, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len, - durable = IsDurable }) -> - ok = persist_message(IsDurable, none, QName, Msg), +publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - State = #iv_state { qname = QName, pending_ack = PA, - len = 0, durable = IsDurable }) -> - ok = persist_message(IsDurable, none, QName, Msg), - ok = persist_delivery(IsDurable, QName, Msg, false), + State = #iv_state { qname = QName, durable = IsDurable, + len = 0, pending_ack = PA }) -> + ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q, len = Len, pending_ack = PA, - qname = QName, durable = IsDurable }) -> +fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, + durable = IsDurable, + pending_ack = PA }) -> {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = queue:out(Q), Len1 = Len - 1, - ok = persist_delivery(IsDurable, QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = dict:store(Guid, Msg, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; - false -> ok = persist_acks(IsDurable, none, QName, + false -> ok = persist_acks(QName, IsDurable, none, [Guid], PA1), {blank_ack, PA} end, {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. -ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA, - durable = IsDurable }) -> - ok = persist_acks(IsDurable, none, QName, AckTags, PA), +ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, AckTags, PA), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. @@ -135,14 +139,14 @@ tx_publish(Txn, Msg, State = #iv_state { qname = QName, durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(IsDurable, Txn, QName, Msg), + ok = persist_message(QName, IsDurable, Txn, Msg), State. -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA, - durable = IsDurable }) -> +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(IsDurable, Txn, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), State. tx_rollback(Txn, State = #iv_state { qname = QName }) -> @@ -235,40 +239,33 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- -persist_message(false, _Txn, _QName, _Msg) -> - ok; -persist_message(_IsDurable, _Txn, _QName, - #basic_message { is_persistent = false }) -> - ok; -persist_message(_IsDurable, Txn, QName, Msg) -> +persist_message(QName, true, Txn, Msg = #basic_message { + is_persistent = true }) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties, %% rebuild from properties_bin on restore content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]). + [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg) -> + ok. -persist_delivery(false, _QName, _Msg, _IsDelivered) -> - ok; -persist_delivery(_IsDurable, _QName, #basic_message { is_persistent = false }, - _IsDelivered) -> - ok; -persist_delivery(_IsDurable, _QName, _Message, true) -> - ok; -persist_delivery(_IsDurable, QName, #basic_message { guid = Guid }, - _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]). +persist_delivery(QName, true, false, #basic_message { is_persistent = true, + guid = Guid }) -> + persist_work(none, QName, [{deliver, {QName, Guid}}]); +persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> + ok. -persist_acks(false, _Txn, _QName, _AckTags, _PA) -> - ok; -persist_acks(_IsDurable, Txn, QName, AckTags, PA) -> +persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin {ok, Msg} = dict:find(Guid, PA), Msg #basic_message.is_persistent - end]). + end]); +persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> + ok. persist_work(_Txn,_QName, []) -> ok; |