summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-13 13:39:54 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-13 13:39:54 +0100
commit0dfbe573225e7e29e6acae7c2b8fe384e5ea41aa (patch)
tree6ea59fb00d050f732f2dd93d78f8b7aade33888d
parenta1ef8d7f672ab940b99dfc406aa2bb3f33b992fd (diff)
parent2ed4c29c8bc278b817b82747d28fc86a46b80f3d (diff)
downloadrabbitmq-server-0dfbe573225e7e29e6acae7c2b8fe384e5ea41aa.tar.gz
Merge bug26337 (again)
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl64
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_mnesia.erl30
-rw-r--r--src/rabbit_policies.erl11
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_recovery_terms.erl5
-rw-r--r--src/rabbit_variable_queue.erl72
11 files changed, 125 insertions, 79 deletions
diff --git a/Makefile b/Makefile
index 6dbb650e..ffb4cdfe 100644
--- a/Makefile
+++ b/Makefile
@@ -127,7 +127,7 @@ plugins:
# Not building plugins
check-xref:
- $(info xref checks are disabled)
+ $(info xref checks are disabled as there is no plugins-src directory)
endif
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index afc46e8e..eb3c7ef3 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1210,6 +1210,14 @@
<listitem><para>Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>message_bytes_ready</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages ready to be delivered to clients.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_unacknowledged</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages delivered to clients but not yet acknowledged.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>message_bytes_ram</term>
<listitem><para>Like <command>message_bytes</command> but counting only those messages which are in RAM.</para></listitem>
</varlistentry>
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 9ce9ec84..4b8af870 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -360,7 +360,7 @@ stop() ->
undefined -> ok;
_ -> await_startup(true)
end,
- rabbit_log:info("Stopping RabbitMQ~n"),
+ rabbit_misc:local_info_msg("Stopping RabbitMQ~n", []),
Apps = ?APPS ++ rabbit_plugins:active(),
stop_apps(app_utils:app_dependency_order(Apps, true)).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4e23dbd2..a33a8fcc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -461,7 +461,8 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba1517af..db297c1d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,6 +52,7 @@
dlx,
dlx_routing_key,
max_length,
+ max_bytes,
args_policy_version,
status
}).
@@ -265,7 +266,8 @@ process_args_policy(State = #q{q = Q,
{<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
- {<<"max-length">>, fun res_min/2, fun init_max_length/2}],
+ {<<"max-length">>, fun res_min/2, fun init_max_length/2},
+ {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
@@ -304,6 +306,10 @@ init_max_length(MaxLen, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}),
State1.
+init_max_bytes(MaxBytes, State) ->
+ {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
+ State1.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -543,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% remains unchanged, or if the newly published message
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
- case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
{false, false, _} -> State4;
{true, true, undefined} -> State4;
{_, _, _} -> drop_expired_msgs(State4)
end
end.
-maybe_drop_head(State = #q{max_length = undefined}) ->
- {0, State};
-maybe_drop_head(State = #q{max_length = MaxLen,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case BQ:len(BQS) - MaxLen of
- Excess when Excess > 0 ->
- {Excess,
- with_dlx(
- State#q.dlx,
- fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
- fun () ->
- {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
- BQ:drop(false, BQS0)
- end, {ok, BQS},
- lists:seq(1, Excess)),
- State#q{backing_queue_state = BQS1}
- end)};
- _ -> {0, State}
+maybe_drop_head(State = #q{max_length = undefined,
+ max_bytes = undefined}) ->
+ {false, State};
+maybe_drop_head(State) ->
+ maybe_drop_head(false, State).
+
+maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case over_max_length(State) of
+ true ->
+ maybe_drop_head(true,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msg(X, State) end,
+ fun () ->
+ {_, BQS1} = BQ:drop(false, BQS),
+ State#q{backing_queue_state = BQS1}
+ end));
+ false ->
+ {AlreadyDropped, State}
end.
+over_max_length(#q{max_length = MaxLen,
+ max_bytes = MaxBytes,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes_ready, BQS) > MaxBytes.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
WasEmpty = BQ:is_empty(BQS),
@@ -726,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
-dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) ->
{ok, State1} =
dead_letter_msgs(
fun (DLFun, Acc, BQS) ->
- lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
- {{Msg, _, AckTag}, BQS1} =
- BQ:fetch(true, BQS0),
- {ok, DLFun(Msg, AckTag, Acc0), BQS1}
- end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS),
+ {ok, DLFun(Msg, AckTag, Acc), BQS1}
end, maxlen, X, State),
State1.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 595a05d3..098f5f43 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -20,8 +20,9 @@
-define(INFO_KEYS, [messages_ram, messages_ready_ram,
messages_unacknowledged_ram, messages_persistent,
- message_bytes, message_bytes_ram, message_bytes_persistent,
- backing_queue_status]).
+ message_bytes, message_bytes_ready,
+ message_bytes_unacknowledged, message_bytes_ram,
+ message_bytes_persistent, backing_queue_status]).
-ifdef(use_specs).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index e60388f0..a499686f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -133,15 +133,16 @@ init_from_config() ->
end.
auto_cluster(TryNodes, NodeType) ->
- case find_good_node(nodes_excl_me(TryNodes)) of
+ case find_auto_cluster_node(nodes_excl_me(TryNodes)) of
{ok, Node} ->
rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true),
rabbit_node_monitor:notify_joined_cluster();
none ->
- rabbit_log:warning("Could not find any node for auto-clustering "
- "from: ~p~n", [TryNodes]),
+ rabbit_log:warning(
+ "Could not find any node for auto-clustering from: ~p~n"
+ "Starting blank node...~n", [TryNodes]),
init_db_and_upgrade([node()], disc, false)
end.
@@ -792,17 +793,24 @@ is_virgin_node() ->
false
end.
-find_good_node([]) ->
+find_auto_cluster_node([]) ->
none;
-find_good_node([Node | Nodes]) ->
+find_auto_cluster_node([Node | Nodes]) ->
+ Fail = fun (Fmt, Args) ->
+ rabbit_log:warning(
+ "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]),
+ find_auto_cluster_node(Nodes)
+ end,
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} -> find_good_node(Nodes);
+ {badrpc, _} = Reason -> Diag = rabbit_nodes:diagnostics([Node]),
+ Fail("~p~n~s~n", [Reason, Diag]);
%% old delegate hash check
- {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes);
- {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> Fail("versions ~p~n",
+ [{OTP, Rabbit}]);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 3558cf98..cc88765f 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -34,7 +34,8 @@ register() ->
{policy_validator, <<"dead-letter-routing-key">>},
{policy_validator, <<"message-ttl">>},
{policy_validator, <<"expires">>},
- {policy_validator, <<"max-length">>}]],
+ {policy_validator, <<"max-length">>},
+ {policy_validator, <<"max-length-bytes">>}]],
ok.
validate_policy(Terms) ->
@@ -76,6 +77,10 @@ validate_policy0(<<"max-length">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"max-length">>, Value) ->
- {error, "~p is not a valid maximum length", [Value]}.
-
+ {error, "~p is not a valid maximum length", [Value]};
+validate_policy0(<<"max-length-bytes">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-length-bytes">>, Value) ->
+ {error, "~p is not a valid maximum length in bytes", [Value]}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2cdd54a7..b2930f88 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -873,7 +873,7 @@ handle_method0(MethodName, FieldsBin,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
- catch throw:{inet_error, closed} ->
+ catch throw:{inet_error, E} when E =:= closed; E =:= enotconn ->
maybe_emit_stats(State),
throw(connection_closed_abruptly);
exit:#amqp_error{method = none} = Reason ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index f169e13d..9f837cce 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -61,7 +61,7 @@ read(DirBaseName) ->
end.
clear() ->
- dets:delete_all_objects(?MODULE),
+ ok = dets:delete_all_objects(?MODULE),
flush().
start_link() -> gen_server:start_link(?MODULE, [], []).
@@ -131,9 +131,8 @@ open_table() ->
{ram_file, true},
{auto_save, infinity}]).
-flush() -> dets:sync(?MODULE).
+flush() -> ok = dets:sync(?MODULE).
close_table() ->
ok = flush(),
ok = dets:close(?MODULE).
-
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c52862ab..e97ed491 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -255,7 +255,8 @@
transient_threshold,
len, %% w/o unacked
- bytes, %% w unacked
+ bytes, %% w/o unacked
+ unacked_bytes,
persistent_count, %% w unacked
persistent_bytes, %% w unacked
@@ -347,6 +348,8 @@
len :: non_neg_integer(),
bytes :: non_neg_integer(),
+ unacked_bytes :: non_neg_integer(),
+
persistent_count :: non_neg_integer(),
persistent_bytes :: non_neg_integer(),
@@ -508,14 +511,13 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- bytes = Bytes,
ram_bytes = RamBytes,
persistent_count = PCount,
persistent_bytes = PBytes }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {Bytes, RamBytes, PCount, PBytes},
+ Stats = {RamBytes, PCount, PBytes},
{Stats1, IndexState1} =
remove_queue_entries(Q4, Stats, IndexState, MSCState),
@@ -527,13 +529,13 @@ purge(State = #vqstate { q4 = Q4,
Stats1, State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
- {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} =
+ {{RamBytes3, PCount3, PBytes3}, IndexState3} =
remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
- bytes = Bytes3,
+ bytes = 0,
ram_msg_count = 0,
ram_bytes = RamBytes3,
persistent_count = PCount3,
@@ -561,7 +563,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
State3 = upd_bytes(
- 1, MsgStatus1,
+ 1, 0, MsgStatus1,
inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount1,
@@ -585,7 +587,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(1, MsgStatus,
+ State3 = upd_bytes(0, 1, MsgStatus,
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
@@ -843,8 +845,13 @@ info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
PersistentCount;
-info(message_bytes, #vqstate{bytes = Bytes}) ->
+info(message_bytes, #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes}) ->
+ Bytes + UBytes;
+info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
Bytes;
+info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
+ UBytes;
info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
@@ -886,6 +893,7 @@ is_duplicate(_Msg, State) -> {false, State}.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
bytes = Bytes,
+ unacked_bytes = UnackedBytes,
persistent_count = PersistentCount,
persistent_bytes = PersistentBytes,
ram_msg_count = RamMsgCount,
@@ -904,12 +912,13 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = Bytes >= 0,
+ true = UnackedBytes >= 0,
true = PersistentCount >= 0,
true = PersistentBytes >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
true = RamBytes >= 0,
- true = RamBytes =< Bytes,
+ true = RamBytes =< Bytes + UnackedBytes,
State.
@@ -1100,6 +1109,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
ram_bytes = 0,
+ unacked_bytes = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
@@ -1159,17 +1169,22 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
-upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) ->
- upd_bytes0(Sign, MsgStatus, State);
-upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) ->
- upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)).
+upd_bytes(SignReady, SignUnacked,
+ MsgStatus = #msg_status{msg = undefined}, State) ->
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State);
+upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) ->
+ upd_ram_bytes(SignReady + SignUnacked, MsgStatus,
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State)).
-upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent},
+upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
State = #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes,
persistent_bytes = PBytes}) ->
- Diff = Sign * msg_size(MsgStatus),
- State#vqstate{bytes = Bytes + Diff,
- persistent_bytes = PBytes + one_if(IsPersistent) * Diff}.
+ S = msg_size(MsgStatus),
+ SignTotal = SignReady + SignUnacked,
+ State#vqstate{bytes = Bytes + SignReady * S,
+ unacked_bytes = UBytes + SignUnacked * S,
+ persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
@@ -1218,8 +1233,8 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
State2 = case AckRequired of
- false -> upd_bytes(-1, MsgStatus, State1);
- true -> State1
+ false -> upd_bytes(-1, 0, MsgStatus, State1);
+ true -> upd_bytes(-1, 1, MsgStatus, State1)
end,
{AckTag, maybe_update_rates(
State2 #vqstate {ram_msg_count = RamMsgCount1,
@@ -1243,16 +1258,15 @@ purge_betas_and_deltas(Stats,
index_state = IndexState1 }))
end.
-remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes},
+remove_queue_entries(Q, {RamBytes, PCount, PBytes},
IndexState, MSCState) ->
- {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} =
+ {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q),
+ {orddict:new(), RamBytes, PBytes, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{Bytes1,
- RamBytes1,
+ {{RamBytes1,
PCount - case orddict:find(true, MsgIdsByStore) of
error -> 0;
{ok, Ids} -> length(Ids)
@@ -1266,12 +1280,11 @@ remove_queue_entries1(
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
msg_props = #message_properties { size = Size } },
- {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) ->
+ {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
{case MsgOnDisk of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- Bytes - Size,
RamBytes - Size * one_if(Msg =/= undefined),
PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
@@ -1353,7 +1366,7 @@ remove_pending_ack(true, SeqId, State) ->
{MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
remove_pending_ack(false, SeqId, State),
PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
- {MsgStatus, upd_bytes(-1, MsgStatus,
+ {MsgStatus, upd_bytes(0, -1, MsgStatus,
State1 # vqstate{ persistent_count = PCount1 })};
remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
@@ -1487,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
+ Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2))
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1501,7 +1514,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+ upd_bytes(1, -1, MsgStatus, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2