diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-13 13:39:54 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-13 13:39:54 +0100 |
commit | 0dfbe573225e7e29e6acae7c2b8fe384e5ea41aa (patch) | |
tree | 6ea59fb00d050f732f2dd93d78f8b7aade33888d | |
parent | a1ef8d7f672ab940b99dfc406aa2bb3f33b992fd (diff) | |
parent | 2ed4c29c8bc278b817b82747d28fc86a46b80f3d (diff) | |
download | rabbitmq-server-0dfbe573225e7e29e6acae7c2b8fe384e5ea41aa.tar.gz |
Merge bug26337 (again)
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 8 | ||||
-rw-r--r-- | src/rabbit.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 64 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 30 | ||||
-rw-r--r-- | src/rabbit_policies.erl | 11 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
-rw-r--r-- | src/rabbit_recovery_terms.erl | 5 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 72 |
11 files changed, 125 insertions, 79 deletions
@@ -127,7 +127,7 @@ plugins: # Not building plugins check-xref: - $(info xref checks are disabled) + $(info xref checks are disabled as there is no plugins-src directory) endif diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index afc46e8e..eb3c7ef3 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1210,6 +1210,14 @@ <listitem><para>Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.</para></listitem> </varlistentry> <varlistentry> + <term>message_bytes_ready</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages ready to be delivered to clients.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_unacknowledged</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages delivered to clients but not yet acknowledged.</para></listitem> + </varlistentry> + <varlistentry> <term>message_bytes_ram</term> <listitem><para>Like <command>message_bytes</command> but counting only those messages which are in RAM.</para></listitem> </varlistentry> diff --git a/src/rabbit.erl b/src/rabbit.erl index 9ce9ec84..4b8af870 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -360,7 +360,7 @@ stop() -> undefined -> ok; _ -> await_startup(true) end, - rabbit_log:info("Stopping RabbitMQ~n"), + rabbit_misc:local_info_msg("Stopping RabbitMQ~n", []), Apps = ?APPS ++ rabbit_plugins:active(), stop_apps(app_utils:app_dependency_order(Apps, true)). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4e23dbd2..a33a8fcc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -461,7 +461,8 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ba1517af..db297c1d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ dlx, dlx_routing_key, max_length, + max_bytes, args_policy_version, status }). @@ -265,7 +266,8 @@ process_args_policy(State = #q{q = Q, {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, - {<<"max-length">>, fun res_min/2, fun init_max_length/2}], + {<<"max-length">>, fun res_min/2, fun init_max_length/2}, + {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) @@ -304,6 +306,10 @@ init_max_length(MaxLen, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}), State1. +init_max_bytes(MaxBytes, State) -> + {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), + State1. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS, consumers = Consumers} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -543,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% remains unchanged, or if the newly published message %% has no expiry and becomes the head of the queue then %% the call is unnecessary. - case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + case {Dropped, QLen =:= 1, Props#message_properties.expiry} of {false, false, _} -> State4; {true, true, undefined} -> State4; {_, _, _} -> drop_expired_msgs(State4) end end. -maybe_drop_head(State = #q{max_length = undefined}) -> - {0, State}; -maybe_drop_head(State = #q{max_length = MaxLen, - backing_queue = BQ, - backing_queue_state = BQS}) -> - case BQ:len(BQS) - MaxLen of - Excess when Excess > 0 -> - {Excess, - with_dlx( - State#q.dlx, - fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, - fun () -> - {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> - BQ:drop(false, BQS0) - end, {ok, BQS}, - lists:seq(1, Excess)), - State#q{backing_queue_state = BQS1} - end)}; - _ -> {0, State} +maybe_drop_head(State = #q{max_length = undefined, + max_bytes = undefined}) -> + {false, State}; +maybe_drop_head(State) -> + maybe_drop_head(false, State). + +maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case over_max_length(State) of + true -> + maybe_drop_head(true, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msg(X, State) end, + fun () -> + {_, BQS1} = BQ:drop(false, BQS), + State#q{backing_queue_state = BQS1} + end)); + false -> + {AlreadyDropped, State} end. +over_max_length(#q{max_length = MaxLen, + max_bytes = MaxBytes, + backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes_ready, BQS) > MaxBytes. + requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> WasEmpty = BQ:is_empty(BQS), @@ -726,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. -dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> +dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) -> {ok, State1} = dead_letter_msgs( fun (DLFun, Acc, BQS) -> - lists:foldl(fun (_, {ok, Acc0, BQS0}) -> - {{Msg, _, AckTag}, BQS1} = - BQ:fetch(true, BQS0), - {ok, DLFun(Msg, AckTag, Acc0), BQS1} - end, {ok, Acc, BQS}, lists:seq(1, Excess)) + {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS), + {ok, DLFun(Msg, AckTag, Acc), BQS1} end, maxlen, X, State), State1. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 595a05d3..098f5f43 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -20,8 +20,9 @@ -define(INFO_KEYS, [messages_ram, messages_ready_ram, messages_unacknowledged_ram, messages_persistent, - message_bytes, message_bytes_ram, message_bytes_persistent, - backing_queue_status]). + message_bytes, message_bytes_ready, + message_bytes_unacknowledged, message_bytes_ram, + message_bytes_persistent, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e60388f0..a499686f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -133,15 +133,16 @@ init_from_config() -> end. auto_cluster(TryNodes, NodeType) -> - case find_good_node(nodes_excl_me(TryNodes)) of + case find_auto_cluster_node(nodes_excl_me(TryNodes)) of {ok, Node} -> rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]), {ok, {_, DiscNodes, _}} = discover_cluster0(Node), init_db_and_upgrade(DiscNodes, NodeType, true), rabbit_node_monitor:notify_joined_cluster(); none -> - rabbit_log:warning("Could not find any node for auto-clustering " - "from: ~p~n", [TryNodes]), + rabbit_log:warning( + "Could not find any node for auto-clustering from: ~p~n" + "Starting blank node...~n", [TryNodes]), init_db_and_upgrade([node()], disc, false) end. @@ -792,17 +793,24 @@ is_virgin_node() -> false end. -find_good_node([]) -> +find_auto_cluster_node([]) -> none; -find_good_node([Node | Nodes]) -> +find_auto_cluster_node([Node | Nodes]) -> + Fail = fun (Fmt, Args) -> + rabbit_log:warning( + "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]), + find_auto_cluster_node(Nodes) + end, case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _Reason} -> find_good_node(Nodes); + {badrpc, _} = Reason -> Diag = rabbit_nodes:diagnostics([Node]), + Fail("~p~n~s~n", [Reason, Diag]); %% old delegate hash check - {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes); - {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of - {error, _} -> find_good_node(Nodes); - ok -> {ok, Node} - end + {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]); + {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of + {error, _} -> Fail("versions ~p~n", + [{OTP, Rabbit}]); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 3558cf98..cc88765f 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -34,7 +34,8 @@ register() -> {policy_validator, <<"dead-letter-routing-key">>}, {policy_validator, <<"message-ttl">>}, {policy_validator, <<"expires">>}, - {policy_validator, <<"max-length">>}]], + {policy_validator, <<"max-length">>}, + {policy_validator, <<"max-length-bytes">>}]], ok. validate_policy(Terms) -> @@ -76,6 +77,10 @@ validate_policy0(<<"max-length">>, Value) when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"max-length">>, Value) -> - {error, "~p is not a valid maximum length", [Value]}. - + {error, "~p is not a valid maximum length", [Value]}; +validate_policy0(<<"max-length-bytes">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-length-bytes">>, Value) -> + {error, "~p is not a valid maximum length in bytes", [Value]}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2cdd54a7..b2930f88 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -873,7 +873,7 @@ handle_method0(MethodName, FieldsBin, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) - catch throw:{inet_error, closed} -> + catch throw:{inet_error, E} when E =:= closed; E =:= enotconn -> maybe_emit_stats(State), throw(connection_closed_abruptly); exit:#amqp_error{method = none} = Reason -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index f169e13d..9f837cce 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -61,7 +61,7 @@ read(DirBaseName) -> end. clear() -> - dets:delete_all_objects(?MODULE), + ok = dets:delete_all_objects(?MODULE), flush(). start_link() -> gen_server:start_link(?MODULE, [], []). @@ -131,9 +131,8 @@ open_table() -> {ram_file, true}, {auto_save, infinity}]). -flush() -> dets:sync(?MODULE). +flush() -> ok = dets:sync(?MODULE). close_table() -> ok = flush(), ok = dets:close(?MODULE). - diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c52862ab..e97ed491 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -255,7 +255,8 @@ transient_threshold, len, %% w/o unacked - bytes, %% w unacked + bytes, %% w/o unacked + unacked_bytes, persistent_count, %% w unacked persistent_bytes, %% w unacked @@ -347,6 +348,8 @@ len :: non_neg_integer(), bytes :: non_neg_integer(), + unacked_bytes :: non_neg_integer(), + persistent_count :: non_neg_integer(), persistent_bytes :: non_neg_integer(), @@ -508,14 +511,13 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, len = Len, - bytes = Bytes, ram_bytes = RamBytes, persistent_count = PCount, persistent_bytes = PBytes }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {Bytes, RamBytes, PCount, PBytes}, + Stats = {RamBytes, PCount, PBytes}, {Stats1, IndexState1} = remove_queue_entries(Q4, Stats, IndexState, MSCState), @@ -527,13 +529,13 @@ purge(State = #vqstate { q4 = Q4, Stats1, State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), - {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} = + {{RamBytes3, PCount3, PBytes3}, IndexState3} = remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, - bytes = Bytes3, + bytes = 0, ram_msg_count = 0, ram_bytes = RamBytes3, persistent_count = PCount3, @@ -561,7 +563,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), State3 = upd_bytes( - 1, MsgStatus1, + 1, 0, MsgStatus1, inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount1, @@ -585,7 +587,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes(1, MsgStatus, + State3 = upd_bytes(0, 1, MsgStatus, State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, @@ -843,8 +845,13 @@ info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> PersistentCount; -info(message_bytes, #vqstate{bytes = Bytes}) -> +info(message_bytes, #vqstate{bytes = Bytes, + unacked_bytes = UBytes}) -> + Bytes + UBytes; +info(message_bytes_ready, #vqstate{bytes = Bytes}) -> Bytes; +info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) -> + UBytes; info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> @@ -886,6 +893,7 @@ is_duplicate(_Msg, State) -> {false, State}. a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, bytes = Bytes, + unacked_bytes = UnackedBytes, persistent_count = PersistentCount, persistent_bytes = PersistentBytes, ram_msg_count = RamMsgCount, @@ -904,12 +912,13 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = Len >= 0, true = Bytes >= 0, + true = UnackedBytes >= 0, true = PersistentCount >= 0, true = PersistentBytes >= 0, true = RamMsgCount >= 0, true = RamMsgCount =< Len, true = RamBytes >= 0, - true = RamBytes =< Bytes, + true = RamBytes =< Bytes + UnackedBytes, State. @@ -1100,6 +1109,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, ram_msg_count_prev = 0, ram_ack_count_prev = 0, ram_bytes = 0, + unacked_bytes = 0, out_counter = 0, in_counter = 0, rates = blank_rates(Now), @@ -1159,17 +1169,22 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. -upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) -> - upd_bytes0(Sign, MsgStatus, State); -upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) -> - upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)). +upd_bytes(SignReady, SignUnacked, + MsgStatus = #msg_status{msg = undefined}, State) -> + upd_bytes0(SignReady, SignUnacked, MsgStatus, State); +upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) -> + upd_ram_bytes(SignReady + SignUnacked, MsgStatus, + upd_bytes0(SignReady, SignUnacked, MsgStatus, State)). -upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent}, +upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, State = #vqstate{bytes = Bytes, + unacked_bytes = UBytes, persistent_bytes = PBytes}) -> - Diff = Sign * msg_size(MsgStatus), - State#vqstate{bytes = Bytes + Diff, - persistent_bytes = PBytes + one_if(IsPersistent) * Diff}. + S = msg_size(MsgStatus), + SignTotal = SignReady + SignUnacked, + State#vqstate{bytes = Bytes + SignReady * S, + unacked_bytes = UBytes + SignUnacked * S, + persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. @@ -1218,8 +1233,8 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), State2 = case AckRequired of - false -> upd_bytes(-1, MsgStatus, State1); - true -> State1 + false -> upd_bytes(-1, 0, MsgStatus, State1); + true -> upd_bytes(-1, 1, MsgStatus, State1) end, {AckTag, maybe_update_rates( State2 #vqstate {ram_msg_count = RamMsgCount1, @@ -1243,16 +1258,15 @@ purge_betas_and_deltas(Stats, index_state = IndexState1 })) end. -remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes}, +remove_queue_entries(Q, {RamBytes, PCount, PBytes}, IndexState, MSCState) -> - {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} = + {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q), + {orddict:new(), RamBytes, PBytes, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{Bytes1, - RamBytes1, + {{RamBytes1, PCount - case orddict:find(true, MsgIdsByStore) of error -> 0; {ok, Ids} -> length(Ids) @@ -1266,12 +1280,11 @@ remove_queue_entries1( is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent, msg_props = #message_properties { size = Size } }, - {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) -> + {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> {case MsgOnDisk of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - Bytes - Size, RamBytes - Size * one_if(Msg =/= undefined), PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -1353,7 +1366,7 @@ remove_pending_ack(true, SeqId, State) -> {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = remove_pending_ack(false, SeqId, State), PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), - {MsgStatus, upd_bytes(-1, MsgStatus, + {MsgStatus, upd_bytes(0, -1, MsgStatus, State1 # vqstate{ persistent_count = PCount1 })}; remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> @@ -1487,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2)) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1501,7 +1514,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + upd_bytes(1, -1, MsgStatus, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 |