-rw-r--r--  Makefile                                     |  5
-rw-r--r--  docs/rabbitmqctl.1.xml                       |  7
-rw-r--r--  ebin/rabbit_app.in                           | 25
-rw-r--r--  packaging/RPMS/Fedora/rabbitmq-server.spec   |  3
-rw-r--r--  packaging/debs/Debian/debian/changelog       |  6
-rw-r--r--  src/gm.erl                                   |  4
-rw-r--r--  src/rabbit.erl                               | 22
-rw-r--r--  src/rabbit_amqqueue_process.erl              |  7
-rw-r--r--  src/rabbit_backing_queue.erl                 | 12
-rw-r--r--  src/rabbit_connection_sup.erl                | 11
-rw-r--r--  src/rabbit_intermediate_sup.erl              | 39
-rw-r--r--  src/rabbit_mirror_queue_master.erl           | 36
-rw-r--r--  src/rabbit_mirror_queue_slave.erl            |  1
-rw-r--r--  src/rabbit_mnesia.erl                        | 94
-rw-r--r--  src/rabbit_reader.erl                        | 22
-rw-r--r--  src/rabbit_variable_queue.erl                |  3
-rw-r--r--  version.mk                                   |  1
17 files changed, 178 insertions(+), 120 deletions(-)
diff --git a/Makefile b/Makefile
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,8 @@ endif
 #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
 ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
 
-VERSION?=0.0.0
+include version.mk
+
 PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
 PLUGINS_DIR=plugins
 TARBALL_NAME=rabbitmq-server-$(VERSION)
@@ -262,6 +263,8 @@ srcdist: distclean
 	cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
 	cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR)
+	echo "VERSION?=${VERSION}" > $(TARGET_SRC_DIR)/version.mk
+
 	cp -r scripts $(TARGET_SRC_DIR)
 	cp -r $(DOCS_DIR) $(TARGET_SRC_DIR)
 	chmod 0755 $(TARGET_SRC_DIR)/scripts/*
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 0f3c0faf..1d641144 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -405,6 +405,13 @@
           must be offline, while the node we are removing from must be
           online, except when using the <command>--offline</command> flag.
         </para>
+        <para>
+          When using the <command>--offline</command> flag the node you
+          connect to will become the canonical source for cluster metadata
+          (e.g. which queues exist), even if it was not before. Therefore
+          you should use this command on the latest node to shut down if
+          at all possible.
+        </para>
         <para role="example-prefix">For example:</para>
         <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
         <para role="example">
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 339fa69e..635869a2 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -14,8 +14,7 @@
 %% we also depend on crypto, public_key and ssl but they shouldn't be
 %% in here as we don't actually want to start it
   {mod, {rabbit, []}},
-  {env, [{hipe_compile, false},
-         {tcp_listeners, [5672]},
+  {env, [{tcp_listeners, [5672]},
          {ssl_listeners, []},
          {ssl_options, []},
          {vm_memory_high_watermark, 0.4},
@@ -51,5 +50,23 @@
               {backlog,       128},
               {nodelay,       true},
               {linger,        {true, 0}},
-              {exit_on_close, false}]}
-        ]}]}.
+              {exit_on_close, false}]},
+         {hipe_compile, false},
+         %% see bug 24513 for how this list was created
+         {hipe_modules,
+          [rabbit_reader, rabbit_channel, gen_server2, rabbit_exchange,
+           rabbit_command_assembler, rabbit_framing_amqp_0_9_1, rabbit_basic,
+           rabbit_event, lists, queue, priority_queue, rabbit_router,
+           rabbit_trace, rabbit_misc, rabbit_binary_parser,
+           rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
+           rabbit_amqqueue_process, rabbit_variable_queue,
+           rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
+           sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
+           rabbit_queue_index, rabbit_exchange_decorator, gen, dict, ordsets,
+           file_handle_cache, rabbit_msg_store, array,
+           rabbit_msg_store_ets_index, rabbit_msg_file,
+           rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
+           mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow,
+           pmon, ssl_connection, tls_connection, ssl_record, tls_record,
+           gen_fsm, ssl]}
+        ]}]}.
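A note on the ebin/rabbit_app.in hunk above: hipe_compile and hipe_modules become ordinary 'rabbit' application environment keys instead of a hard-coded ?HIPE_WORTHY macro, so the HiPE module list can in principle be overridden from configuration. A minimal sketch, assuming a standard rabbitmq.config override (only the key names come from the hunk above; the chosen values are hypothetical):

    %% hypothetical rabbitmq.config fragment: turn HiPE on and trim the
    %% module list; modules missing from the code path are skipped by the
    %% code:which/1 filter added to rabbit.erl further down in this diff.
    [{rabbit, [{hipe_compile, true},
               {hipe_modules, [rabbit_reader, rabbit_channel, gen_server2,
                               rabbit_framing_amqp_0_9_1]}]}].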
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 2e6040ca..028b4ec2 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -123,6 +123,9 @@ done
 rm -rf %{buildroot}
 
 %changelog
+* Tue Jun 25 2013 tim@rabbitmq.com 3.1.3-1
+- New Upstream Release
+
 * Mon Jun 24 2013 tim@rabbitmq.com 3.1.2-1
 - New Upstream Release
 
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 404ae616..fda29e7d 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.1.3-1) unstable; urgency=low
+
+  * New Upstream Release
+
+ -- Tim Watson <tim@rabbitmq.com>  Tue, 25 Jun 2013 15:01:12 +0100
+
 rabbitmq-server (3.1.2-1) unstable; urgency=low
 
   * New Upstream Release
diff --git a/src/gm.erl b/src/gm.erl
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1053,7 +1053,7 @@ prune_or_create_group(Self, GroupName, TxnFun) ->
           fun () ->
                   GroupNew = #gm_group { name    = GroupName,
                                          members = [Self],
-                                         version = ?VERSION_START },
+                                         version = get_version(Self) },
                   case mnesia:read({?GROUP_TABLE, GroupName}) of
                       [] ->
                           mnesia:write(GroupNew),
@@ -1294,6 +1294,8 @@ remove_erased_members(MembersState, View) ->
                         MembersState1)
         end, blank_member_state(), all_known_members(View)).
 
+get_version({Version, _Pid}) -> Version.
+
 get_pid({_Version, Pid}) -> Pid.
 
 get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b3fb98af..dddf6f47 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -192,22 +192,6 @@
 
 -define(APPS, [os_mon, mnesia, rabbit]).
 
-%% see bug 24513 for how this list was created
--define(HIPE_WORTHY,
-        [rabbit_reader, rabbit_channel, gen_server2,
-         rabbit_exchange, rabbit_command_assembler, rabbit_framing_amqp_0_9_1,
-         rabbit_basic, rabbit_event, lists, queue, priority_queue,
-         rabbit_router, rabbit_trace, rabbit_misc, rabbit_binary_parser,
-         rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
-         rabbit_amqqueue_process, rabbit_variable_queue,
-         rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
-         sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
-         rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
-         rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
-         rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
-         mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
-         ssl_connection, ssl_record, gen_fsm, ssl]).
-
 %% HiPE compilation uses multiple cores anyway, but some bits are
 %% IO-bound so we can go faster if we parallelise a bit more. In
 %% practice 2 processes seems just as fast as any other number > 1,
@@ -281,7 +265,9 @@ warn_if_hipe_compilation_failed(false) ->
 %% long time, so make an exception to our no-stdout policy and display
 %% progress via stdout.
 hipe_compile() ->
-    Count = length(?HIPE_WORTHY),
+    {ok, HipeModulesAll} = application:get_env(rabbit, hipe_modules),
+    HipeModules = [HM || HM <- HipeModulesAll, code:which(HM) =/= non_existing],
+    Count = length(HipeModules),
     io:format("~nHiPE compiling:  |~s|~n                 |",
               [string:copies("-", Count)]),
     T1 = erlang:now(),
@@ -290,7 +276,7 @@ hipe_compile() ->
                        io:format("#")
                    end || M <- Ms]
          end) ||
-        Ms <- split(?HIPE_WORTHY, ?HIPE_PROCESSES)],
+        Ms <- split(HipeModules, ?HIPE_PROCESSES)],
     [receive
          {'DOWN', MRef, process, _, normal} -> ok;
          {'DOWN', MRef, process, _, Reason} -> exit(Reason)
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c5045609..b30af033 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -540,6 +540,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
         {false, BQS1} ->
             deliver_msgs_to_consumers(
               fun (true, State1 = #q{backing_queue_state = BQS2}) ->
+                      true = BQ:is_empty(BQS2),
                       {AckTag, BQS3} = BQ:publish_delivered(
                                          Message, Props, SenderPid, BQS2),
                       {{Message, Delivered, AckTag},
@@ -548,10 +549,8 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
                       {{Message, Delivered, undefined},
                        true, discard(Delivery, State1)}
               end, false, State#q{backing_queue_state = BQS1});
-        {published, BQS1} ->
-            {true,  State#q{backing_queue_state = BQS1}};
-        {discarded, BQS1} ->
-            {false, State#q{backing_queue_state = BQS1}}
+        {true, BQS1} ->
+            {true, State#q{backing_queue_state = BQS1}}
     end.
 
 deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2f247448..bf26cb5a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -90,10 +90,7 @@
     -> {ack(), state()}.
 
 %% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
+%% queue, but are not going to be further passed to BQ.
 -callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
 
 %% Return ids of messages which have been confirmed since the last
@@ -216,11 +213,10 @@
 -callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
 
 %% Called prior to a publish or publish_delivered call. Allows the BQ
-%% to signal that it's already seen this message (and in what capacity
-%% - i.e. was it published previously or discarded previously) and
-%% thus the message should be dropped.
+%% to signal that it's already seen this message, (e.g. it was published
+%% or discarded previously) and thus the message should be dropped.
 -callback is_duplicate(rabbit_types:basic_message(), state())
-    -> {'false'|'published'|'discarded', state()}.
+    -> {boolean(), state()}.
 
 -else.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 31bc51b8..fedfe97a 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,11 +42,20 @@ start_link() ->
             SupPid,
             {collector, {rabbit_queue_collector, start_link, []},
              intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+    %% We need to get channels in the hierarchy here so they close
+    %% before the reader. But for 1.0 readers we can't start the real
+    %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
+    %% so we add another supervisor into the hierarchy.
+    {ok, ChannelSup3Pid} =
+        supervisor2:start_child(
+          SupPid,
+          {channel_sup3, {rabbit_intermediate_sup, start_link, []},
+           intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}),
     {ok, ReaderPid} =
         supervisor2:start_child(
           SupPid,
           {reader, {rabbit_reader, start_link,
-                    [SupPid, Collector,
+                    [ChannelSup3Pid, Collector,
                      rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
            intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
     {ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_intermediate_sup.erl
new file mode 100644
index 00000000..1919d9d6
--- /dev/null
+++ b/src/rabbit_intermediate_sup.erl
@@ -0,0 +1,39 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_intermediate_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+    supervisor2:start_link(?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+    {ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index bcd4861a..572cd0ca 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -225,21 +225,10 @@ discard(MsgId, ChPid, State = #state { gm                  = GM,
                                        backing_queue       = BQ,
                                        backing_queue_state = BQS,
                                        seen_status         = SS }) ->
-    %% It's a massive error if we get told to discard something that's
-    %% already been published or published-and-confirmed. To do that
-    %% would require non FIFO access. Hence we should not find
-    %% 'published' or 'confirmed' in this dict:find.
-    case dict:find(MsgId, SS) of
-        error ->
-            ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
-            BQS1 = BQ:discard(MsgId, ChPid, BQS),
-            ensure_monitoring(
-              ChPid, State #state {
-                       backing_queue_state = BQS1,
-                       seen_status         = dict:erase(MsgId, SS) });
-        {ok, discarded} ->
-            State
-    end.
+    false = dict:is_key(MsgId, SS), %% ASSERTION
+    ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+    ensure_monitoring(ChPid, State #state { backing_queue_state =
+                                                BQ:discard(MsgId, ChPid, BQS) }).
 
 dropwhile(Pred, State = #state{backing_queue       = BQ,
                                backing_queue_state = BQS }) ->
@@ -393,8 +382,9 @@ is_duplicate(Message = #basic_message { id = MsgId },
             %% immediately after calling is_duplicate). The msg is
             %% invalid. We will not see this again, nor will we be
             %% further involved in confirming this message, so erase.
-            {published, State #state { seen_status = dict:erase(MsgId, SS) }};
-        {ok, confirmed} ->
+            {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+        {ok, Disposition}
+          when Disposition =:= confirmed
             %% It got published when we were a slave via gm, and
             %% confirmed some time after that (maybe even after
             %% promotion), but before we received the publish from the
@@ -403,12 +393,12 @@ is_duplicate(Message = #basic_message { id = MsgId },
             %% need to confirm now. As above, amqqueue_process will
             %% have the entry for the msg_id_to_channel mapping added
             %% immediately after calling is_duplicate/2.
-            {published, State #state { seen_status = dict:erase(MsgId, SS),
-                                       confirmed = [MsgId | Confirmed] }};
-        {ok, discarded} ->
-            %% Don't erase from SS here because discard/2 is about to
-            %% be called and we need to be able to detect this case
-            {discarded, State}
+               orelse Disposition =:= discarded ->
+            %% Message was discarded while we were a slave. Confirm now.
+            %% As above, amqqueue_process will have the entry for the
+            %% msg_id_to_channel mapping.
+            {true, State #state { seen_status = dict:erase(MsgId, SS),
+                                  confirmed = [MsgId | Confirmed] }}
     end.
 
 %% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 88b0f005..23deb054 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -724,6 +724,7 @@ process_instruction({publish_delivered, ChPid, MsgProps,
                      Msg = #basic_message { id = MsgId }}, State) ->
     State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
         publish_or_discard(published, ChPid, MsgId, State),
+    true = BQ:is_empty(BQS),
     {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
     {ok, maybe_store_ack(true, MsgId, AckTag,
                          State1 #state { backing_queue_state = BQS1 })};
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8cd976fa..d282dad0 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -56,7 +56,8 @@
 %% Main interface
 -spec(init/0 :: () -> 'ok').
--spec(join_cluster/2 :: (node(), node_type()) -> 'ok').
+-spec(join_cluster/2 :: (node(), node_type())
+                        -> 'ok' | {'ok', 'already_member'}).
 -spec(reset/0 :: () -> 'ok').
 -spec(force_reset/0 :: () -> 'ok').
 -spec(update_cluster_nodes/1 :: (node()) -> 'ok').
@@ -164,23 +165,24 @@ join_cluster(DiscoveryNode, NodeType) ->
                        {error, _} = E -> throw(E)
                    end,
     case me_in_nodes(ClusterNodes) of
-        true  -> e(already_clustered);
-        false -> ok
-    end,
-
-    %% reset the node. this simplifies things and it will be needed in
-    %% this case - we're joining a new cluster with new nodes which
-    %% are not in synch with the current node. I also lifts the burden
-    %% of reseting the node from the user.
-    reset_gracefully(),
-
-    %% Join the cluster
-    rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
-                               [ClusterNodes, NodeType]),
-    ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
-    rabbit_node_monitor:notify_joined_cluster(),
-
-    ok.
+        false ->
+            %% reset the node. this simplifies things and it will be needed in
+            %% this case - we're joining a new cluster with new nodes which
+            %% are not in synch with the current node. I also lifts the burden
+            %% of reseting the node from the user.
+            reset_gracefully(),
+
+            %% Join the cluster
+            rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
+                                       [ClusterNodes, NodeType]),
+            ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
+            rabbit_node_monitor:notify_joined_cluster(),
+            ok;
+        true ->
+            rabbit_misc:local_info_msg("Already member of cluster: ~p~n",
+                                       [ClusterNodes]),
+            {ok, already_member}
+    end.
 
 %% return node to its virgin state, where it is not member of any
 %% cluster, has no cluster configuration, no local database, and no
@@ -294,27 +296,18 @@ remove_node_offline_node(Node) ->
     %% this operation from disc nodes.
     case {mnesia:system_info(running_db_nodes) -- [Node], node_type()} of
         {[], disc} ->
-            %% Note that while we check if the nodes was the last to go down,
-            %% apart from the node we're removing from, this is still unsafe.
-            %% Consider the situation in which A and B are clustered. A goes
-            %% down, and records B as the running node. Then B gets clustered
-            %% with C, C goes down and B goes down. In this case, C is the
-            %% second-to-last, but we don't know that and we'll remove B from A
-            %% anyway, even if that will lead to bad things.
-            case cluster_nodes(running) -- [node(), Node] of
-                [] -> start_mnesia(),
-                      try
-                          %% What we want to do here is replace the last node to
-                          %% go down with the current node. The way we do this
-                          %% is by force loading the table, and making sure that
-                          %% they are loaded.
-                          rabbit_table:force_load(),
-                          rabbit_table:wait_for_replicated(),
-                          forget_cluster_node(Node, false)
-                      after
-                          stop_mnesia()
-                      end;
-                _  -> e(not_last_node_to_go_down)
+            start_mnesia(),
+            try
+                %% What we want to do here is replace the last node to
+                %% go down with the current node. The way we do this
+                %% is by force loading the table, and making sure that
+                %% they are loaded.
+                rabbit_table:force_load(),
+                rabbit_table:wait_for_replicated(),
+                forget_cluster_node(Node, false),
+                force_load_next_boot()
+            after
+                stop_mnesia()
             end;
         {_, _} ->
             e(removing_node_from_offline_node)
@@ -439,11 +432,13 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
             ok = create_schema();
         {[], true, disc} ->
             %% First disc node up
+            maybe_force_load(),
             ok;
         {[AnotherNode | _], _, _} ->
             %% Subsequent node in cluster, catch up
             ensure_version_ok(
               rpc:call(AnotherNode, rabbit_version, recorded, [])),
+            maybe_force_load(),
             ok = rabbit_table:wait_for_replicated(),
             ok = rabbit_table:create_local_copy(NodeType)
     end,
@@ -523,6 +518,19 @@ copy_db(Destination) ->
     ok = ensure_mnesia_not_running(),
     rabbit_file:recursive_copy(dir(), Destination).
 
+force_load_filename() ->
+    filename:join(rabbit_mnesia:dir(), "force_load").
+
+force_load_next_boot() ->
+    rabbit_file:write_file(force_load_filename(), <<"">>).
+
+maybe_force_load() ->
+    case rabbit_file:is_file(force_load_filename()) of
+        true  -> rabbit_table:force_load(),
+                 rabbit_file:delete(force_load_filename());
+        false -> ok
+    end.
+
 %% This does not guarantee us much, but it avoids some situations that
 %% will definitely end up badly
 check_cluster_consistency() ->
@@ -853,10 +861,6 @@ error_description(clustering_only_disc_node) ->
 error_description(resetting_only_disc_node) ->
     "You cannot reset a node when it is the only disc node in a cluster. "
     "Please convert another node of the cluster to a disc node first.";
-error_description(already_clustered) ->
-    "You are already clustered with the nodes you have selected. If the "
-    "node you are trying to cluster with is not present in the current "
-    "node status, use 'update_cluster_nodes'.";
 error_description(not_clustered) ->
     "Non-clustered nodes can only be disc nodes.";
 error_description(cannot_connect_to_cluster) ->
@@ -879,10 +883,6 @@ error_description(offline_node_no_offline_flag) ->
     "You are trying to remove a node from an offline node. That is dangerous, "
     "but can be done with the --offline flag. Please consult the manual "
     "for rabbitmqctl for more information.";
-error_description(not_last_node_to_go_down) ->
-    "The node you are trying to remove from was not the last to go down "
-    "(excluding the node you are removing). Please use the the last node "
-    "to go down to remove nodes when the cluster is offline.";
 error_description(removing_node_from_offline_node) ->
     "To remove a node remotely from an offline node, the node you are removing "
     "from must be a disc node and all the other nodes must be offline.";
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 61fac0e2..3cf88d06 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,7 +37,7 @@
 -record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
              connection_state, queue_collector, heartbeater, stats_timer,
-             conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+             ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
              buf, buf_len, throttle}).
 
 -record(connection, {name, host, peer_host, port, peer_port,
@@ -103,19 +103,19 @@
 
 %%--------------------------------------------------------------------------
 
-start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
-    {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
+    {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
                                              Collector, StartHeartbeatFun])}.
 
 shutdown(Pid, Explanation) ->
     gen_server:call(Pid, {shutdown, Explanation}, infinity).
 
-init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
     Deb = sys:debug_options([]),
     receive
         {go, Sock, SockTransform} ->
             start_connection(
-              Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+              Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
               SockTransform)
     end.
 
@@ -201,7 +201,7 @@ socket_op(Sock, Fun) ->
             exit(normal)
     end.
 
-start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
                  Sock, SockTransform) ->
     process_flag(trap_exit, true),
     Name = case rabbit_net:connection_string(Sock, inbound) of
@@ -240,7 +240,7 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
                 connection_state    = pre_init,
                 queue_collector     = Collector,
                 heartbeater         = none,
-                conn_sup_pid        = ConnSupPid,
+                ch_sup3_pid         = ChSup3Pid,
                 channel_sup_sup_pid = none,
                 start_heartbeat_fun = StartHeartbeatFun,
                 buf                 = [],
@@ -837,7 +837,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
                            connection = Connection = #connection{
                                            user = User,
                                            protocol = Protocol},
-                           conn_sup_pid = ConnSupPid,
+                           ch_sup3_pid = ChSup3Pid,
                            sock = Sock,
                            throttle = Throttle}) ->
     ok = rabbit_access_control:check_vhost_access(User, VHostPath),
@@ -847,7 +847,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
     Throttle1 = Throttle#throttle{conserve_resources = Conserve},
     {ok, ChannelSupSupPid} =
         supervisor2:start_child(
-          ConnSupPid,
+          ChSup3Pid,
           {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
            intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
     State1 = control_throttle(
@@ -1048,9 +1048,9 @@ pack_for_1_0(#v1{parent       = Parent,
                  recv_len     = RecvLen,
                  pending_recv = PendingRecv,
                  queue_collector = QueueCollector,
-                 conn_sup_pid = ConnSupPid,
+                 ch_sup3_pid  = ChSup3Pid,
                  start_heartbeat_fun = SHF,
                  buf          = Buf,
                  buf_len      = BufLen}) ->
-    {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+    {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
      Buf, BufLen}.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5b39c2c6..73ab64bf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -546,8 +546,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
                                          id            = MsgId },
                   MsgProps = #message_properties {
                                 needs_confirming = NeedsConfirming },
-                  _ChPid, State = #vqstate { len              = 0,
-                                             next_seq_id      = SeqId,
+                  _ChPid, State = #vqstate { next_seq_id      = SeqId,
                                              out_counter      = OutCount,
                                              in_counter       = InCount,
                                              persistent_count = PCount,
diff --git a/version.mk b/version.mk
new file mode 100644
index 00000000..5683af4a
--- /dev/null
+++ b/version.mk
@@ -0,0 +1 @@
+VERSION?=0.0.0
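Taken together, the rabbit_mnesia changes also alter the join_cluster/2 contract: instead of throwing already_clustered, it now returns ok or {ok, already_member}. A minimal caller sketch under that assumption (the wrapper function itself is hypothetical, not part of the patch):

    %% hypothetical helper: treat "already a member" as a successful no-op,
    %% matching the new 'ok' | {'ok', 'already_member'} return spec above.
    ensure_clustered(DiscoveryNode, NodeType) ->
        case rabbit_mnesia:join_cluster(DiscoveryNode, NodeType) of
            ok                   -> joined;
            {ok, already_member} -> already_member
        end.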