From 7279a32f7a905a94283834bd8df4c357e0e900b7 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 3 Jul 2008 13:58:10 +0100 Subject: Migrate branch bug18732 --- src/rabbit_misc.erl | 11 ++++++++++- src/rabbit_reader.erl | 31 ++++++++++++++++++++++++------- src/rabbit_writer.erl | 10 +++++++--- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 927d7712..b71aba42 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -35,7 +35,7 @@ -export([r/3, r/2, rs/1]). -export([permission_list/1]). -export([enable_cover/0, report_cover/0]). --export([with_exit_handler/2]). +-export([throw_on_error/2, with_exit_handler/2]). -export([with_user/2, with_vhost/2, with_realm/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). @@ -79,6 +79,8 @@ -spec(permission_list/1 :: (ticket()) -> [permission()]). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). +-spec(throw_on_error/2 :: + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). @@ -231,6 +233,13 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +throw_on_error(E, Thunk) -> + case Thunk() of + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res + end. + with_exit_handler(Handler, Thunk) -> try Thunk() diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1d11cbaa..38349a1c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -157,14 +157,27 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +peername(Sock) -> + try + {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + AddressS = inet_parse:ntoa(Address), + {AddressS, Port} + catch + Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Ex]), + rabbit_log:info("closing TCP connection ~p", [self()]), + exit(normal) + end. + start_connection(Parent, Deb, ClientSock) -> - ProfilingValue = setup_profiling(), process_flag(trap_exit, true), - {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock), - PeerAddressS = inet_parse:ntoa(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + {PeerAddressS, PeerPort} = peername(ClientSock), + ProfilingValue = setup_profiling(), try + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), mainloop(Parent, Deb, switch_callback( @@ -255,7 +268,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1), + Ref = inet_op(fun () -> prim_inet:async_recv( + OldState#v1.sock, Length, -1) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -470,7 +484,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>), + ok = inet_op(fun () -> gen_tcp:send( + Sock, <<"AMQP",1,1, + ?PROTOCOL_VERSION_MAJOR, + ?PROTOCOL_VERSION_MINOR>>) end), throw({bad_header, Other}); handle_input(Callback, Data, _State) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index eda871ec..c3c7db53 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -127,12 +127,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> Channel, Content, FrameMax), [MethodFrame | ContentFrames]. +tcp_send(Sock, Data) -> + rabbit_misc:throw_on_error(inet_error, + fun () -> gen_tcp:send(Sock, Data) end). + internal_send_command(Sock, Channel, MethodRecord) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from -- cgit v1.2.1 From f46e39fce6066a60d22dc91fed47a251cd3db7a1 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Thu, 4 Sep 2008 17:30:04 +0100 Subject: Added python-json as a dependency to the debian/rpm packages --- packaging/RPMS/Fedora/rabbitmq-server.spec | 1 + packaging/debs/Debian/debian/control | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 25213816..d29b2db8 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -8,6 +8,7 @@ Group: Development/Libraries Source: http://www.rabbitmq.com/releases/%{source_name}-%{main_version}.tar.gz URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. +BuildRequires: python, python-json Requires: erlang Packager: Hubert Plociniczak BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index df9a330b..abc6a658 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones -Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python +Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python, python-json Standards-Version: 3.7.2 Package: rabbitmq-server -- cgit v1.2.1 From ae317d9c2a90f5c78d8463e396c4dc06b48161eb Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Fri, 5 Sep 2008 12:34:07 +0100 Subject: Create non-native debian packages --- packaging/debs/Debian/Makefile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index dd74c31e..3e74cb52 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -1,5 +1,6 @@ TARBALL_DIR=../../../dist TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz)) +DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') UNPACKED_DIR=rabbitmq-server-$(VERSION) PACKAGENAME=rabbitmq-server @@ -16,7 +17,8 @@ all: package: clean make -C ../.. check_tools - tar -zxvf $(TARBALL_DIR)/$(TARBALL) + cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL) + tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) -- cgit v1.2.1 From 57113b57f30914181c987fff25c80308dd98bc50 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Fri, 12 Sep 2008 12:24:22 +0100 Subject: Append rpm-specific files to the source tarball. Cleaned up the rpm Makefile. --- packaging/RPMS/Fedora/Makefile | 45 ++++++++++++++++++------------ packaging/RPMS/Fedora/rabbitmq-server.spec | 6 ++-- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 6cc3579b..6e1b06b9 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -6,30 +6,39 @@ TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) RPM_VERSION=$(shell echo $(VERSION) | tr - _) DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' +RPM_SOURCE_DIR=rabbitmq-server-$(VERSION)/rpm rpms: clean server #Create proper environment for making rpms prepare: - mkdir -p $(TOP_DIR)/BUILD - mkdir -p $(TOP_DIR)/SOURCES - mkdir -p $(TOP_DIR)/SPECS - mkdir -p $(TOP_DIR)/SRPMS - mkdir -p $(TOP_DIR)/RPMS - mkdir -p $(TOP_DIR)/tmp - cp $(TOP_DIR)/$(TARBALL) $(TOP_DIR)/SOURCES - cp $(TOP_DIR)/rabbitmq-server.spec $(TOP_DIR)/SPECS - cp $(TOP_DIR)/init.d $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmqctl_wrapper $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmq-server.logrotate $(TOP_DIR)/BUILD + mkdir -p BUILD + mkdir -p SOURCES + mkdir -p SPECS + mkdir -p SRPMS + mkdir -p RPMS + mkdir -p tmp + cp $(TOP_DIR)/$(TARBALL) SOURCES + cp rabbitmq-server.spec SPECS + + mkdir -p tmp/$(RPM_SOURCE_DIR) + cp init.d tmp/$(RPM_SOURCE_DIR) + cp rabbitmqctl_wrapper tmp/$(RPM_SOURCE_DIR) + cp rabbitmq-server.logrotate tmp/$(RPM_SOURCE_DIR) + + gunzip SOURCES/rabbitmq-server-$(VERSION).tar.gz + tar -C tmp -rf $(TOP_DIR)/SOURCES/rabbitmq-server-$(VERSION).tar \ + $(RPM_SOURCE_DIR)/ + gzip SOURCES/rabbitmq-server-$(VERSION).tar + rm -rf tmp/rabbitmq-server* server: prepare - rpmbuild -ba $(TOP_DIR)/SPECS/rabbitmq-server.spec $(DEFINES) --target noarch + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch clean: - rm -rf $(TOP_DIR)/SOURCES/ - rm -rf $(TOP_DIR)/SPECS/ - rm -rf $(TOP_DIR)/RPMS/ - rm -rf $(TOP_DIR)/SRPMS/ - rm -rf $(TOP_DIR)/BUILD/ - rm -rf $(TOP_DIR)/tmp/ + rm -rf SOURCES + rm -rf SPECS + rm -rf RPMS + rm -rf SRPMS + rm -rf BUILD + rm -rf tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 08694c09..b06a88d6 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -51,17 +51,17 @@ mkdir -p %{buildroot}/var/log/rabbitmq mkdir -p %{buildroot}/etc/rc.d/init.d/ #Copy all necessary lib files etc. -cp ../init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server +cp rpm/init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server mv %{buildroot}/usr/sbin/rabbitmqctl %{buildroot}/usr/sbin/rabbitmqctl_real -cp ../rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl +cp rpm/rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl chmod 0755 %{buildroot}/usr/sbin/rabbitmqctl cp %{buildroot}%{_mandir}/man1/rabbitmqctl.1.gz %{buildroot}%{_mandir}/man1/rabbitmqctl_real.1.gz mkdir -p %{buildroot}/etc/logrotate.d -cp ../rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server +cp rpm/rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server %post # create rabbitmq group -- cgit v1.2.1 From 26586179ac092aadcf0366b82c6578118fd6d001 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Mon, 15 Sep 2008 09:55:28 +0100 Subject: Fixed checking build dependencies in rpms while building on Debian systems. Unfortunately .spec doesn't have 'not' logic. --- packaging/RPMS/Fedora/Makefile | 2 +- packaging/RPMS/Fedora/rabbitmq-server.spec | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c8e979a7..521fac43 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -5,7 +5,7 @@ SOURCE_TARBALL_DIR=../../../dist TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) RPM_VERSION=$(shell echo $(VERSION) | tr - _) -DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' +DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' --define 'debian 1' rpms: clean server diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index d29b2db8..6a57babd 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -8,7 +8,10 @@ Group: Development/Libraries Source: http://www.rabbitmq.com/releases/%{source_name}-%{main_version}.tar.gz URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. +%if 0%{?debian} +%else BuildRequires: python, python-json +%endif Requires: erlang Packager: Hubert Plociniczak BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root -- cgit v1.2.1 From cd0fef826c1d78c99f770c144572ded2f21be1be Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Mon, 15 Sep 2008 10:23:53 +0100 Subject: Cleaned up the Makefile --- packaging/RPMS/Fedora/Makefile | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 6e1b06b9..92c11c9c 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -12,12 +12,7 @@ rpms: clean server #Create proper environment for making rpms prepare: - mkdir -p BUILD - mkdir -p SOURCES - mkdir -p SPECS - mkdir -p SRPMS - mkdir -p RPMS - mkdir -p tmp + mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS @@ -27,8 +22,7 @@ prepare: cp rabbitmq-server.logrotate tmp/$(RPM_SOURCE_DIR) gunzip SOURCES/rabbitmq-server-$(VERSION).tar.gz - tar -C tmp -rf $(TOP_DIR)/SOURCES/rabbitmq-server-$(VERSION).tar \ - $(RPM_SOURCE_DIR)/ + (cd tmp; tar -rf ../SOURCES/rabbitmq-server-$(VERSION).tar $(RPM_SOURCE_DIR)/) gzip SOURCES/rabbitmq-server-$(VERSION).tar rm -rf tmp/rabbitmq-server* @@ -36,9 +30,4 @@ server: prepare rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch clean: - rm -rf SOURCES - rm -rf SPECS - rm -rf RPMS - rm -rf SRPMS - rm -rf BUILD - rm -rf tmp + rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp -- cgit v1.2.1 From 3c462730fe27e219e3b57ddecfeea95c35640e14 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Mon, 15 Sep 2008 12:55:07 +0100 Subject: Apply rpm-specific files as a patch, instead of directly changing the source tarball --- packaging/RPMS/Fedora/Makefile | 20 +++++++++++--------- packaging/RPMS/Fedora/rabbitmq-server.spec | 2 ++ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 92c11c9c..a1430432 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -2,11 +2,12 @@ VERSION=0.0.0 SOURCE_TARBALL_DIR=../../../dist -TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz +PACKAGE_VERSION_NAME=rabbitmq-server-$(VERSION) +TARBALL=$(SOURCE_TARBALL_DIR)/$(PACKAGE_VERSION_NAME).tar.gz TOP_DIR=$(shell pwd) RPM_VERSION=$(shell echo $(VERSION) | tr - _) DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' -RPM_SOURCE_DIR=rabbitmq-server-$(VERSION)/rpm +RPM_SOURCE_DIR=$(PACKAGE_VERSION_NAME)/rpm rpms: clean server @@ -15,16 +16,17 @@ prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS - - mkdir -p tmp/$(RPM_SOURCE_DIR) + + rm -rf tmp/$(PACKAGE_VERSION_NAME)* + tar -zxf $(TOP_DIR)/$(TARBALL) -C tmp + cp -r tmp/$(PACKAGE_VERSION_NAME) tmp/$(PACKAGE_VERSION_NAME)-orig + + mkdir tmp/$(RPM_SOURCE_DIR) cp init.d tmp/$(RPM_SOURCE_DIR) cp rabbitmqctl_wrapper tmp/$(RPM_SOURCE_DIR) cp rabbitmq-server.logrotate tmp/$(RPM_SOURCE_DIR) - - gunzip SOURCES/rabbitmq-server-$(VERSION).tar.gz - (cd tmp; tar -rf ../SOURCES/rabbitmq-server-$(VERSION).tar $(RPM_SOURCE_DIR)/) - gzip SOURCES/rabbitmq-server-$(VERSION).tar - rm -rf tmp/rabbitmq-server* + - (cd tmp; diff -aurN $(PACKAGE_VERSION_NAME)-orig $(PACKAGE_VERSION_NAME) \ + > ../SOURCES/rpm.patch ) server: prepare rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index b06a88d6..b642b1fe 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -8,6 +8,7 @@ URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. Requires: erlang, logrotate Packager: Hubert Plociniczak +Patch0: rpm.patch BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root Summary: The RabbitMQ server Requires(post): chkconfig @@ -34,6 +35,7 @@ fi %prep %setup -n %{name}-%{main_version} +%patch -p1 %build make -- cgit v1.2.1 From 6b67dfc96b6c9c79b1c95674854925603108fcb3 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Sat, 4 Oct 2008 15:35:37 +0100 Subject: Implemented queue.unbind --- src/rabbit_channel.erl | 70 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 23 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5cc07aed..4cce60e5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -572,29 +572,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, nowait = NoWait, - arguments = Arguments}, - _, State = #ch{ virtual_host = VHostPath }) -> - %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case rabbit_amqqueue:add_binding(QueueName, ExchangeName, - ActualRoutingKey, Arguments) of - {error, queue_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, #'queue.bind_ok'{}) - end; + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, + NoWait, State); + +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, + #'queue.unbind_ok'{}, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -636,6 +625,41 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, State) -> + binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, false, State). + +binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + %% FIXME: connection exception (!) on failure?? + %% (see rule named "failure" in spec-XML) + %% FIXME: don't allow binding to internal exchanges - + %% including the one named "" ! + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, + State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of + {error, queue_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, binding_not_found} -> + rabbit_misc:protocol_error( + not_found, "no binding ~s between exhange ~s and queue ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); + {error, durability_settings_incompatible} -> + rabbit_misc:protocol_error( + not_allowed, "durability settings of ~s incompatible with ~s", + [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + {ok, _BindingCount} -> + return_ok(State, NoWait, ReturnMethod) + end. + publish(Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> Handled = deliver(QPids, Mandatory, Immediate, TxnKey, -- cgit v1.2.1 From 25380b49849b81e2c2f03ec4303c3414094b5db2 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sat, 4 Oct 2008 16:20:09 +0100 Subject: beginnings of reacting to low memory conditions configure memsup and hook in our own alarm handler --- scripts/rabbitmq-server | 3 +- scripts/rabbitmq-server.bat | 3 +- src/rabbit.erl | 3 ++ src/rabbit_alarm.erl | 82 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 src/rabbit_alarm.erl diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b930c8ed..994736ce 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -66,8 +66,9 @@ erl \ -sasl sasl_error_logger '{file,"'${SASL_LOGS}'"}' \ -os_mon start_cpu_sup true \ -os_mon start_disksup false \ - -os_mon start_memsup false \ + -os_mon start_memsup true \ -os_mon start_os_sup false \ + -os_mon memsup_system_only true \ -mnesia dir "\"${MNESIA_DIR}\"" \ ${CLUSTER_CONFIG} \ ${RABBIT_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index f08027d2..3ad093e0 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -107,8 +107,9 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia -sasl sasl_error_logger {file,\""%LOG_BASE%/%NODENAME%-sasl.log"\"} ^ -os_mon start_cpu_sup true ^ -os_mon start_disksup false ^ --os_mon start_memsup false ^ +-os_mon start_memsup true ^ -os_mon start_os_sup false ^ +-os_mon memsup_system_only true ^ -mnesia dir \""%MNESIA_DIR%"\" ^ %CLUSTER_CONFIG% ^ %RABBIT_ARGS% ^ diff --git a/src/rabbit.erl b/src/rabbit.erl index c6ef1749..a33c5b7b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -157,6 +157,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), + ok = rabbit_alarm:start(), + ok = rabbit_binary_generator: check_empty_content_body_frame_size(), @@ -198,6 +200,7 @@ start(normal, []) -> stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), + ok = rabbit_alarm:stop(), ok. %--------------------------------------------------------------------------- diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl new file mode 100644 index 00000000..e71dda59 --- /dev/null +++ b/src/rabbit_alarm.erl @@ -0,0 +1,82 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_alarm). + +-behaviour(gen_event). + +-export([start/0, stop/0]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-define(MEMSUP_CHECK_INTERVAL, 1000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> 'ok'). +-spec(stop/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + %% The default memsup check interval is 1 minute, which is way too + %% long - rabbit can gobble up all memory in a matter of + %% seconds. Unfortunately the memory_check_interval configuration + %% parameter and memsup:set_check_interval/1 function only provide + %% a granularity of minutes. So we have to peel off one layer of + %% the API to get to the underlying layer which operates at the + %% granularity of milliseconds. + ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, + infinity), + + ok = alarm_handler:add_alarm_handler(?MODULE). + +stop() -> + ok = alarm_handler:delete_alarm_handler(?MODULE). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, none}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event(Event, State) -> + {ok, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. -- cgit v1.2.1 From 9f13a84c4e2032753f7f0981973b4f3422a32784 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 13 Oct 2008 21:05:02 +0100 Subject: don't log errors during delivery If the target queue died normally we don't care, and if it died abnormally the reason is logged by the queue supervisor. In both cases we treat the message as unrouted. --- src/rabbit_router.erl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 41a8d64c..a2337647 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -150,11 +150,9 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> fun (QPid, {Routed, Handled}) -> case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, Txn, Message, QPid) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n", - [QPid, Reason]), - {Routed, Handled} + true -> {true, [QPid | Handled]}; + false -> {true, Handled}; + {'EXIT', _Reason} -> {Routed, Handled} end end, {false, []}, -- cgit v1.2.1 From 6f68e9927c66c40b9699223b7d493f6492a95920 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 13 Oct 2008 21:37:02 +0100 Subject: exit with a proper protocol error when commit/rollback fail --- src/rabbit_channel.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a9278898..ef3a9f0e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -717,7 +717,8 @@ internal_commit(State = #ch{transaction_id = TxnKey, case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of ok -> new_tx(State); - {error, Errors} -> exit({commit_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "commit failed: ~w", [Errors]) end. internal_rollback(State = #ch{transaction_id = TxnKey, @@ -732,7 +733,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> exit({rollback_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "rollback failed: ~w", [Errors]) end. fold_per_queue(F, Acc0, UAQ) -> -- cgit v1.2.1 From 6e45043ccfe77cb5313ac42ec128f9a648c5672a Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 13 Oct 2008 21:56:26 +0100 Subject: produce a nicer error message for common queue disappearance case --- src/rabbit_amqqueue.erl | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bd64f1e4..7b2f801a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -295,25 +295,23 @@ ack(QPid, Txn, MsgIds, ChPid) -> commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( - fun (QPid) -> - rabbit_misc:with_exit_handler( - %% we don't care if the queue process has terminated - %% in the meantime - fun () -> ok end, - fun () -> gen_server:call(QPid, {notify_down, ChPid}, - Timeout) end) - end, + %% we don't care if the queue process has terminated in the + %% meantime + fun (_) -> ok end, + fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). binding_forcibly_removed(BindingSpec, QueueName) -> @@ -388,10 +386,13 @@ pseudo_queue(QueueName, Pid) -> binding_specs = [], pid = Pid}. -safe_pmap_ok(F, L) -> +safe_pmap_ok(H, F, L) -> case [R || R <- rabbit_misc:upmap( fun (V) -> - try F(V) + try + rabbit_misc:with_exit_handler( + fun () -> H(V) end, + fun () -> F(V) end) catch Class:Reason -> {Class, Reason} end end, L), @@ -399,4 +400,3 @@ safe_pmap_ok(F, L) -> [] -> ok; Errors -> {error, Errors} end. - -- cgit v1.2.1 From 38a6518f6e790ff48f3efb0ab64564154a69ba77 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 16 Oct 2008 03:56:41 +0100 Subject: branch off for effect-less alarm handling so we can later experiment with different effects --- src/rabbit_alarm.erl | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index e71dda59..be0f0cea 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -34,6 +34,8 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). +-record(alarms, {system_memory_high_watermark = false}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -53,6 +55,11 @@ start() -> %% a granularity of minutes. So we have to peel off one layer of %% the API to get to the underlying layer which operates at the %% granularity of milliseconds. + %% + %% Note that the new setting will only take effect after the first + %% check has completed, i.e. after one minute. So if rabbit eats + %% all the memory within the first minute after startup then we + %% are out of luck. ok = os_mon:call(memsup, {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, infinity), @@ -64,12 +71,18 @@ stop() -> %%---------------------------------------------------------------------------- init([]) -> - {ok, none}. + {ok, #alarms{}}. handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event(Event, State) -> +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + {ok, State#alarms{system_memory_high_watermark = true}}; + +handle_event({clear_alarm, system_memory_high_watermark}, State) -> + {ok, State#alarms{system_memory_high_watermark = false}}; + +handle_event(_Event, State) -> {ok, State}. handle_info(_Info, State) -> -- cgit v1.2.1 From 7957aefe8fe9f1aa99e84099ed1c954f3afbf1e8 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 16 Oct 2008 04:00:11 +0100 Subject: cosmetic --- src/rabbit_alarm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index be0f0cea..752ac14b 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -35,7 +35,7 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). -record(alarms, {system_memory_high_watermark = false}). - + %%---------------------------------------------------------------------------- -ifdef(use_specs). -- cgit v1.2.1 From eb13eb2bea603a0a1ba058f881c42052f323b163 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 16 Oct 2008 05:36:51 +0100 Subject: make rabbit_alarm usable This now supports the registration of alertee processes with callback MFAs. We monitor the alertee process to keep the alertee list current, and notify alertees of initial high memory conditions, and any changes. --- src/rabbit_alarm.erl | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 752ac14b..346f5361 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -27,14 +27,14 @@ -behaviour(gen_event). --export([start/0, stop/0]). +-export([start/0, stop/0, register/2]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). -define(MEMSUP_CHECK_INTERVAL, 1000). --record(alarms, {system_memory_high_watermark = false}). +-record(alarms, {alertees, system_memory_high_watermark = false}). %%---------------------------------------------------------------------------- @@ -42,7 +42,8 @@ -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). - +-spec(register/2 :: (pid(), mfa()) -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -68,23 +69,44 @@ start() -> stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). +register(Pid, HighMemMFA) -> + ok = gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}). + %%---------------------------------------------------------------------------- init([]) -> - {ok, #alarms{}}. - + {ok, #alarms{alertees = dict:new()}}. + +handle_call({register, Pid, HighMemMFA}, + State = #alarms{alertees = Alertess}) -> + _MRef = erlang:monitor(process, Pid), + case State#alarms.system_memory_high_watermark of + true -> {M, F, A} = HighMemMFA, + ok = erlang:apply(M, F, A ++ [Pid, true]); + false -> ok + end, + NewAlertees = dict:store(Pid, HighMemMFA, Alertess), + {ok, ok, State#alarms{alertees = NewAlertees}}; + handle_call(_Request, State) -> {ok, not_understood, State}. handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + ok = alert(true, State#alarms.alertees), {ok, State#alarms{system_memory_high_watermark = true}}; handle_event({clear_alarm, system_memory_high_watermark}, State) -> + ok = alert(false, State#alarms.alertees), {ok, State#alarms{system_memory_high_watermark = false}}; handle_event(_Event, State) -> {ok, State}. +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #alarms{alertees = Alertess}) -> + {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; + handle_info(_Info, State) -> {ok, State}. @@ -93,3 +115,11 @@ terminate(_Arg, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%---------------------------------------------------------------------------- + +alert(Alert, Alertees) -> + dict:fold(fun (Pid, {M, F, A}, Acc) -> + ok = erlang:apply(M, F, A ++ [Pid, Alert]), + Acc + end, ok, Alertees). -- cgit v1.2.1 From b656422c2f7d2fdfdc35f97fd1c30c91203749c5 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 16 Oct 2008 05:45:08 +0100 Subject: wire up channels to rabbit_alarm We propagate changes in the high memory alarm status as channel.flow messages to the client. The channel.flow_ok replies are simply accepted. --- src/rabbit_channel.erl | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5cc07aed..edfd1787 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -28,7 +28,7 @@ -include("rabbit.hrl"). -export([start_link/4, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4]). +-export([send_command/2, deliver/4, conserve_memory/2]). %% callbacks -export([init/2, handle_message/2]). @@ -49,6 +49,7 @@ -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -endif. @@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> Pid ! {deliver, ConsumerTag, AckRequired, Msg}, ok. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + %%--------------------------------------------------------------------------- init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), + %% this is bypassing the proxy so alarms can "jump the queue" and + %% be handled promptly + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), #ch{state = starting, proxy_pid = ProxyPid, reader_pid = ReaderPid, @@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg}, true, ConsumerTag, DeliveryTag, Msg), State1#ch{next_tag = DeliveryTag + 1}; +handle_message({conserve_memory, Conserve}, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), + State; + handle_message({'EXIT', _Pid, Reason}, State) -> terminate(Reason, State); @@ -630,6 +643,9 @@ handle_method(#'channel.flow'{active = _}, _, State) -> %% FIXME: implement {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow_ok'{active = _}, _, State) -> + {noreply, State}; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). -- cgit v1.2.1 From 0d755f43aa1fdc6fe1072a8bb7e46aabc32f6e4d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 16 Oct 2008 05:46:04 +0100 Subject: some explanation --- src/rabbit_channel.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index edfd1787..e3542b00 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -644,6 +644,9 @@ handle_method(#'channel.flow'{active = _}, _, State) -> {reply, #'channel.flow_ok'{active = true}, State}; handle_method(#'channel.flow_ok'{active = _}, _, State) -> + %% TODO: We may want to correlate this to channel.flow messages we + %% have sent, and complain if we get an unsolicited + %% channel.flow_ok, or the client refuses our flow request. {noreply, State}; handle_method(_MethodRecord, _Content, _State) -> -- cgit v1.2.1 From 8f25f029db6fe8a2d498db4c91295f8a0aa8e78f Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 16 Oct 2008 06:11:33 +0100 Subject: fix 'duplicate_next' error when sending messages directly to proxied process The buffering_proxy:mainloop was unconditionally requesting new messages from the proxy. It should only do that when it has just finished handling the messages given to it by the proxy in response to a previous request, and not after handling a direct message. --- src/buffering_proxy.erl | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl index d2505701..dc168608 100644 --- a/src/buffering_proxy.erl +++ b/src/buffering_proxy.erl @@ -40,7 +40,8 @@ start_link(M, A) -> ProxyPid = self(), Ref = make_ref(), Pid = spawn_link( - fun () -> mainloop(ProxyPid, Ref, M, + fun () -> ProxyPid ! Ref, + mainloop(ProxyPid, Ref, M, M:init(ProxyPid, A)) end), proxy_loop(Ref, Pid, empty) end). @@ -48,13 +49,15 @@ start_link(M, A) -> %%---------------------------------------------------------------------------- mainloop(ProxyPid, Ref, M, State) -> - ProxyPid ! Ref, NewState = receive {Ref, Messages} -> - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)); + NewSt = + lists:foldl(fun (Msg, S) -> + drain(M, M:handle_message(Msg, S)) + end, State, lists:reverse(Messages)), + ProxyPid ! Ref, + NewSt; Msg -> M:handle_message(Msg, State) end, ?MODULE:mainloop(ProxyPid, Ref, M, NewState). -- cgit v1.2.1 From 9b70165c57cf373592e34964917a78f91a8b148c Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 16 Oct 2008 14:02:06 +0100 Subject: Fixed QA remarks --- src/rabbit_channel.erl | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0bad01f8..7cec5f86 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -582,8 +582,8 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin, - QueueNameBin, RoutingKey, Arguments, - #'queue.unbind_ok'{}, State); + QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, + false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -625,11 +625,6 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, State) -> - binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, false, State). - binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> %% FIXME: connection exception (!) on failure?? @@ -649,7 +644,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( - not_found, "no binding ~s between exhange ~s and queue ~s", + not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(QueueName)]); {error, durability_settings_incompatible} -> -- cgit v1.2.1 From c3feec7be0240ebe89137d9db66fd17fedd40a75 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Thu, 16 Oct 2008 18:33:44 +0100 Subject: Spec no longer requires non-standard macros, include rpm-specifc files as sources instead of patch --- packaging/RPMS/Fedora/Makefile | 17 +++++------------ packaging/RPMS/Fedora/rabbitmq-server.spec | 30 ++++++++++++++---------------- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index a1430432..4824b629 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -5,8 +5,7 @@ SOURCE_TARBALL_DIR=../../../dist PACKAGE_VERSION_NAME=rabbitmq-server-$(VERSION) TARBALL=$(SOURCE_TARBALL_DIR)/$(PACKAGE_VERSION_NAME).tar.gz TOP_DIR=$(shell pwd) -RPM_VERSION=$(shell echo $(VERSION) | tr - _) -DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' +DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' RPM_SOURCE_DIR=$(PACKAGE_VERSION_NAME)/rpm rpms: clean server @@ -16,17 +15,11 @@ prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS + sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec - rm -rf tmp/$(PACKAGE_VERSION_NAME)* - tar -zxf $(TOP_DIR)/$(TARBALL) -C tmp - cp -r tmp/$(PACKAGE_VERSION_NAME) tmp/$(PACKAGE_VERSION_NAME)-orig - - mkdir tmp/$(RPM_SOURCE_DIR) - cp init.d tmp/$(RPM_SOURCE_DIR) - cp rabbitmqctl_wrapper tmp/$(RPM_SOURCE_DIR) - cp rabbitmq-server.logrotate tmp/$(RPM_SOURCE_DIR) - - (cd tmp; diff -aurN $(PACKAGE_VERSION_NAME)-orig $(PACKAGE_VERSION_NAME) \ - > ../SOURCES/rpm.patch ) + cp init.d SOURCES/rabbitmq-server.init + cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper + cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index b642b1fe..d4e21ebe 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,15 +1,17 @@ Name: rabbitmq-server -Version: %{rpm_version} +Version: %%VERSION%% Release: 1 License: MPLv1.1 Group: Development/Libraries -Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz +Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz +Source1: rabbitmq-server.init +Source2: rabbitmq-server.wrapper +Source3: rabbitmq-server.logrotate URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. Requires: erlang, logrotate Packager: Hubert Plociniczak -Patch0: rpm.patch -BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root Summary: The RabbitMQ server Requires(post): chkconfig Requires(pre): chkconfig initscripts @@ -20,10 +22,8 @@ performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _mandir /usr/share/man -%define _sbindir /usr/sbin -%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") -%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version} +%define _erllibdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") +%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} %pre @@ -34,8 +34,7 @@ if [ $1 -gt 1 ]; then fi %prep -%setup -n %{name}-%{main_version} -%patch -p1 +%setup -n %{name}-%{version} %build make @@ -46,24 +45,23 @@ rm -rf %{buildroot} make install TARGET_DIR=%{_maindir} \ SBIN_DIR=%{buildroot}%{_sbindir} \ MAN_DIR=%{buildroot}%{_mandir} - VERSION=%{main_version} + VERSION=%{version} mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia mkdir -p %{buildroot}/var/log/rabbitmq mkdir -p %{buildroot}/etc/rc.d/init.d/ #Copy all necessary lib files etc. -cp rpm/init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server +install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server mv %{buildroot}/usr/sbin/rabbitmqctl %{buildroot}/usr/sbin/rabbitmqctl_real -cp rpm/rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl -chmod 0755 %{buildroot}/usr/sbin/rabbitmqctl +install -m 0755 %SOURCE2 %{buildroot}/usr/sbin/rabbitmqctl cp %{buildroot}%{_mandir}/man1/rabbitmqctl.1.gz %{buildroot}%{_mandir}/man1/rabbitmqctl_real.1.gz mkdir -p %{buildroot}/etc/logrotate.d -cp rpm/rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server +install %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server %post # create rabbitmq group @@ -95,7 +93,7 @@ fi %files %defattr(-,root,root,-) -%{_libdir}/rabbitmq_server-%{main_version}/ +%{_erllibdir}/rabbitmq_server-%{version}/ %{_mandir}/man1/rabbitmq-multi.1.gz %{_mandir}/man1/rabbitmq-server.1.gz %{_mandir}/man1/rabbitmqctl.1.gz -- cgit v1.2.1 From dcaa92abe9f710027281359d0ffbebc7c15ab0e9 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Thu, 16 Oct 2008 18:42:00 +0100 Subject: Removed unused variables in the Makefile --- packaging/RPMS/Fedora/Makefile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 4824b629..d5171445 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -2,11 +2,9 @@ VERSION=0.0.0 SOURCE_TARBALL_DIR=../../../dist -PACKAGE_VERSION_NAME=rabbitmq-server-$(VERSION) -TARBALL=$(SOURCE_TARBALL_DIR)/$(PACKAGE_VERSION_NAME).tar.gz +TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' -RPM_SOURCE_DIR=$(PACKAGE_VERSION_NAME)/rpm rpms: clean server -- cgit v1.2.1 From 7a9059d594455132f473fd57b1dcc328379aacef Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Sun, 19 Oct 2008 02:13:47 +0100 Subject: Added missing build dependencies for debian --- packaging/debs/Debian/debian/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 675e15f4..c2045b91 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones -Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python, python-json +Build-Depends: cdbs, debhelper (>= 5), elinks, erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, findutils, gzip, make, python, python-json, reprepro, sed, tar, zip Standards-Version: 3.7.2 Package: rabbitmq-server -- cgit v1.2.1 From 863f63c35b242beb4703faa3ddd832c33c896076 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Sun, 19 Oct 2008 02:19:38 +0100 Subject: Forgot about perl --- packaging/debs/Debian/debian/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index c2045b91..e2f41789 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones -Build-Depends: cdbs, debhelper (>= 5), elinks, erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, findutils, gzip, make, python, python-json, reprepro, sed, tar, zip +Build-Depends: cdbs, debhelper (>= 5), elinks, erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, findutils, gzip, make, perl, python, python-json, reprepro, sed, tar, zip Standards-Version: 3.7.2 Package: rabbitmq-server -- cgit v1.2.1 From f78b288923dcbaefa12c7100a9a4adcfe1fd0c3d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 21 Oct 2008 15:46:05 +0100 Subject: raise memory alarm threshold The default 80% is just too low for many systems - I have less than that on tanto most of the time. It remains to be seen whether the new figure works ok for most users. --- scripts/rabbitmq-server | 1 + scripts/rabbitmq-server.bat | 1 + 2 files changed, 2 insertions(+) diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 994736ce..c953a753 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -69,6 +69,7 @@ erl \ -os_mon start_memsup true \ -os_mon start_os_sup false \ -os_mon memsup_system_only true \ + -os_mon system_memory_high_watermark 0.95 \ -mnesia dir "\"${MNESIA_DIR}\"" \ ${CLUSTER_CONFIG} \ ${RABBIT_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 3ad093e0..38b8cc53 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -110,6 +110,7 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia -os_mon start_memsup true ^ -os_mon start_os_sup false ^ -os_mon memsup_system_only true ^ +-os_mon system_memory_high_watermark 0.95 ^ -mnesia dir \""%MNESIA_DIR%"\" ^ %CLUSTER_CONFIG% ^ %RABBIT_ARGS% ^ -- cgit v1.2.1 From 324972e9e3658d6631358f0906c3d72afe470ae2 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 21 Oct 2008 15:59:41 +0100 Subject: hibernate some processes to conserve memory In my experiments I encountered situations where rabbit would not recover from a high memory alert even though all messages had been drained from it. By inspecting the running processes I determined that queue and channel processes sometimes hung on to garbage. Erlang's gc is per-process and triggered by process reduction counts, which means an idle process will never perform a gc. This explains the behaviour - the publisher channel goes idle when channel flow control is activated and the queue process goes idle once all messages have been drained from it. Hibernating idle processes forces a gc, as well as generally reducing memory consumption. Currently only channel and queue processes are hibernating, since these are the only two that seemed to be causing problems in my tests. We may want to extend hibernation to other processes in the future. --- src/buffering_proxy.erl | 5 +++ src/rabbit_amqqueue_process.erl | 78 +++++++++++++++++++++++------------------ 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl index dc168608..7707e636 100644 --- a/src/buffering_proxy.erl +++ b/src/buffering_proxy.erl @@ -32,6 +32,8 @@ -export([mainloop/4, drain/2]). -export([proxy_loop/3]). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- start_link(M, A) -> @@ -59,6 +61,9 @@ mainloop(ProxyPid, Ref, M, State) -> ProxyPid ! Ref, NewSt; Msg -> M:handle_message(Msg, State) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, + [ProxyPid, Ref, M, State]) end, ?MODULE:mainloop(ProxyPid, Ref, M, NewState). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7716ef16..e687df84 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -30,6 +30,7 @@ -behaviour(gen_server). -define(UNSENT_MESSAGE_LIMIT, 100). +-define(HIBERNATE_AFTER, 1000). -export([start_link/1]). @@ -75,7 +76,7 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}}. + round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; @@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) -> handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), @@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, end, round_robin = NewActive})) of {continue, NewState} -> - {noreply, NewState}; + noreply(NewState); {stop, NewState} -> {stop, normal, NewState} end @@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% queues discarding the message? %% {Delivered, NewState} = attempt_delivery(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({deliver, Txn, Message}, _From, State) -> %% Synchronous, "mandatory" delivery mode {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), @@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) -> gen_server:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), - {noreply, NewState}; + noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From, persist_auto_ack(QName, Message) end, Msg = {QName, self(), NextId, Delivered, Message}, - {reply, {ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}}; + reply({ok, queue:len(BufferTail), Msg}, + State#q{message_buffer = BufferTail, + next_msg_id = NextId + 1}); {empty, _} -> - {reply, empty, State} + reply(empty, State) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, @@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, round_robin = RoundRobin}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> - {reply, {error, queue_owned_by_another_connection}, State}; + reply({error, queue_owned_by_another_connection}, State); ok -> case check_exclusive_access(ExistingHolder, ExclusiveConsume) of in_use -> - {reply, {error, exclusive_consume_unavailable}, State}; + reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, @@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, end, round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, run_poke_burst(State1)} + reply(ok, run_poke_burst(State1)) end end; @@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, State}; + reply(ok, State); C = #cr{consumers = Consumers} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, @@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ConsumerTag, RoundRobin)}) of {continue, State1} -> - {reply, ok, State1}; + reply(ok, State1); {stop, State1} -> {stop, normal, ok, State1} end @@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, round_robin = RoundRobin}) -> - {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State}; + reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> @@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IsUnused = is_unused(), if IfEmpty and not(IsEmpty) -> - {reply, {error, not_empty}, State}; + reply({error, not_empty}, State); IfUnused and not(IsUnused) -> - {reply, {error, in_use}, State}; + reply({error, in_use}, State); true -> {stop, normal, {ok, queue:len(MessageBuffer)}, State} end; handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), - {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}}; + reply({ok, queue:len(MessageBuffer)}, + State#q{message_buffer = queue:new()}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% to check, we'd need to hold not just the ch %% pid for each consumer, but also its reader %% pid... - {reply, locked, State}; + reply(locked, State); ok -> - {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}} + reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) end; {ReaderPid, _MonitorRef} -> - {reply, ok, State}; + reply(ok, State); _ -> - {reply, locked, State} + reply(locked, State) end. handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {noreply, NewState}; + noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), @@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> _ -> record_pending_acks(Txn, ChPid, MsgIds) end, - {noreply, State} + noreply(State) end; handle_cast({rollback, Txn}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), - {noreply, State}; + noreply(State); handle_cast({redeliver, Messages}, State) -> - {noreply, deliver_or_enqueue_n(Messages, State)}; + noreply(deliver_or_enqueue_n(Messages, State)); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Messages, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - {noreply, deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)} + noreply(deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)) end; handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); T = #cr{unsent_message_count =Count} -> - {noreply, possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)} + noreply(possibly_unblock( + T#cr{unsent_message_count = Count - 1}, + State)) end. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, @@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); +handle_info(timeout, State) -> + {noreply, State, hibernate}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. -- cgit v1.2.1 From 271b083cc309bc300448ca1e2d5aa7fca8a8497f Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 24 Oct 2008 17:46:50 +0100 Subject: hibernate buffering_proxies My tests got stuck after about an hour, and the cause was buffering_proxies holding on to memory --- src/buffering_proxy.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl index 7707e636..fcb7b412 100644 --- a/src/buffering_proxy.erl +++ b/src/buffering_proxy.erl @@ -97,4 +97,6 @@ proxy_loop(Ref, Pid, State) -> waiting -> Pid ! {Ref, [Msg]}, empty; Messages -> [Msg | Messages] end) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) end. -- cgit v1.2.1 From a8604f3c73e122275bd3b1ceb56d1326d71842dd Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Fri, 24 Oct 2008 18:32:55 +0100 Subject: Minimal version of build dependencies on debian --- packaging/debs/Debian/debian/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index e2f41789..70f67b00 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones -Build-Depends: cdbs, debhelper (>= 5), elinks, erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, findutils, gzip, make, perl, python, python-json, reprepro, sed, tar, zip +Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, python, python-json Standards-Version: 3.7.2 Package: rabbitmq-server -- cgit v1.2.1 From 46f9be1cf381d93f335b9841bd9fac24048b817a Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sat, 25 Oct 2008 06:13:06 +0100 Subject: hibernate writers With this in place I am finally unable to make rabbit grind to a halt due to garbage being held by idle processes. --- src/rabbit_writer.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 0f6bca91..dee24cd9 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -36,6 +36,8 @@ -record(wstate, {sock, channel, frame_max}). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -63,6 +65,8 @@ start(Sock, Channel, FrameMax) -> mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, [State]) end. handle_message({send_command, MethodRecord}, -- cgit v1.2.1 From 9574295bd6890d2f404ad9529be66d0a87e46ec3 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Sat, 25 Oct 2008 11:09:24 +0100 Subject: Removed packages that are already set as dependencies of the others --- packaging/debs/Debian/debian/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 70f67b00..749791a4 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones -Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, python, python-json +Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-json Standards-Version: 3.7.2 Package: rabbitmq-server -- cgit v1.2.1 From e8e77a2351232e16e7cfe409f234c52e77be5d8b Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Sat, 25 Oct 2008 13:05:55 +0100 Subject: Added new line to logrotate scripts, debian packaging no longer throws warning --- packaging/RPMS/Fedora/rabbitmq-server.logrotate | 2 +- packaging/debs/Debian/debian/rabbitmq-server.logrotate | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.logrotate b/packaging/RPMS/Fedora/rabbitmq-server.logrotate index 64cd01a1..ab87e4a5 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.logrotate +++ b/packaging/RPMS/Fedora/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /sbin/service rabbitmq-server rotate-logs endscript -} \ No newline at end of file +} diff --git a/packaging/debs/Debian/debian/rabbitmq-server.logrotate b/packaging/debs/Debian/debian/rabbitmq-server.logrotate index 247635d1..bfd6b8da 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.logrotate +++ b/packaging/debs/Debian/debian/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /etc/init.d/rabbitmq-server rotate-logs endscript -} \ No newline at end of file +} -- cgit v1.2.1