summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-25 11:08:44 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-25 11:08:44 +0100
commit2f16487b09e30bd5b3c3b32f69684648b64de2bf (patch)
treeffb9c2926a8581b88878574d17565cb803c4152c
parent1d2e94174aaf0339315b2a1fe8d701d877776a95 (diff)
downloadrabbitmq-server-bug22752.tar.gz
refactor: more sensible and consistent arg & field orderbug22752
-rw-r--r--src/rabbit_invariable_queue.erl95
1 files changed, 46 insertions, 49 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 6e1d33f9..a7ca20c8 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -43,7 +43,7 @@
-include("rabbit.hrl").
--record(iv_state, { queue, qname, len, pending_ack, durable }).
+-record(iv_state, { queue, qname, durable, len, pending_ack }).
-record(tx, { pending_messages, pending_acks, is_persistent }).
-ifdef(use_specs).
@@ -66,20 +66,23 @@ init(QName, IsDurable, Recover) ->
true -> rabbit_persister:queue_content(QName);
false -> []
end),
- #iv_state { queue = Q, qname = QName, len = queue:len(Q),
- pending_ack = dict:new(), durable = IsDurable }.
+ #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = queue:len(Q),
+ pending_ack = dict:new() }.
terminate(State) ->
State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA,
- durable = IsDurable }) ->
- ok = persist_acks(IsDurable, none, QName, dict:fetch_keys(PA), PA),
+delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
{_PLen, State1} = purge(State),
terminate(State1).
-purge(State = #iv_state { len = Len, queue = Q, qname = QName,
- durable = IsDurable }) ->
+purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
@@ -87,47 +90,48 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName,
Acc;
({Msg = #basic_message { guid = Guid }, IsDelivered},
{AckTagsN, PAN}) ->
- ok = persist_delivery(IsDurable, QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
end, {[], dict:new()}, Q),
- ok = persist_acks(IsDurable, none, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len,
- durable = IsDurable }) ->
- ok = persist_message(IsDurable, none, QName, Msg),
+publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- State = #iv_state { qname = QName, pending_ack = PA,
- len = 0, durable = IsDurable }) ->
- ok = persist_message(IsDurable, none, QName, Msg),
- ok = persist_delivery(IsDurable, QName, Msg, false),
+ State = #iv_state { qname = QName, durable = IsDurable,
+ len = 0, pending_ack = PA }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q, len = Len, pending_ack = PA,
- qname = QName, durable = IsDurable }) ->
+fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable,
+ pending_ack = PA }) ->
{{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
queue:out(Q),
Len1 = Len - 1,
- ok = persist_delivery(IsDurable, QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = dict:store(Guid, Msg, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
- false -> ok = persist_acks(IsDurable, none, QName,
+ false -> ok = persist_acks(QName, IsDurable, none,
[Guid], PA1),
{blank_ack, PA}
end,
{{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA,
- durable = IsDurable }) ->
- ok = persist_acks(IsDurable, none, QName, AckTags, PA),
+ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
@@ -135,14 +139,14 @@ tx_publish(Txn, Msg, State = #iv_state { qname = QName,
durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(IsDurable, Txn, QName, Msg),
+ ok = persist_message(QName, IsDurable, Txn, Msg),
State.
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA,
- durable = IsDurable }) ->
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(IsDurable, Txn, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
State.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
@@ -235,40 +239,33 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
-persist_message(false, _Txn, _QName, _Msg) ->
- ok;
-persist_message(_IsDurable, _Txn, _QName,
- #basic_message { is_persistent = false }) ->
- ok;
-persist_message(_IsDurable, Txn, QName, Msg) ->
+persist_message(QName, true, Txn, Msg = #basic_message {
+ is_persistent = true }) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties,
%% rebuild from properties_bin on restore
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ ok.
-persist_delivery(false, _QName, _Msg, _IsDelivered) ->
- ok;
-persist_delivery(_IsDurable, _QName, #basic_message { is_persistent = false },
- _IsDelivered) ->
- ok;
-persist_delivery(_IsDurable, _QName, _Message, true) ->
- ok;
-persist_delivery(_IsDurable, QName, #basic_message { guid = Guid },
- _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]).
+persist_delivery(QName, true, false, #basic_message { is_persistent = true,
+ guid = Guid }) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]);
+persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
+ ok.
-persist_acks(false, _Txn, _QName, _AckTags, _PA) ->
- ok;
-persist_acks(_IsDurable, Txn, QName, AckTags, PA) ->
+persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
{ok, Msg} = dict:find(Guid, PA),
Msg #basic_message.is_persistent
- end]).
+ end]);
+persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
+ ok.
persist_work(_Txn,_QName, []) ->
ok;